feat: enhance Google connectors indexing with content extraction and document migration

- Added `download_and_extract_content` function to extract content from Google Drive files as markdown.
- Updated Google Drive indexer to utilize the new content extraction method.
- Implemented document migration logic to update legacy Composio document types to their native Google types.
- Introduced identifier hashing for stable document identification.
- Improved file pre-filtering to handle unchanged and rename-only files efficiently.
This commit is contained in:
Anish Sarkar 2026-03-25 18:33:44 +05:30
parent 2da6fd89ea
commit f7b52470eb
8 changed files with 951 additions and 1588 deletions

View file

@ -1,9 +1,8 @@
"""
Google Calendar connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending → processing → ready/failed
Uses the shared IndexingPipelineService for document deduplication,
summarization, chunking, and embedding.
"""
import time
@ -15,29 +14,25 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.google_calendar_connector import GoogleCalendarConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import (
compute_content_hash,
compute_unique_identifier_hash,
)
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
parse_date_flexible,
safe_set_chunks,
update_connector_last_indexed,
)
@ -46,13 +41,60 @@ ACCEPTED_CALENDAR_CONNECTOR_TYPES = {
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
}
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
def _build_connector_doc(
    event: dict,
    event_markdown: str,
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
) -> ConnectorDocument:
    """Translate a raw Google Calendar API event payload into a ConnectorDocument.

    Args:
        event: Raw event dict as returned by the Calendar API.
        event_markdown: Pre-rendered markdown body for the event.
        connector_id: Owning connector's database id.
        search_space_id: Search space the document belongs to.
        user_id: Id of the user who owns the connector.
        enable_summary: Whether the pipeline should LLM-summarize this document.

    Returns:
        A ConnectorDocument ready for the indexing pipeline.
    """
    # All-day events carry only "date"; timed events carry "dateTime".
    start_info = event.get("start", {})
    end_info = event.get("end", {})
    title = event.get("summary", "No Title")
    doc_metadata = {
        "event_id": event.get("id", ""),
        "event_summary": title,
        "calendar_id": event.get("calendarId", ""),
        "start_time": start_info.get("dateTime") or start_info.get("date", ""),
        "end_time": end_info.get("dateTime") or end_info.get("date", ""),
        "location": event.get("location", ""),
        "connector_id": connector_id,
        "document_type": "Google Calendar Event",
        "connector_type": "Google Calendar",
    }
    return ConnectorDocument(
        title=title,
        source_markdown=event_markdown,
        unique_id=doc_metadata["event_id"],
        document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=enable_summary,
        # Used verbatim when summarization is disabled or unavailable.
        fallback_summary=f"Google Calendar Event: {title}\n\n{event_markdown}",
        metadata=doc_metadata,
    )
async def index_google_calendar_events(
session: AsyncSession,
connector_id: int,
@ -82,7 +124,6 @@ async def index_google_calendar_events(
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="google_calendar_events_indexing",
source="connector_indexing_task",
@ -96,7 +137,7 @@ async def index_google_calendar_events(
)
try:
# Accept both native and Composio Calendar connectors
# ── Connector lookup ──────────────────────────────────────────
connector = None
for ct in ACCEPTED_CALENDAR_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
@ -112,7 +153,7 @@ async def index_google_calendar_events(
)
return 0, 0, f"Connector with ID {connector_id} not found"
# Build credentials based on connector type
# ── Credential building ───────────────────────────────────────
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
@ -184,6 +225,7 @@ async def index_google_calendar_events(
)
return 0, 0, "Google Calendar credentials not found in connector config"
# ── Calendar client init ──────────────────────────────────────
await task_logger.log_task_progress(
log_entry,
f"Initializing Google Calendar client for connector {connector_id}",
@ -203,36 +245,26 @@ async def index_google_calendar_events(
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range
# For calendar connectors, allow future dates to index upcoming events
# ── Date range calculation ────────────────────────────────────
if start_date is None or end_date is None:
# Fall back to calculating dates based on last_indexed_at
# Default to today (users can manually select future dates if needed)
calculated_end_date = datetime.now()
# Use last_indexed_at as start date if available, otherwise use 30 days ago
if connector.last_indexed_at:
# Convert dates to be comparable (both timezone-naive)
last_indexed_naive = (
connector.last_indexed_at.replace(tzinfo=None)
if connector.last_indexed_at.tzinfo
else connector.last_indexed_at
)
# Allow future dates - use last_indexed_at as start date
calculated_start_date = last_indexed_naive
logger.info(
f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
)
else:
calculated_start_date = datetime.now() - timedelta(
days=365
) # Use 365 days as default for calendar events (matches frontend)
calculated_start_date = datetime.now() - timedelta(days=365)
logger.info(
f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date"
)
# Use calculated dates if not provided
start_date_str = (
start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
)
@ -240,19 +272,14 @@ async def index_google_calendar_events(
end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
)
else:
# Use provided dates (including future dates)
start_date_str = start_date
end_date_str = end_date
# FIX: Ensure end_date is at least 1 day after start_date to avoid
# "start_date must be strictly before end_date" errors when dates are the same
# (e.g., when last_indexed_at is today)
if start_date_str == end_date_str:
logger.info(
f"Start date ({start_date_str}) equals end date ({end_date_str}), "
"adjusting end date to next day to ensure valid date range"
)
# Parse end_date and add 1 day
try:
end_dt = parse_date_flexible(end_date_str)
except ValueError:
@ -264,6 +291,7 @@ async def index_google_calendar_events(
end_date_str = end_dt.strftime("%Y-%m-%d")
logger.info(f"Adjusted end date to {end_date_str}")
# ── Fetch events ──────────────────────────────────────────────
await task_logger.log_task_progress(
log_entry,
f"Fetching Google Calendar events from {start_date_str} to {end_date_str}",
@ -274,27 +302,19 @@ async def index_google_calendar_events(
},
)
# Get events within date range from primary calendar
try:
events, error = await calendar_client.get_all_primary_calendar_events(
start_date=start_date_str, end_date=end_date_str
)
if error:
# Don't treat "No events found" as an error that should stop indexing
if "No events found" in error:
logger.info(f"No Google Calendar events found: {error}")
logger.info(
"No events found is not a critical error, continuing with update"
)
if update_last_indexed:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await session.commit()
logger.info(
f"Updated last_indexed_at to {connector.last_indexed_at} despite no events found"
)
await task_logger.log_task_success(
log_entry,
@ -304,7 +324,6 @@ async def index_google_calendar_events(
return 0, 0, None
else:
logger.error(f"Failed to get Google Calendar events: {error}")
# Check if this is an authentication error that requires re-authentication
error_message = error
error_type = "APIError"
if (
@ -329,28 +348,15 @@ async def index_google_calendar_events(
logger.error(f"Error fetching Google Calendar events: {e!s}", exc_info=True)
return 0, 0, f"Error fetching Google Calendar events: {e!s}"
documents_indexed = 0
# ── Build ConnectorDocuments ──────────────────────────────────
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
documents_failed = 0 # Track events that failed processing
duplicate_content_count = (
0 # Track events skipped due to duplicate content_hash
)
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all events, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
events_to_process = [] # List of dicts with document and event data
new_documents_created = False
duplicate_content_count = 0
for event in events:
try:
event_id = event.get("id")
event_summary = event.get("summary", "No Title")
calendar_id = event.get("calendarId", "")
if not event_id:
logger.warning(f"Skipping event with missing ID: {event_summary}")
@ -363,223 +369,73 @@ async def index_google_calendar_events(
documents_skipped += 1
continue
start = event.get("start", {})
end = event.get("end", {})
start_time = start.get("dateTime") or start.get("date", "")
end_time = end.get("dateTime") or end.get("date", "")
location = event.get("location", "")
description = event.get("description", "")
# Generate unique identifier hash for this Google Calendar event
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_CALENDAR_CONNECTOR, event_id, search_space_id
doc = _build_connector_doc(
event,
event_markdown,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
# Generate content hash
content_hash = generate_content_hash(event_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Fallback: legacy Composio hash
if not existing_document:
legacy_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
event_id,
search_space_id,
)
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
if existing_document:
existing_document.unique_identifier_hash = (
unique_identifier_hash
)
if (
existing_document.document_type
== DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
existing_document.document_type = (
DocumentType.GOOGLE_CALENDAR_CONNECTOR
)
logger.info(
f"Migrated legacy Composio Calendar document: {event_id}"
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
events_to_process.append(
{
"document": existing_document,
"is_new": False,
"event_markdown": event_markdown,
"content_hash": content_hash,
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location,
"description": description,
}
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
duplicate = await check_duplicate_document_by_hash(
session, compute_content_hash(doc)
)
if duplicate_by_content:
# A document with the same content already exists (likely from Composio connector)
if duplicate:
logger.info(
f"Event {event_summary} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping to avoid duplicate content."
f"Event {doc.title} already indexed by another connector "
f"(existing document ID: {duplicate.id}, "
f"type: {duplicate.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=event_summary,
document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
document_metadata={
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location,
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
events_to_process.append(
{
"document": document,
"is_new": True,
"event_markdown": event_markdown,
"content_hash": content_hash,
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location,
"description": description,
}
)
connector_docs.append(doc)
except Exception as e:
logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True)
documents_failed += 1
logger.error(f"Error building ConnectorDocument for event: {e!s}", exc_info=True)
documents_skipped += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents"
)
await session.commit()
# ── Pipeline: migrate legacy docs + prepare + index ───────────
pipeline = IndexingPipelineService(session)
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(events_to_process)} documents")
await pipeline.migrate_legacy_docs(connector_docs)
for item in events_to_process:
# Send heartbeat periodically
documents = await pipeline.prepare_for_indexing(connector_docs)
doc_map = {
compute_unique_identifier_hash(cd): cd for cd in connector_docs
}
documents_indexed = 0
documents_failed = 0
last_heartbeat_time = time.time()
for document in documents:
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
connector_doc = doc_map.get(document.unique_identifier_hash)
if connector_doc is None:
logger.warning(
f"No matching ConnectorDocument for document {document.id}, skipping"
)
documents_failed += 1
continue
# Heavy processing (LLM, embeddings, chunks)
try:
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"event_id": item["event_id"],
"event_summary": item["event_summary"],
"calendar_id": item["calendar_id"],
"start_time": item["start_time"],
"end_time": item["end_time"],
"location": item["location"] or "No location",
"document_type": "Google Calendar Event",
"connector_type": "Google Calendar",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["event_markdown"], user_llm, document_metadata_for_summary
)
else:
summary_content = f"Google Calendar Event: {item['event_summary']}\n\n{item['event_markdown']}"
summary_embedding = embed_text(summary_content)
chunks = await create_document_chunks(item["event_markdown"])
# Update document to READY with actual content
document.title = item["event_summary"]
document.content = summary_content
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"event_id": item["event_id"],
"event_summary": item["event_summary"],
"calendar_id": item["calendar_id"],
"start_time": item["start_time"],
"end_time": item["end_time"],
"location": item["location"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
await pipeline.index(document, connector_doc, user_llm)
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
@ -588,21 +444,12 @@ async def index_google_calendar_events(
except Exception as e:
logger.error(f"Error processing Calendar event: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(
f"Final commit: Total {documents_indexed} Google Calendar events processed"
)
@ -612,22 +459,18 @@ async def index_google_calendar_events(
"Successfully committed all Google Calendar document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")

View file

@ -1,11 +1,11 @@
"""
Google Gmail connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending → processing → ready/failed
Uses the shared IndexingPipelineService for document deduplication,
summarization, chunking, and embedding.
"""
import logging
import time
from collections.abc import Awaitable, Callable
from datetime import datetime
@ -15,21 +15,15 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.google_gmail_connector import GoogleGmailConnector
from app.db import (
Document,
DocumentStatus,
DocumentType,
SearchSourceConnectorType,
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import (
compute_content_hash,
compute_unique_identifier_hash,
)
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
@ -37,12 +31,9 @@ from app.utils.google_credentials import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -51,13 +42,70 @@ ACCEPTED_GMAIL_CONNECTOR_TYPES = {
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
}
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
def _build_connector_doc(
    message: dict,
    markdown_content: str,
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
) -> ConnectorDocument:
    """Translate a raw Gmail API message payload into a ConnectorDocument.

    Args:
        message: Raw message dict as returned by the Gmail API.
        markdown_content: Pre-rendered markdown body for the message.
        connector_id: Owning connector's database id.
        search_space_id: Search space the document belongs to.
        user_id: Id of the user who owns the connector.
        enable_summary: Whether the pipeline should LLM-summarize this document.

    Returns:
        A ConnectorDocument ready for the indexing pipeline.
    """
    msg_id = message.get("id", "")
    # Pull the headers we care about; the last occurrence of a header wins,
    # matching a straight pass over the Gmail header list.
    header_values = {
        "subject": "No Subject",
        "from": "Unknown Sender",
        "date": "Unknown Date",
    }
    for header in message.get("payload", {}).get("headers", []):
        key = header.get("name", "").lower()
        if key in header_values:
            header_values[key] = header.get("value", "")
    subject = header_values["subject"]
    sender = header_values["from"]
    date_str = header_values["date"]
    doc_metadata = {
        "message_id": msg_id,
        "thread_id": message.get("threadId", ""),
        "subject": subject,
        "sender": sender,
        "date": date_str,
        "connector_id": connector_id,
        "document_type": "Gmail Message",
        "connector_type": "Google Gmail",
    }
    return ConnectorDocument(
        title=subject,
        source_markdown=markdown_content,
        unique_id=msg_id,
        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=enable_summary,
        # Used verbatim when summarization is disabled or unavailable.
        fallback_summary=(
            f"Google Gmail Message: {subject}\n\n"
            f"From: {sender}\nDate: {date_str}\n\n"
            f"{markdown_content}"
        ),
        metadata=doc_metadata,
    )
async def index_google_gmail_messages(
session: AsyncSession,
connector_id: int,
@ -80,7 +128,7 @@ async def index_google_gmail_messages(
start_date: Start date for filtering messages (YYYY-MM-DD format)
end_date: End date for filtering messages (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
max_messages: Maximum number of messages to fetch (default: 100)
max_messages: Maximum number of messages to fetch (default: 1000)
on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns:
@ -88,7 +136,6 @@ async def index_google_gmail_messages(
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="google_gmail_messages_indexing",
source="connector_indexing_task",
@ -103,7 +150,7 @@ async def index_google_gmail_messages(
)
try:
# Accept both native and Composio Gmail connectors
# ── Connector lookup ──────────────────────────────────────────
connector = None
for ct in ACCEPTED_GMAIL_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
@ -117,7 +164,7 @@ async def index_google_gmail_messages(
)
return 0, 0, error_msg
# Build credentials based on connector type
# ── Credential building ───────────────────────────────────────
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
@ -189,6 +236,7 @@ async def index_google_gmail_messages(
)
return 0, 0, "Google gmail credentials not found in connector config"
# ── Gmail client init ─────────────────────────────────────────
await task_logger.log_task_progress(
log_entry,
f"Initializing Google gmail client for connector {connector_id}",
@ -199,14 +247,11 @@ async def index_google_gmail_messages(
credentials, session, user_id, connector_id
)
# Calculate date range using last_indexed_at if dates not provided
# This ensures Gmail uses the same date logic as other connectors
# (uses last_indexed_at → now, or 365 days back for first-time indexing)
calculated_start_date, calculated_end_date = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Fetch recent Google gmail messages
# ── Fetch messages ────────────────────────────────────────────
logger.info(
f"Fetching emails for connector {connector_id} "
f"from {calculated_start_date} to {calculated_end_date}"
@ -218,7 +263,6 @@ async def index_google_gmail_messages(
)
if error:
# Check if this is an authentication error that requires re-authentication
error_message = error
error_type = "APIError"
if (
@ -243,263 +287,92 @@ async def index_google_gmail_messages(
logger.info(f"Found {len(messages)} Google gmail messages to index")
documents_indexed = 0
# ── Build ConnectorDocuments ──────────────────────────────────
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
documents_failed = 0 # Track messages that failed processing
duplicate_content_count = (
0 # Track messages skipped due to duplicate content_hash
)
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all messages, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
messages_to_process = [] # List of dicts with document and message data
new_documents_created = False
duplicate_content_count = 0
for message in messages:
try:
# Extract message information
message_id = message.get("id", "")
thread_id = message.get("threadId", "")
# Extract headers for subject and sender
payload = message.get("payload", {})
headers = payload.get("headers", [])
subject = "No Subject"
sender = "Unknown Sender"
date_str = "Unknown Date"
for header in headers:
name = header.get("name", "").lower()
value = header.get("value", "")
if name == "subject":
subject = value
elif name == "from":
sender = value
elif name == "date":
date_str = value
if not message_id:
logger.warning(f"Skipping message with missing ID: {subject}")
logger.warning("Skipping message with missing ID")
documents_skipped += 1
continue
# Format message to markdown
markdown_content = gmail_connector.format_message_to_markdown(message)
if not markdown_content.strip():
logger.warning(f"Skipping message with no content: {subject}")
logger.warning(f"Skipping message with no content: {message_id}")
documents_skipped += 1
continue
# Generate unique identifier hash for this Gmail message
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_GMAIL_CONNECTOR, message_id, search_space_id
doc = _build_connector_doc(
message,
markdown_content,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
# Generate content hash
content_hash = generate_content_hash(markdown_content, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Fallback: legacy Composio hash
if not existing_document:
legacy_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_GMAIL_CONNECTOR,
message_id,
search_space_id,
)
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
if existing_document:
existing_document.unique_identifier_hash = (
unique_identifier_hash
)
if (
existing_document.document_type
== DocumentType.COMPOSIO_GMAIL_CONNECTOR
):
existing_document.document_type = (
DocumentType.GOOGLE_GMAIL_CONNECTOR
)
logger.info(
f"Migrated legacy Composio Gmail document: {message_id}"
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append(
{
"document": existing_document,
"is_new": False,
"markdown_content": markdown_content,
"content_hash": content_hash,
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date_str": date_str,
}
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
duplicate = await check_duplicate_document_by_hash(
session, compute_content_hash(doc)
)
if duplicate_by_content:
if duplicate:
logger.info(
f"Gmail message {subject} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
f"Gmail message {doc.title} already indexed by another connector "
f"(existing document ID: {duplicate.id}, "
f"type: {duplicate.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=subject,
document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
document_metadata={
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
messages_to_process.append(
{
"document": document,
"is_new": True,
"markdown_content": markdown_content,
"content_hash": content_hash,
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date_str": date_str,
}
)
connector_docs.append(doc)
except Exception as e:
logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True)
documents_failed += 1
logger.error(f"Error building ConnectorDocument for message: {e!s}", exc_info=True)
documents_skipped += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
)
await session.commit()
# ── Pipeline: migrate legacy docs + prepare + index ───────────
pipeline = IndexingPipelineService(session)
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
await pipeline.migrate_legacy_docs(connector_docs)
for item in messages_to_process:
# Send heartbeat periodically
documents = await pipeline.prepare_for_indexing(connector_docs)
doc_map = {
compute_unique_identifier_hash(cd): cd for cd in connector_docs
}
documents_indexed = 0
documents_failed = 0
last_heartbeat_time = time.time()
for document in documents:
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
connector_doc = doc_map.get(document.unique_identifier_hash)
if connector_doc is None:
logger.warning(
f"No matching ConnectorDocument for document {document.id}, skipping"
)
documents_failed += 1
continue
# Heavy processing (LLM, embeddings, chunks)
try:
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"message_id": item["message_id"],
"thread_id": item["thread_id"],
"subject": item["subject"],
"sender": item["sender"],
"date": item["date_str"],
"document_type": "Gmail Message",
"connector_type": "Google Gmail",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["markdown_content"],
user_llm,
document_metadata_for_summary,
)
else:
summary_content = f"Google Gmail Message: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}\n\n{item['markdown_content']}"
summary_embedding = embed_text(summary_content)
chunks = await create_document_chunks(item["markdown_content"])
# Update document to READY with actual content
document.title = item["subject"]
document.content = summary_content
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"message_id": item["message_id"],
"thread_id": item["thread_id"],
"subject": item["subject"],
"sender": item["sender"],
"date": item["date_str"],
"connector_id": connector_id,
}
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
await pipeline.index(document, connector_doc, user_llm)
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Gmail messages processed so far"
@ -508,21 +381,12 @@ async def index_google_gmail_messages(
except Exception as e:
logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed")
try:
await session.commit()
@ -530,22 +394,18 @@ async def index_google_gmail_messages(
"Successfully committed all Google Gmail document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same message was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
@ -555,7 +415,6 @@ async def index_google_gmail_messages(
total_processed = documents_indexed
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Google Gmail indexing for connector {connector_id}",