feat: enhance Gmail and Google Drive connectors with document status management and duplicate content checks

This commit is contained in:
Anish Sarkar 2026-02-05 22:59:56 +05:30
parent 6cd3f5c1f6
commit 5042fbfb85
4 changed files with 708 additions and 467 deletions

View file

@ -16,11 +16,15 @@ from sqlalchemy.orm import selectinload
from app.config import config
from app.connectors.composio_connector import ComposioConnector
from app.db import Document, DocumentType
from app.db import Document, DocumentStatus, DocumentType
from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import calculate_date_range
from app.tasks.connector_indexers.base import (
calculate_date_range,
check_duplicate_document_by_hash,
safe_set_chunks,
)
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
@ -206,26 +210,24 @@ class ComposioGmailConnector(ComposioConnector):
# ============ Indexer Functions ============
async def _process_gmail_message_batch(
async def _analyze_gmail_messages_phase1(
session: AsyncSession,
messages: list[dict[str, Any]],
composio_connector: ComposioGmailConnector,
connector_id: int,
search_space_id: int,
user_id: str,
total_documents_indexed: int = 0,
) -> tuple[int, int]:
) -> tuple[list[dict[str, Any]], int, int]:
"""
Process a batch of Gmail messages and index them.
Args:
total_documents_indexed: Running total of documents indexed so far (for batch commits).
Phase 1: Analyze all messages, create pending documents.
Makes ALL documents visible in the UI immediately with pending status.
Returns:
Tuple of (documents_indexed, documents_skipped)
Tuple of (messages_to_process, documents_skipped, duplicate_content_count)
"""
documents_indexed = 0
messages_to_process = []
documents_skipped = 0
duplicate_content_count = 0
for message in messages:
try:
@ -235,11 +237,7 @@ async def _process_gmail_message_batch(
documents_skipped += 1
continue
# Composio's GMAIL_FETCH_EMAILS already returns full message content
# No need for a separate detail API call
# Extract message info from Composio response
# Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds
payload = message.get("payload", {})
headers = payload.get("headers", [])
@ -262,7 +260,7 @@ async def _process_gmail_message_batch(
message
)
# Check for empty content (defensive parsing per Composio best practices)
# Check for empty content
if not markdown_content.strip():
logger.warning(f"Skipping Gmail message with no content: {subject}")
documents_skipped += 1
@ -280,99 +278,51 @@ async def _process_gmail_message_batch(
session, unique_identifier_hash
)
# Get label IDs from Composio response
# Get label IDs and thread_id from Composio response
label_ids = message.get("labelIds", [])
# Extract thread_id if available (for consistency with non-Composio implementation)
thread_id = message.get("threadId", "") or message.get("thread_id", "")
if existing_document:
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Update existing
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"document_type": "Gmail Message (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = (
f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
existing_document.title = subject
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"labels": label_ids,
"connector_id": connector_id,
"source": "composio",
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
# Batch commit every 10 documents
current_total = total_documents_indexed + documents_indexed
if current_total % 10 == 0:
logger.info(
f"Committing batch: {current_total} Gmail messages processed so far"
)
await session.commit()
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append({
'document': existing_document,
'is_new': False,
'markdown_content': markdown_content,
'content_hash': content_hash,
'message_id': message_id,
'thread_id': thread_id,
'subject': subject,
'sender': sender,
'date_str': date_str,
'label_ids': label_ids,
})
continue
# Create new document
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"document_type": "Gmail Message (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = (
f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from standard connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
chunks = await create_document_chunks(markdown_content)
if duplicate_by_content:
logger.info(
f"Message {subject} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=subject,
@ -388,39 +338,138 @@ async def _process_gmail_message_batch(
"toolkit_id": "gmail",
"source": "composio",
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
messages_to_process.append({
'document': document,
'is_new': True,
'markdown_content': markdown_content,
'content_hash': content_hash,
'message_id': message_id,
'thread_id': thread_id,
'subject': subject,
'sender': sender,
'date_str': date_str,
'label_ids': label_ids,
})
except Exception as e:
logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True)
documents_skipped += 1
continue
return messages_to_process, documents_skipped, duplicate_content_count
async def _process_gmail_messages_phase2(
session: AsyncSession,
messages_to_process: list[dict[str, Any]],
connector_id: int,
search_space_id: int,
user_id: str,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int]:
"""
Phase 2: Process each document one by one.
Each document transitions: pending processing ready/failed
Returns:
Tuple of (documents_indexed, documents_failed)
"""
documents_indexed = 0
documents_failed = 0
last_heartbeat_time = time.time()
for item in messages_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata_for_summary = {
"message_id": item['message_id'],
"thread_id": item['thread_id'],
"subject": item['subject'],
"sender": item['sender'],
"document_type": "Gmail Message (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
item['markdown_content'], user_llm, document_metadata_for_summary
)
else:
summary_content = (
f"Gmail: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(item['markdown_content'])
# Update document to READY with actual content
document.title = item['subject']
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"message_id": item['message_id'],
"thread_id": item['thread_id'],
"subject": item['subject'],
"sender": item['sender'],
"date": item['date_str'],
"labels": item['label_ids'],
"connector_id": connector_id,
"source": "composio",
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents
current_total = total_documents_indexed + documents_indexed
if current_total % 10 == 0:
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {current_total} Gmail messages processed so far"
f"Committing batch: {documents_indexed} Gmail messages processed so far"
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
documents_skipped += 1
# Rollback on error to avoid partial state (per Composio best practices)
# Mark document as failed with reason (visible in UI)
try:
await session.rollback()
except Exception as rollback_error:
logger.error(
f"Error during rollback: {rollback_error!s}", exc_info=True
)
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
return documents_indexed, documents_skipped
return documents_indexed, documents_failed
async def index_composio_gmail(
@ -437,7 +486,7 @@ async def index_composio_gmail(
max_items: int = 1000,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str]:
"""Index Gmail messages via Composio with pagination and incremental processing."""
"""Index Gmail messages via Composio with real-time document status updates."""
try:
composio_connector = ComposioGmailConnector(session, connector_id)
@ -448,14 +497,10 @@ async def index_composio_gmail(
end_date = None
# Use provided dates directly if both are provided, otherwise calculate from last_indexed_at
# This ensures user-selected dates are respected (matching non-Composio Gmail connector behavior)
if start_date is not None and end_date is not None:
# User provided both dates - use them directly
start_date_str = start_date
end_date_str = end_date
else:
# Calculate date range with defaults (uses last_indexed_at or 365 days back)
# This ensures indexing works even when user doesn't specify dates
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
@ -473,48 +518,32 @@ async def index_composio_gmail(
f"(start_date={start_date_str}, end_date={end_date_str})"
)
# Use smaller batch size to avoid 413 payload too large errors
await task_logger.log_task_progress(
log_entry,
f"Fetching Gmail messages via Composio for connector {connector_id}",
{"stage": "fetching_messages"},
)
# =======================================================================
# FETCH ALL MESSAGES FIRST
# =======================================================================
batch_size = 50
page_token = None
total_documents_indexed = 0
total_documents_skipped = 0
total_messages_fetched = 0
result_size_estimate = None # Will be set from first API response
all_messages = []
result_size_estimate = None
last_heartbeat_time = time.time()
while total_messages_fetched < max_items:
# Send heartbeat periodically to indicate task is still alive
while len(all_messages) < max_items:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(total_documents_indexed)
await on_heartbeat_callback(len(all_messages))
last_heartbeat_time = current_time
# Calculate how many messages to fetch in this batch
remaining = max_items - total_messages_fetched
remaining = max_items - len(all_messages)
current_batch_size = min(batch_size, remaining)
# Use result_size_estimate if available, otherwise fall back to max_items
estimated_total = (
result_size_estimate if result_size_estimate is not None else max_items
)
# Cap estimated_total at max_items to avoid showing misleading progress
estimated_total = min(estimated_total, max_items)
await task_logger.log_task_progress(
log_entry,
f"Fetching Gmail messages batch via Composio for connector {connector_id} "
f"({total_messages_fetched}/{estimated_total} fetched, {total_documents_indexed} indexed)",
{
"stage": "fetching_messages",
"batch_size": current_batch_size,
"total_fetched": total_messages_fetched,
"total_indexed": total_documents_indexed,
"estimated_total": estimated_total,
},
)
# Fetch batch of messages
(
messages,
next_token,
@ -533,97 +562,136 @@ async def index_composio_gmail(
return 0, f"Failed to fetch Gmail messages: {error}"
if not messages:
# No more messages available
break
# Update result_size_estimate from first response (Gmail provides this estimate)
if result_size_estimate is None and result_size_estimate_batch is not None:
result_size_estimate = result_size_estimate_batch
logger.info(
f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'"
)
total_messages_fetched += len(messages)
# Recalculate estimated_total after potentially updating result_size_estimate
estimated_total = (
result_size_estimate if result_size_estimate is not None else max_items
)
estimated_total = min(estimated_total, max_items)
all_messages.extend(messages)
logger.info(f"Fetched {len(messages)} messages (total: {len(all_messages)})")
logger.info(
f"Fetched batch of {len(messages)} Gmail messages "
f"(total: {total_messages_fetched}/{estimated_total})"
)
# Process batch incrementally
batch_indexed, batch_skipped = await _process_gmail_message_batch(
session=session,
messages=messages,
composio_connector=composio_connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
total_documents_indexed=total_documents_indexed,
)
total_documents_indexed += batch_indexed
total_documents_skipped += batch_skipped
logger.info(
f"Processed batch: {batch_indexed} indexed, {batch_skipped} skipped "
f"(total: {total_documents_indexed} indexed, {total_documents_skipped} skipped)"
)
# Batch commits happen in _process_gmail_message_batch every 10 documents
# This ensures progress is saved incrementally, preventing data loss on crashes
# Check if we should continue
if not next_token:
# No more pages available
if not next_token or len(messages) < current_batch_size:
break
if len(messages) < current_batch_size:
# Last page had fewer items than requested, we're done
break
# Continue with next page
page_token = next_token
if total_messages_fetched == 0:
if not all_messages:
success_msg = "No Gmail messages found in the specified date range"
await task_logger.log_task_success(
log_entry, success_msg, {"messages_count": 0}
)
# CRITICAL: Update timestamp even when no messages found so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no items found
return (
0,
None,
) # Return None (not error) when no items found - this is success with 0 items
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
logger.info(f"Found {len(all_messages)} Gmail messages to index via Composio")
# =======================================================================
# PHASE 1: Analyze all messages, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
await task_logger.log_task_progress(
log_entry,
f"Phase 1: Creating pending documents for {len(all_messages)} messages",
{"stage": "phase1_pending"},
)
(
messages_to_process,
documents_skipped,
duplicate_content_count,
) = await _analyze_gmail_messages_phase1(
session=session,
messages=all_messages,
composio_connector=composio_connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
# Commit all pending documents - they all appear in UI now
new_documents_count = len([m for m in messages_to_process if m['is_new']])
if new_documents_count > 0:
logger.info(f"Phase 1: Committing {new_documents_count} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
await task_logger.log_task_progress(
log_entry,
f"Phase 2: Processing {len(messages_to_process)} documents",
{"stage": "phase2_processing"},
)
documents_indexed, documents_failed = await _process_gmail_messages_phase2(
session=session,
messages_to_process=messages_to_process,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
on_heartbeat_callback=on_heartbeat_callback,
)
# CRITICAL: Always update timestamp so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit to ensure all documents are persisted (safety net)
# This matches the pattern used in non-Composio Gmail indexer
# Final commit to ensure all documents are persisted
logger.info(
f"Final commit: Total {total_documents_indexed} Gmail messages processed"
)
await session.commit()
logger.info(
"Successfully committed all Composio Gmail document changes to database"
f"Final commit: Total {documents_indexed} Gmail messages processed"
)
try:
await session.commit()
logger.info(
"Successfully committed all Composio Gmail document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
await task_logger.log_task_success(
log_entry,
f"Successfully completed Gmail indexing via Composio for connector {connector_id}",
{
"documents_indexed": total_documents_indexed,
"documents_skipped": total_documents_skipped,
"messages_fetched": total_messages_fetched,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
return total_documents_indexed, None
logger.info(
f"Composio Gmail indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return documents_indexed, warning_message
except Exception as e:
logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True)

View file

@ -21,10 +21,14 @@ from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.connectors.composio_connector import ComposioConnector
from app.db import Document, DocumentType, Log
from app.db import Document, DocumentStatus, DocumentType, Log
from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_duplicate_document_by_hash,
safe_set_chunks,
)
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
@ -537,22 +541,6 @@ async def check_document_by_unique_identifier(
return existing_doc_result.scalars().first()
async def check_document_by_content_hash(
session: AsyncSession, content_hash: str
) -> Document | None:
"""Check if a document with the given content hash already exists.
This is used to prevent duplicate content from being indexed, regardless
of which connector originally indexed it.
"""
from sqlalchemy.future import select
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
return existing_doc_result.scalars().first()
async def check_document_by_google_drive_file_id(
session: AsyncSession, file_id: str, search_space_id: int
) -> Document | None:
@ -843,14 +831,16 @@ async def _index_composio_drive_delta_sync(
log_entry,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
"""Index Google Drive files using delta sync (only changed files).
"""Index Google Drive files using delta sync with real-time document status updates.
Uses GOOGLEDRIVE_LIST_CHANGES to fetch only files that changed since last sync.
Handles: new files, modified files, and deleted files.
"""
documents_indexed = 0
documents_skipped = 0
documents_failed = 0
processing_errors = []
duplicate_content_count = 0
last_heartbeat_time = time.time()
# Fetch all changes with pagination
@ -881,14 +871,13 @@ async def _index_composio_drive_delta_sync(
logger.info(f"Processing {len(all_changes)} changes from delta sync")
for change in all_changes[:max_items]:
# Send heartbeat periodically to indicate task is still alive
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
# =======================================================================
# PHASE 1: Analyze all changes, handle deletions, create pending documents
# =======================================================================
files_to_process = []
new_documents_created = False
for change in all_changes[:max_items]:
try:
# Handle removed files
is_removed = change.get("removed", False)
@ -899,9 +888,8 @@ async def _index_composio_drive_delta_sync(
documents_skipped += 1
continue
# Check if file was trashed or removed
# Check if file was trashed or removed - handle deletions immediately
if is_removed or file_info.get("trashed", False):
# Remove document from database
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
@ -923,37 +911,219 @@ async def _index_composio_drive_delta_sync(
if mime_type == "application/vnd.google-apps.folder":
continue
# Process the file
indexed, skipped, errors = await _process_single_drive_file(
session=session,
composio_connector=composio_connector,
file_id=file_id,
file_name=file_name,
mime_type=mime_type,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
# Check for existing document by file ID (from any connector)
existing_by_file_id = await check_document_by_google_drive_file_id(
session, file_id, search_space_id
)
documents_indexed += indexed
documents_skipped += skipped
processing_errors.extend(errors)
# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)
# Check if document exists by unique identifier
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_by_file_id and not existing_document:
# File already indexed by different connector - skip
logger.info(
f"Skipping file {file_name} (file_id={file_id}): already indexed "
f"by {existing_by_file_id.document_type.value}"
)
documents_skipped += 1
continue
if existing_document:
# Queue existing document for update
files_to_process.append({
'document': existing_document,
'is_new': False,
'file_id': file_id,
'file_name': file_name,
'mime_type': mime_type,
})
continue
# Create new document with PENDING status
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
document_metadata={
"file_id": file_id,
"file_name": file_name,
"FILE_NAME": file_name,
"mime_type": mime_type,
"connector_id": connector_id,
"toolkit_id": "googledrive",
"source": "composio",
},
content="Pending...",
content_hash=unique_identifier_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[],
status=DocumentStatus.pending(),
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
files_to_process.append({
'document': document,
'is_new': True,
'file_id': file_id,
'file_name': file_name,
'mime_type': mime_type,
})
except Exception as e:
logger.error(f"Error in Phase 1 for change: {e!s}", exc_info=True)
documents_skipped += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# =======================================================================
logger.info(f"Phase 2: Processing {len(files_to_process)} documents")
for item in files_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit
document.status = DocumentStatus.processing()
await session.commit()
# Get file content
content, content_error = await composio_connector.get_drive_file_content(
item['file_id'], original_mime_type=item['mime_type']
)
if content_error or not content:
logger.warning(f"Could not get content for file {item['file_name']}: {content_error}")
markdown_content = f"# {item['file_name']}\n\n"
markdown_content += f"**File ID:** {item['file_id']}\n"
markdown_content += f"**Type:** {item['mime_type']}\n"
elif isinstance(content, dict):
error_msg = f"Unexpected dict content format for file {item['file_name']}: {list(content.keys())}"
logger.error(error_msg)
processing_errors.append(error_msg)
markdown_content = f"# {item['file_name']}\n\n"
markdown_content += f"**File ID:** {item['file_id']}\n"
markdown_content += f"**Type:** {item['mime_type']}\n"
else:
markdown_content = await _process_file_content(
content=content,
file_name=item['file_name'],
file_id=item['file_id'],
mime_type=item['mime_type'],
search_space_id=search_space_id,
user_id=user_id,
session=session,
task_logger=task_logger,
log_entry=log_entry,
processing_errors=processing_errors,
)
content_hash = generate_content_hash(markdown_content, search_space_id)
# For existing documents, check if content changed
if not item['is_new'] and document.content_hash == content_hash:
if not DocumentStatus.is_state(document.status, DocumentStatus.READY):
document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Check for duplicate content hash (for new documents)
if item['is_new']:
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"File {item['file_name']} already indexed by another connector. Skipping."
)
await session.delete(document)
duplicate_content_count += 1
documents_skipped += 1
continue
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if user_llm:
document_metadata_for_summary = {
"file_id": item['file_id'],
"file_name": item['file_name'],
"mime_type": item['mime_type'],
"document_type": "Google Drive File (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata_for_summary
)
else:
summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
# Update document to READY
document.title = item['file_name']
document.content = summary_content
document.content_hash = content_hash
document.embedding = summary_embedding
document.document_metadata = {
"file_id": item['file_id'],
"file_name": item['file_name'],
"FILE_NAME": item['file_name'],
"mime_type": item['mime_type'],
"connector_id": connector_id,
"source": "composio",
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents
if documents_indexed > 0 and documents_indexed % 10 == 0:
if documents_indexed % 10 == 0:
await session.commit()
logger.info(f"Committed batch: {documents_indexed} changes processed")
except Exception as e:
error_msg = f"Error processing change for file {file_id}: {e!s}"
error_msg = f"Error processing change for file {item['file_id']}: {e!s}"
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
documents_skipped += 1
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
logger.info(
f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped"
f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped, "
f"{documents_failed} failed ({duplicate_content_count} duplicate content)"
)
return documents_indexed, documents_skipped, processing_errors
@ -973,10 +1143,12 @@ async def _index_composio_drive_full_scan(
log_entry,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
"""Index Google Drive files using full scan (first sync or when no delta token)."""
"""Index Google Drive files using full scan with real-time document status updates."""
documents_indexed = 0
documents_skipped = 0
documents_failed = 0
processing_errors = []
duplicate_content_count = 0
last_heartbeat_time = time.time()
all_files = []
@ -1108,14 +1280,14 @@ async def _index_composio_drive_full_scan(
f"Found {len(all_files)} Google Drive files to index via Composio (full scan)"
)
for file_info in all_files:
# Send heartbeat periodically to indicate task is still alive
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
# =======================================================================
# PHASE 1: Analyze all files, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
files_to_process = [] # List of dicts with document and file data
new_documents_created = False
for file_info in all_files:
try:
# Handle both standard Google API and potential Composio variations
file_id = file_info.get("id", "") or file_info.get("fileId", "")
@ -1132,227 +1304,228 @@ async def _index_composio_drive_full_scan(
if mime_type == "application/vnd.google-apps.folder":
continue
# Process the file
indexed, skipped, errors = await _process_single_drive_file(
session=session,
composio_connector=composio_connector,
file_id=file_id,
file_name=file_name,
mime_type=mime_type,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
# ========== EARLY DUPLICATE CHECK BY FILE ID ==========
existing_by_file_id = await check_document_by_google_drive_file_id(
session, file_id, search_space_id
)
if existing_by_file_id:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): already indexed "
f"by {existing_by_file_id.document_type.value}"
)
documents_skipped += 1
continue
# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)
documents_indexed += indexed
documents_skipped += skipped
processing_errors.extend(errors)
# Check if document exists by unique identifier
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Queue existing document for update (will be set to processing in Phase 2)
files_to_process.append({
'document': existing_document,
'is_new': False,
'file_id': file_id,
'file_name': file_name,
'mime_type': mime_type,
})
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
document_metadata={
"file_id": file_id,
"file_name": file_name,
"FILE_NAME": file_name,
"mime_type": mime_type,
"connector_id": connector_id,
"toolkit_id": "googledrive",
"source": "composio",
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
files_to_process.append({
'document': document,
'is_new': True,
'file_id': file_id,
'file_name': file_name,
'mime_type': mime_type,
})
except Exception as e:
logger.error(f"Error in Phase 1 for file: {e!s}", exc_info=True)
documents_skipped += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(files_to_process)} documents")
for item in files_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Get file content (pass mime_type for Google Workspace export handling)
content, content_error = await composio_connector.get_drive_file_content(
item['file_id'], original_mime_type=item['mime_type']
)
if content_error or not content:
logger.warning(f"Could not get content for file {item['file_name']}: {content_error}")
markdown_content = f"# {item['file_name']}\n\n"
markdown_content += f"**File ID:** {item['file_id']}\n"
markdown_content += f"**Type:** {item['mime_type']}\n"
elif isinstance(content, dict):
error_msg = f"Unexpected dict content format for file {item['file_name']}: {list(content.keys())}"
logger.error(error_msg)
processing_errors.append(error_msg)
markdown_content = f"# {item['file_name']}\n\n"
markdown_content += f"**File ID:** {item['file_id']}\n"
markdown_content += f"**Type:** {item['mime_type']}\n"
else:
# Process content based on file type
markdown_content = await _process_file_content(
content=content,
file_name=item['file_name'],
file_id=item['file_id'],
mime_type=item['mime_type'],
search_space_id=search_space_id,
user_id=user_id,
session=session,
task_logger=task_logger,
log_entry=log_entry,
processing_errors=processing_errors,
)
content_hash = generate_content_hash(markdown_content, search_space_id)
# For existing documents, check if content changed
if not item['is_new'] and document.content_hash == content_hash:
# Ensure status is ready
if not DocumentStatus.is_state(document.status, DocumentStatus.READY):
document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Check for duplicate content hash (for new documents)
if item['is_new']:
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"File {item['file_name']} already indexed by another connector. Skipping."
)
# Remove the pending document we created
await session.delete(document)
duplicate_content_count += 1
documents_skipped += 1
continue
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if user_llm:
document_metadata_for_summary = {
"file_id": item['file_id'],
"file_name": item['file_name'],
"mime_type": item['mime_type'],
"document_type": "Google Drive File (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata_for_summary
)
else:
summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
# Update document to READY with actual content
document.title = item['file_name']
document.content = summary_content
document.content_hash = content_hash
document.embedding = summary_embedding
document.document_metadata = {
"file_id": item['file_id'],
"file_name": item['file_name'],
"FILE_NAME": item['file_name'],
"mime_type": item['mime_type'],
"connector_id": connector_id,
"source": "composio",
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents
if documents_indexed > 0 and documents_indexed % 10 == 0:
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Drive files processed so far"
)
await session.commit()
except Exception as e:
error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}"
error_msg = f"Error processing Drive file {item['file_name']}: {e!s}"
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
documents_skipped += 1
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
logger.info(
f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped"
f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, "
f"{documents_failed} failed ({duplicate_content_count} duplicate content)"
)
return documents_indexed, documents_skipped, processing_errors
async def _process_single_drive_file(
session: AsyncSession,
composio_connector: ComposioGoogleDriveConnector,
file_id: str,
file_name: str,
mime_type: str,
connector_id: int,
search_space_id: int,
user_id: str,
task_logger: TaskLoggingService,
log_entry,
) -> tuple[int, int, list[str]]:
"""Process a single Google Drive file for indexing.
Returns:
Tuple of (documents_indexed, documents_skipped, processing_errors)
"""
processing_errors = []
# ========== EARLY DUPLICATE CHECK BY FILE ID ==========
# Check if this Google Drive file was already indexed by ANY connector
# This happens BEFORE download/ETL to save expensive API calls
existing_by_file_id = await check_document_by_google_drive_file_id(
session, file_id, search_space_id
)
if existing_by_file_id:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): already indexed "
f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' "
f"(saved download & ETL cost)"
)
return 0, 1, processing_errors # Skip - NO download, NO ETL!
# ======================================================
# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)
# Check if document exists by unique identifier (same connector, same file)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get file content (pass mime_type for Google Workspace export handling)
content, content_error = await composio_connector.get_drive_file_content(
file_id, original_mime_type=mime_type
)
if content_error or not content:
logger.warning(f"Could not get content for file {file_name}: {content_error}")
# Use metadata as content fallback
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
markdown_content += f"**Type:** {mime_type}\n"
elif isinstance(content, dict):
# Safety check: if content is still a dict, log error and use fallback
error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}"
logger.error(error_msg)
processing_errors.append(error_msg)
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
markdown_content += f"**Type:** {mime_type}\n"
else:
# Process content based on file type
markdown_content = await _process_file_content(
content=content,
file_name=file_name,
file_id=file_id,
mime_type=mime_type,
search_space_id=search_space_id,
user_id=user_id,
session=session,
task_logger=task_logger,
log_entry=log_entry,
processing_errors=processing_errors,
)
content_hash = generate_content_hash(markdown_content, search_space_id)
if existing_document:
if existing_document.content_hash == content_hash:
return 0, 1, processing_errors # Skipped - unchanged
# Update existing document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if user_llm:
document_metadata = {
"file_id": file_id,
"file_name": file_name,
"mime_type": mime_type,
"document_type": "Google Drive File (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"file_id": file_id,
"file_name": file_name,
"FILE_NAME": file_name, # For compatibility
"mime_type": mime_type,
"connector_id": connector_id,
"source": "composio",
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
return 1, 0, processing_errors # Indexed - updated
# Check if content_hash already exists (from any connector)
# This prevents duplicate content and avoids IntegrityError on unique constraint
existing_by_content_hash = await check_document_by_content_hash(
session, content_hash
)
if existing_by_content_hash:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): identical content "
f"already indexed as '{existing_by_content_hash.title}'"
)
return 0, 1, processing_errors # Skipped - duplicate content
# Create new document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if user_llm:
document_metadata = {
"file_id": file_id,
"file_name": file_name,
"mime_type": mime_type,
"document_type": "Google Drive File (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
document_metadata={
"file_id": file_id,
"file_name": file_name,
"FILE_NAME": file_name, # For compatibility
"mime_type": mime_type,
"toolkit_id": "googledrive",
"source": "composio",
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
return 1, 0, processing_errors # Indexed - new
async def _fetch_folder_files_recursively(
composio_connector: ComposioGoogleDriveConnector,
folder_id: str,

View file

@ -56,7 +56,7 @@ function StatusIndicator({ status }: { status?: DocumentStatus }) {
<Spinner size="sm" className="text-primary" />
</div>
</TooltipTrigger>
<TooltipContent side="top">Processing...</TooltipContent>
<TooltipContent side="top">Syncing</TooltipContent>
</Tooltip>
);
case "failed":

View file

@ -119,7 +119,7 @@ export function RowActions({
<Button
variant="ghost"
size="icon"
className={`h-8 w-8 ${isDeleteDisabled ? "text-muted-foreground/50 cursor-not-allowed" : "text-muted-foreground hover:text-destructive hover:bg-destructive/10"}`}
className={`h-8 w-8 ${isDeleteDisabled ? "text-muted-foreground cursor-not-allowed" : "text-muted-foreground hover:text-destructive hover:bg-destructive/10"}`}
onClick={() => !isDeleteDisabled && setIsDeleteOpen(true)}
disabled={isDeleting || isDeleteDisabled}
>
@ -164,7 +164,7 @@ export function RowActions({
<Button
variant="ghost"
size="icon"
className={`h-8 w-8 ${isDeleteDisabled ? "text-muted-foreground/50 cursor-not-allowed" : "text-muted-foreground hover:text-destructive hover:bg-destructive/10"}`}
className={`h-8 w-8 ${isDeleteDisabled ? "text-muted-foreground cursor-not-allowed" : "text-muted-foreground hover:text-destructive hover:bg-destructive/10"}`}
onClick={() => !isDeleteDisabled && setIsDeleteOpen(true)}
disabled={isDeleting || isDeleteDisabled}
>