diff --git a/surfsense_backend/app/connectors/composio_connector.py b/surfsense_backend/app/connectors/composio_connector.py
index 18fd9564c..21e339d12 100644
--- a/surfsense_backend/app/connectors/composio_connector.py
+++ b/surfsense_backend/app/connectors/composio_connector.py
@@ -151,21 +151,23 @@ class ComposioConnector:
     async def list_gmail_messages(
         self,
         query: str = "",
-        max_results: int = 100,
-    ) -> tuple[list[dict[str, Any]], str | None]:
+        max_results: int = 50,
+        page_token: str | None = None,
+    ) -> tuple[list[dict[str, Any]], str | None, int | None, str | None]:
         """
-        List Gmail messages via Composio.
+        List Gmail messages via Composio with pagination support.
 
         Args:
             query: Gmail search query.
-            max_results: Maximum number of messages.
+            max_results: Maximum number of messages per page (default: 50).
+            page_token: Optional pagination token for next page.
 
         Returns:
-            Tuple of (messages list, error message).
+            Tuple of (messages list, next_page_token, result_size_estimate, error message).
         """
         connected_account_id = await self.get_connected_account_id()
         if not connected_account_id:
-            return [], "No connected account ID found"
+            return [], None, None, "No connected account ID found"
 
         entity_id = await self.get_entity_id()
         service = await self._get_service()
@@ -174,6 +176,7 @@ class ComposioConnector:
             entity_id=entity_id,
             query=query,
             max_results=max_results,
+            page_token=page_token,
         )
 
     async def get_gmail_message_detail(
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index 9ad03fba8..1578ad0d5 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -957,7 +957,7 @@ async def _update_connector_timestamp_by_id(session: AsyncSession, connector_id:
         connector = result.scalars().first()
 
         if connector:
-            connector.last_indexed_at = datetime.now()
+            connector.last_indexed_at = datetime.now(UTC)  # Use UTC for timezone consistency
             await session.commit()
             logger.info(f"Updated last_indexed_at for connector {connector_id}")
     except Exception as e:
@@ -1097,18 +1097,22 @@ async def _run_indexing_with_notifications(
                 )
 
                 await update_timestamp_func(session, connector_id)
+                await session.commit()  # Commit timestamp update
                 logger.info(
                     f"Indexing completed successfully: {documents_processed} documents processed"
                 )
 
                 # Update notification on success
                 if notification:
+                    # Refresh notification to ensure it's not stale after timestamp update commit
+                    await session.refresh(notification)
                     await NotificationService.connector_indexing.notify_indexing_completed(
                         session=session,
                         notification=notification,
                         indexed_count=documents_processed,
                         error_message=None,
                     )
+                    await session.commit()  # Commit to ensure Electric SQL syncs the notification update
         elif documents_processed > 0:
             # Update notification to storing stage
             if notification:
@@ -1124,24 +1128,30 @@
                 f"Indexing completed successfully: {documents_processed} documents processed"
             )
             if notification:
+                # Refresh notification to ensure it's not stale after indexing function commits
+                await session.refresh(notification)
                 await NotificationService.connector_indexing.notify_indexing_completed(
                     session=session,
                     notification=notification,
                     indexed_count=documents_processed,
                     error_message=None,
                 )
+                await session.commit()  # Commit to ensure Electric SQL syncs the notification update
         else:
             # No new documents processed - check if this is an error or just no changes
             if error_or_warning:
                 # Actual failure
                 logger.error(f"Indexing failed: {error_or_warning}")
                 if notification:
+                    # Refresh notification to ensure it's not stale after indexing function commits
+                    await session.refresh(notification)
                     await NotificationService.connector_indexing.notify_indexing_completed(
                         session=session,
                         notification=notification,
                         indexed_count=0,
                         error_message=error_or_warning,
                     )
+                    await session.commit()  # Commit to ensure Electric SQL syncs the notification update
             else:
                 # Success - just no new documents to index (all skipped/unchanged)
                 logger.info(
@@ -1150,13 +1160,17 @@
                 # Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
                 if update_timestamp_func:
                     await update_timestamp_func(session, connector_id)
+                    await session.commit()  # Commit timestamp update
                 if notification:
+                    # Refresh notification to ensure it's not stale after timestamp update commit
+                    await session.refresh(notification)
                     await NotificationService.connector_indexing.notify_indexing_completed(
                         session=session,
                         notification=notification,
                         indexed_count=0,
                         error_message=None,  # No error - sync succeeded
                     )
+                    await session.commit()  # Commit to ensure Electric SQL syncs the notification update
 
     except Exception as e:
         logger.error(f"Error in indexing task: {e!s}", exc_info=True)
diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py
index 17fbd64e0..e32cbf8a0 100644
--- a/surfsense_backend/app/services/composio_service.py
+++ b/surfsense_backend/app/services/composio_service.py
@@ -256,7 +256,6 @@ class ComposioService:
                     "user_id": getattr(acc, "user_id", None),
                 })
 
-            logger.info(f"DEBUG: Found {len(result)} TOTAL connections in Composio")
             return result
         except Exception as e:
             logger.error(f"Failed to list all connections: {e!s}")
@@ -273,7 +272,6 @@
             List of connected account details.
""" try: - logger.info(f"DEBUG: Calling connected_accounts.list(user_id='{user_id}')") accounts_response = self.client.connected_accounts.list(user_id=user_id) # Handle paginated response (may have .items attribute) or direct list @@ -358,7 +356,6 @@ class ComposioService: # - connected_account_id: for authentication # - user_id: user identifier (SDK uses user_id, not entity_id) # - dangerously_skip_version_check: skip version check for manual execution - logger.info(f"DEBUG: Executing tool {tool_name} with params: {params}") result = self.client.tools.execute( slug=tool_name, connected_account_id=connected_account_id, @@ -366,8 +363,6 @@ class ComposioService: arguments=params or {}, dangerously_skip_version_check=True, ) - logger.info(f"DEBUG: Tool {tool_name} raw result type: {type(result)}") - logger.info(f"DEBUG: Tool {tool_name} raw result: {result}") return {"success": True, "data": result} except Exception as e: logger.error(f"Failed to execute tool {tool_name}: {e!s}") @@ -417,7 +412,6 @@ class ComposioService: return [], None, result.get("error", "Unknown error") data = result.get("data", {}) - logger.info(f"DEBUG: Drive data type: {type(data)}, keys: {data.keys() if isinstance(data, dict) else 'N/A'}") # Handle nested response structure from Composio files = [] @@ -429,7 +423,6 @@ class ComposioService: elif isinstance(data, list): files = data - logger.info(f"DEBUG: Extracted {len(files)} drive files") return files, next_token, None except Exception as e: @@ -478,25 +471,30 @@ class ComposioService: connected_account_id: str, entity_id: str, query: str = "", - max_results: int = 100, - ) -> tuple[list[dict[str, Any]], str | None]: + max_results: int = 50, + page_token: str | None = None, + ) -> tuple[list[dict[str, Any]], str | None, int | None, str | None]: """ - List Gmail messages via Composio. + List Gmail messages via Composio with pagination support. Args: connected_account_id: Composio connected account ID. entity_id: The entity/user ID that owns the connected account. query: Gmail search query. - max_results: Maximum number of messages to return. + max_results: Maximum number of messages to return per page (default: 50 to avoid payload size issues). + page_token: Optional pagination token for next page. Returns: - Tuple of (messages list, error message). + Tuple of (messages list, next_page_token, result_size_estimate, error message). 
""" try: - # Composio uses snake_case for parameters, max is 500 - params = {"max_results": min(max_results, 500)} + # Use smaller batch size to avoid 413 payload too large errors + # Composio uses snake_case for parameters + params = {"max_results": min(max_results, 50)} # Reduced from 500 to 50 if query: params["query"] = query # Composio uses 'query' not 'q' + if page_token: + params["page_token"] = page_token result = await self.execute_tool( connected_account_id=connected_account_id, @@ -506,25 +504,38 @@ class ComposioService: ) if not result.get("success"): - return [], result.get("error", "Unknown error") + return [], None, result.get("error", "Unknown error") data = result.get("data", {}) - logger.info(f"DEBUG: Gmail data type: {type(data)}, keys: {data.keys() if isinstance(data, dict) else 'N/A'}") - logger.info(f"DEBUG: Gmail full data: {data}") # Try different possible response structures messages = [] + next_token = None + result_size_estimate = None if isinstance(data, dict): messages = data.get("messages", []) or data.get("data", {}).get("messages", []) or data.get("emails", []) + # Check for pagination token in various possible locations + next_token = ( + data.get("nextPageToken") + or data.get("next_page_token") + or data.get("data", {}).get("nextPageToken") + or data.get("data", {}).get("next_page_token") + ) + # Extract resultSizeEstimate if available (Gmail API provides this) + result_size_estimate = ( + data.get("resultSizeEstimate") + or data.get("result_size_estimate") + or data.get("data", {}).get("resultSizeEstimate") + or data.get("data", {}).get("result_size_estimate") + ) elif isinstance(data, list): messages = data - logger.info(f"DEBUG: Extracted {len(messages)} messages") - return messages, None + return messages, next_token, result_size_estimate, None except Exception as e: logger.error(f"Failed to list Gmail messages: {e!s}") - return [], str(e) + return [], None, str(e) async def get_gmail_message_detail( self, connected_account_id: str, entity_id: str, message_id: str @@ -603,8 +614,6 @@ class ComposioService: return [], result.get("error", "Unknown error") data = result.get("data", {}) - logger.info(f"DEBUG: Calendar data type: {type(data)}, keys: {data.keys() if isinstance(data, dict) else 'N/A'}") - logger.info(f"DEBUG: Calendar full data: {data}") # Try different possible response structures events = [] @@ -613,7 +622,6 @@ class ComposioService: elif isinstance(data, list): events = data - logger.info(f"DEBUG: Extracted {len(events)} calendar events") return events, None except Exception as e: diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index 8762561ee..c9cd74234 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -9,6 +9,7 @@ to avoid circular import issues with the connector_indexers package. 
 import logging
 from datetime import UTC, datetime
+from typing import Any
 
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -26,6 +27,7 @@ from app.db import (
 from app.services.composio_service import INDEXABLE_TOOLKITS, TOOLKIT_TO_DOCUMENT_TYPE
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
+from app.tasks.connector_indexers.base import calculate_date_range
 from app.utils.document_converters import (
     create_document_chunks,
     generate_content_hash,
@@ -75,7 +77,7 @@ async def update_connector_last_indexed(
 ) -> None:
     """Update the last_indexed_at timestamp for a connector."""
     if update_last_indexed:
-        connector.last_indexed_at = datetime.now()
+        connector.last_indexed_at = datetime.now(UTC)  # Use UTC for timezone consistency
         logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
 
 
@@ -287,6 +289,9 @@
         await task_logger.log_task_success(
             log_entry, success_msg, {"files_count": 0}
         )
+        # CRITICAL: Update timestamp even when no files found so Electric SQL syncs and UI shows indexed status
+        await update_connector_last_indexed(session, connector, update_last_indexed)
+        await session.commit()
         return 0, None  # Return None (not error) when no items found - this is success with 0 items
 
     logger.info(f"Found {len(all_files)} Google Drive files to index via Composio")
@@ -380,6 +385,13 @@
                 existing_document.updated_at = get_current_timestamp()
 
                 documents_indexed += 1
+
+                # Batch commit every 10 documents
+                if documents_indexed % 10 == 0:
+                    logger.info(
+                        f"Committing batch: {documents_indexed} Google Drive files processed so far"
+                    )
+                    await session.commit()
                 continue
 
             # Create new document
@@ -425,7 +437,11 @@
             session.add(document)
             documents_indexed += 1
 
+            # Batch commit every 10 documents
             if documents_indexed % 10 == 0:
+                logger.info(
+                    f"Committing batch: {documents_indexed} Google Drive files processed so far"
+                )
                 await session.commit()
 
         except Exception as e:
@@ -433,10 +449,19 @@
             documents_skipped += 1
             continue
 
-    if documents_indexed > 0:
-        await update_connector_last_indexed(session, connector, update_last_indexed)
+    # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
+    # This ensures the UI shows "Last indexed" instead of "Never indexed"
+    await update_connector_last_indexed(session, connector, update_last_indexed)
 
+    # Final commit to ensure all documents are persisted (safety net)
+    # This matches the pattern used in non-Composio Gmail indexer
+    logger.info(
+        f"Final commit: Total {documents_indexed} Google Drive files processed"
+    )
     await session.commit()
+    logger.info(
+        "Successfully committed all Composio Google Drive document changes to database"
+    )
 
     await task_logger.log_task_success(
         log_entry,
@@ -454,154 +479,89 @@
         return 0, f"Failed to index Google Drive via Composio: {e!s}"
 
 
-async def _index_composio_gmail(
+async def _process_gmail_message_batch(
     session: AsyncSession,
-    connector,
+    messages: list[dict[str, Any]],
+    composio_connector: ComposioConnector,
     connector_id: int,
     search_space_id: int,
     user_id: str,
-    start_date: str | None,
-    end_date: str | None,
-    task_logger: TaskLoggingService,
-    log_entry,
-    update_last_indexed: bool = True,
-    max_items: int = 1000,
-) -> tuple[int, str]:
-    """Index Gmail messages via Composio."""
-    try:
-        composio_connector = ComposioConnector(session, connector_id)
+    total_documents_indexed: int = 0,
+) -> tuple[int, int]:
+    """
+    Process a batch of Gmail messages and index them.
+
+    Args:
+        total_documents_indexed: Running total of documents indexed so far (for batch commits).
+
+    Returns:
+        Tuple of (documents_indexed, documents_skipped)
+    """
+    documents_indexed = 0
+    documents_skipped = 0
 
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Fetching Gmail messages via Composio for connector {connector_id}",
-            {"stage": "fetching_messages"},
-        )
+    for message in messages:
+        try:
+            # Composio uses 'messageId' (camelCase), not 'id'
+            message_id = message.get("messageId", "") or message.get("id", "")
+            if not message_id:
+                documents_skipped += 1
+                continue
 
-        # Build query with date range
-        query_parts = []
-        if start_date:
-            query_parts.append(f"after:{start_date.replace('-', '/')}")
-        if end_date:
-            query_parts.append(f"before:{end_date.replace('-', '/')}")
-        query = " ".join(query_parts)
+            # Composio's GMAIL_FETCH_EMAILS already returns full message content
+            # No need for a separate detail API call
 
-        messages, error = await composio_connector.list_gmail_messages(
-            query=query,
-            max_results=max_items,
-        )
+            # Extract message info from Composio response
+            # Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds
+            payload = message.get("payload", {})
+            headers = payload.get("headers", [])
 
-        if error:
-            await task_logger.log_task_failure(
-                log_entry, f"Failed to fetch Gmail messages: {error}", {}
+            subject = "No Subject"
+            sender = "Unknown Sender"
+            date_str = message.get("messageTimestamp", "Unknown Date")
+
+            for header in headers:
+                name = header.get("name", "").lower()
+                value = header.get("value", "")
+                if name == "subject":
+                    subject = value
+                elif name == "from":
+                    sender = value
+                elif name == "date":
+                    date_str = value
+
+            # Format to markdown using the full message data
+            markdown_content = composio_connector.format_gmail_message_to_markdown(message)
+
+            # Check for empty content (defensive parsing per Composio best practices)
+            if not markdown_content.strip():
+                logger.warning(f"Skipping Gmail message with no content: {subject}")
+                documents_skipped += 1
+                continue
+
+            # Generate unique identifier
+            document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"])
+            unique_identifier_hash = generate_unique_identifier_hash(
+                document_type, f"gmail_{message_id}", search_space_id
             )
-            return 0, f"Failed to fetch Gmail messages: {error}"
 
-        if not messages:
-            success_msg = "No Gmail messages found in the specified date range"
-            await task_logger.log_task_success(
-                log_entry, success_msg, {"messages_count": 0}
+            content_hash = generate_content_hash(markdown_content, search_space_id)
+
+            existing_document = await check_document_by_unique_identifier(
+                session, unique_identifier_hash
             )
-            return 0, None  # Return None (not error) when no items found - this is success with 0 items
 
-        logger.info(f"Found {len(messages)} Gmail messages to index via Composio")
+            # Get label IDs from Composio response
+            label_ids = message.get("labelIds", [])
+            # Extract thread_id if available (for consistency with non-Composio implementation)
+            thread_id = message.get("threadId", "") or message.get("thread_id", "")
 
-        documents_indexed = 0
-        documents_skipped = 0
-
-        for message in messages:
-            try:
-                # Composio uses 'messageId' (camelCase), not 'id'
-                message_id = message.get("messageId", "") or message.get("id", "")
-                if not message_id:
+            if existing_document:
+                if existing_document.content_hash == content_hash:
                     documents_skipped += 1
                     continue
 
-                # Composio's GMAIL_FETCH_EMAILS already returns full message content
-                # No need for a separate detail API call
-
-                # Extract message info from Composio response
-                # Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds
-                payload = message.get("payload", {})
-                headers = payload.get("headers", [])
-
-                subject = "No Subject"
-                sender = "Unknown Sender"
-                date_str = message.get("messageTimestamp", "Unknown Date")
-
-                for header in headers:
-                    name = header.get("name", "").lower()
-                    value = header.get("value", "")
-                    if name == "subject":
-                        subject = value
-                    elif name == "from":
-                        sender = value
-                    elif name == "date":
-                        date_str = value
-
-                # Format to markdown using the full message data
-                markdown_content = composio_connector.format_gmail_message_to_markdown(message)
-
-                # Generate unique identifier
-                document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"])
-                unique_identifier_hash = generate_unique_identifier_hash(
-                    document_type, f"gmail_{message_id}", search_space_id
-                )
-
-                content_hash = generate_content_hash(markdown_content, search_space_id)
-
-                existing_document = await check_document_by_unique_identifier(
-                    session, unique_identifier_hash
-                )
-
-                # Get label IDs from Composio response
-                label_ids = message.get("labelIds", [])
-
-                if existing_document:
-                    if existing_document.content_hash == content_hash:
-                        documents_skipped += 1
-                        continue
-
-                    # Update existing
-                    user_llm = await get_user_long_context_llm(
-                        session, user_id, search_space_id
-                    )
-
-                    if user_llm:
-                        document_metadata = {
-                            "message_id": message_id,
-                            "subject": subject,
-                            "sender": sender,
-                            "document_type": "Gmail Message (Composio)",
-                        }
-                        summary_content, summary_embedding = await generate_document_summary(
-                            markdown_content, user_llm, document_metadata
-                        )
-                    else:
-                        summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
-                        summary_embedding = config.embedding_model_instance.embed(summary_content)
-
-                    chunks = await create_document_chunks(markdown_content)
-
-                    existing_document.title = f"Gmail: {subject}"
-                    existing_document.content = summary_content
-                    existing_document.content_hash = content_hash
-                    existing_document.embedding = summary_embedding
-                    existing_document.document_metadata = {
-                        "message_id": message_id,
-                        "subject": subject,
-                        "sender": sender,
-                        "date": date_str,
-                        "labels": label_ids,
-                        "connector_id": connector_id,
-                        "source": "composio",
-                    }
-                    existing_document.chunks = chunks
-                    existing_document.updated_at = get_current_timestamp()
-
-                    documents_indexed += 1
-                    continue
-
-                # Create new document
+                # Update existing
                 user_llm = await get_user_long_context_llm(
                     session, user_id, search_space_id
                 )
@@ -609,6 +569,7 @@
                 if user_llm:
                     document_metadata = {
                         "message_id": message_id,
+                        "thread_id": thread_id,
                         "subject": subject,
                         "sender": sender,
                         "document_type": "Gmail Message (Composio)",
@@ -622,53 +583,276 @@
                    }
                    summary_content, summary_embedding = await generate_document_summary(
                        markdown_content, user_llm, document_metadata
                    )
                else:
                    summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
                    summary_embedding = config.embedding_model_instance.embed(summary_content)
 
                chunks = await create_document_chunks(markdown_content)
 
-                document = Document(
-                    search_space_id=search_space_id,
-                    title=f"Gmail: {subject}",
-                    document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]),
-                    document_metadata={
-                        "message_id": message_id,
-                        "subject": subject,
-                        "sender": sender,
-                        "date": date_str,
-                        "labels": label_ids,
-                        "connector_id": connector_id,
-                        "toolkit_id": "gmail",
-                        "source": "composio",
-                    },
-                    content=summary_content,
-                    content_hash=content_hash,
-                    unique_identifier_hash=unique_identifier_hash,
-                    embedding=summary_embedding,
-                    chunks=chunks,
-                    updated_at=get_current_timestamp(),
-                )
-                session.add(document)
+                existing_document.title = f"Gmail: {subject}"
+                existing_document.content = summary_content
+                existing_document.content_hash = content_hash
+                existing_document.embedding = summary_embedding
+                existing_document.document_metadata = {
+                    "message_id": message_id,
+                    "thread_id": thread_id,
+                    "subject": subject,
+                    "sender": sender,
+                    "date": date_str,
+                    "labels": label_ids,
+                    "connector_id": connector_id,
+                    "source": "composio",
+                }
+                existing_document.chunks = chunks
+                existing_document.updated_at = get_current_timestamp()
+
                documents_indexed += 1
-
-                if documents_indexed % 10 == 0:
+
+                # Batch commit every 10 documents
+                current_total = total_documents_indexed + documents_indexed
+                if current_total % 10 == 0:
+                    logger.info(
+                        f"Committing batch: {current_total} Gmail messages processed so far"
+                    )
                    await session.commit()
-
-            except Exception as e:
-                logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
-                documents_skipped += 1
                continue
 
-        if documents_indexed > 0:
-            await update_connector_last_indexed(session, connector, update_last_indexed)
+            # Create new document
+            user_llm = await get_user_long_context_llm(
+                session, user_id, search_space_id
+            )
+            if user_llm:
+                document_metadata = {
+                    "message_id": message_id,
+                    "thread_id": thread_id,
+                    "subject": subject,
+                    "sender": sender,
+                    "document_type": "Gmail Message (Composio)",
+                }
+                summary_content, summary_embedding = await generate_document_summary(
+                    markdown_content, user_llm, document_metadata
+                )
+            else:
+                summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
+                summary_embedding = config.embedding_model_instance.embed(summary_content)
+
+            chunks = await create_document_chunks(markdown_content)
+
+            document = Document(
+                search_space_id=search_space_id,
+                title=f"Gmail: {subject}",
+                document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]),
+                document_metadata={
+                    "message_id": message_id,
+                    "thread_id": thread_id,
+                    "subject": subject,
+                    "sender": sender,
+                    "date": date_str,
+                    "labels": label_ids,
+                    "connector_id": connector_id,
+                    "toolkit_id": "gmail",
+                    "source": "composio",
+                },
+                content=summary_content,
+                content_hash=content_hash,
+                unique_identifier_hash=unique_identifier_hash,
+                embedding=summary_embedding,
+                chunks=chunks,
+                updated_at=get_current_timestamp(),
+            )
+            session.add(document)
+            documents_indexed += 1
+
+            # Batch commit every 10 documents
+            current_total = total_documents_indexed + documents_indexed
+            if current_total % 10 == 0:
+                logger.info(
+                    f"Committing batch: {current_total} Gmail messages processed so far"
+                )
+                await session.commit()
+
+        except Exception as e:
+            logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
+            documents_skipped += 1
+            # Rollback on error to avoid partial state (per Composio best practices)
+            try:
+                await session.rollback()
+            except Exception as rollback_error:
+                logger.error(f"Error during rollback: {rollback_error!s}", exc_info=True)
+            continue
+
+    return documents_indexed, documents_skipped
+
+
+async def _index_composio_gmail(
+    session: AsyncSession,
+    connector,
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    start_date: str | None,
+    end_date: str | None,
+    task_logger: TaskLoggingService,
+    log_entry,
+    update_last_indexed: bool = True,
+    max_items: int = 1000,
+) -> tuple[int, str | None]:
+    """Index Gmail messages via Composio with pagination and incremental processing."""
+    try:
+        composio_connector = ComposioConnector(session, connector_id)
+
+        # Normalize date values - handle "undefined" strings from frontend
+        if start_date == "undefined" or start_date == "":
+            start_date = None
+        if end_date == "undefined" or end_date == "":
+            end_date = None
+
+        # Calculate date range with defaults (uses last_indexed_at or 365 days back)
+        # This ensures indexing works even when user doesn't specify dates
+        start_date_str, end_date_str = calculate_date_range(
+            connector, start_date, end_date, default_days_back=365
+        )
+
+        # Build query with date range
+        query_parts = []
+        if start_date_str:
+            query_parts.append(f"after:{start_date_str.replace('-', '/')}")
+        if end_date_str:
+            query_parts.append(f"before:{end_date_str.replace('-', '/')}")
+        query = " ".join(query_parts) if query_parts else ""
+
+        logger.info(
+            f"Gmail query for connector {connector_id}: '{query}' "
+            f"(start_date={start_date_str}, end_date={end_date_str})"
+        )
+
+        # Use smaller batch size to avoid 413 payload too large errors
+        batch_size = 50
+        page_token = None
+        total_documents_indexed = 0
+        total_documents_skipped = 0
+        total_messages_fetched = 0
+        result_size_estimate = None  # Will be set from first API response
+
+        while total_messages_fetched < max_items:
+            # Calculate how many messages to fetch in this batch
+            remaining = max_items - total_messages_fetched
+            current_batch_size = min(batch_size, remaining)
+
+            # Use result_size_estimate if available, otherwise fall back to max_items
+            estimated_total = result_size_estimate if result_size_estimate is not None else max_items
+            # Cap estimated_total at max_items to avoid showing misleading progress
+            estimated_total = min(estimated_total, max_items)
+
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Fetching Gmail messages batch via Composio for connector {connector_id} "
+                f"({total_messages_fetched}/{estimated_total} fetched, {total_documents_indexed} indexed)",
+                {
+                    "stage": "fetching_messages",
+                    "batch_size": current_batch_size,
+                    "total_fetched": total_messages_fetched,
+                    "total_indexed": total_documents_indexed,
+                    "estimated_total": estimated_total,
+                },
+            )
+
+            # Fetch batch of messages
+            messages, next_token, result_size_estimate_batch, error = await composio_connector.list_gmail_messages(
+                query=query,
+                max_results=current_batch_size,
+                page_token=page_token,
+            )
+
+            if error:
+                await task_logger.log_task_failure(
+                    log_entry, f"Failed to fetch Gmail messages: {error}", {}
+                )
+                return 0, f"Failed to fetch Gmail messages: {error}"
+
+            if not messages:
+                # No more messages available
+                break
+
+            # Update result_size_estimate from first response (Gmail provides this estimate)
+            if result_size_estimate is None and result_size_estimate_batch is not None:
+                result_size_estimate = result_size_estimate_batch
+                logger.info(f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'")
+
+            total_messages_fetched += len(messages)
+            # Recalculate estimated_total after potentially updating result_size_estimate
+            estimated_total = result_size_estimate if result_size_estimate is not None else max_items
+            estimated_total = min(estimated_total, max_items)
+
+            logger.info(
+                f"Fetched batch of {len(messages)} Gmail messages "
+                f"(total: {total_messages_fetched}/{estimated_total})"
+            )
+
+            # Process batch incrementally
+            batch_indexed, batch_skipped = await _process_gmail_message_batch(
+                session=session,
+                messages=messages,
+                composio_connector=composio_connector,
+                connector_id=connector_id,
+                search_space_id=search_space_id,
+                user_id=user_id,
+                total_documents_indexed=total_documents_indexed,
+            )
+
+            total_documents_indexed += batch_indexed
+            total_documents_skipped += batch_skipped
+
+            logger.info(
+                f"Processed batch: {batch_indexed} indexed, {batch_skipped} skipped "
+                f"(total: {total_documents_indexed} indexed, {total_documents_skipped} skipped)"
+            )
+
+            # Batch commits happen in _process_gmail_message_batch every 10 documents
+            # This ensures progress is saved incrementally, preventing data loss on crashes
+
+            # Check if we should continue
+            if not next_token:
+                # No more pages available
+                break
+
+            if len(messages) < current_batch_size:
+                # Last page had fewer items than requested, we're done
+                break
+
+            # Continue with next page
+            page_token = next_token
+
+        if total_messages_fetched == 0:
+            success_msg = "No Gmail messages found in the specified date range"
+            await task_logger.log_task_success(
+                log_entry, success_msg, {"messages_count": 0}
+            )
+            # CRITICAL: Update timestamp even when no messages found so Electric SQL syncs and UI shows indexed status
+            await update_connector_last_indexed(session, connector, update_last_indexed)
+            await session.commit()
+            return 0, None  # Return None (not error) when no items found
+
+        # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
+        # This ensures the UI shows "Last indexed" instead of "Never indexed"
+        await update_connector_last_indexed(session, connector, update_last_indexed)
+
+        # Final commit to ensure all documents are persisted (safety net)
+        # This matches the pattern used in non-Composio Gmail indexer
+        logger.info(
+            f"Final commit: Total {total_documents_indexed} Gmail messages processed"
+        )
         await session.commit()
+        logger.info(
+            "Successfully committed all Composio Gmail document changes to database"
+        )
 
         await task_logger.log_task_success(
             log_entry,
             f"Successfully completed Gmail indexing via Composio for connector {connector_id}",
             {
-                "documents_indexed": documents_indexed,
-                "documents_skipped": documents_skipped,
+                "documents_indexed": total_documents_indexed,
+                "documents_skipped": total_documents_skipped,
+                "messages_fetched": total_messages_fetched,
             },
         )
-        return documents_indexed, None
+        return total_documents_indexed, None
 
     except Exception as e:
         logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True)
@@ -689,8 +873,6 @@ async def _index_composio_google_calendar(
     max_items: int = 2500,
 ) -> tuple[int, str]:
     """Index Google Calendar events via Composio."""
-    from datetime import datetime, timedelta
-
     try:
         composio_connector = ComposioConnector(session, connector_id)
 
@@ -700,18 +882,26 @@
         await task_logger.log_task_progress(
             log_entry,
             f"Fetching Google Calendar events via Composio for connector {connector_id}",
             {"stage": "fetching_events"},
         )
 
-        # Build time range
-        if start_date:
-            time_min = f"{start_date}T00:00:00Z"
-        else:
-            # Default to 365 days ago
-            default_start = datetime.now() - timedelta(days=365)
-            time_min = default_start.strftime("%Y-%m-%dT00:00:00Z")
+        # Normalize date values - handle "undefined" strings from frontend
+        if start_date == "undefined" or start_date == "":
+            start_date = None
+        if end_date == "undefined" or end_date == "":
+            end_date = None
 
-        if end_date:
-            time_max = f"{end_date}T23:59:59Z"
-        else:
-            time_max = datetime.now().strftime("%Y-%m-%dT23:59:59Z")
+        # Calculate date range with defaults (uses last_indexed_at or 365 days back)
+        # This ensures indexing works even when user doesn't specify dates
+        start_date_str, end_date_str = calculate_date_range(
+            connector, start_date, end_date, default_days_back=365
+        )
+
+        # Build time range for API call
+        time_min = f"{start_date_str}T00:00:00Z"
+        time_max = f"{end_date_str}T23:59:59Z"
+
+        logger.info(
+            f"Google Calendar query for connector {connector_id}: "
+            f"(start_date={start_date_str}, end_date={end_date_str})"
+        )
 
         events, error = await composio_connector.list_calendar_events(
             time_min=time_min,
@@ -730,6 +920,9 @@
         await task_logger.log_task_success(
             log_entry, success_msg, {"events_count": 0}
         )
+        # CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status
+        await update_connector_last_indexed(session, connector, update_last_indexed)
+        await session.commit()
         return 0, None  # Return None (not error) when no items found - this is success with 0 items
 
     logger.info(f"Found {len(events)} Google Calendar events to index via Composio")
@@ -814,6 +1007,13 @@
                 existing_document.updated_at = get_current_timestamp()
 
                 documents_indexed += 1
+
+                # Batch commit every 10 documents
+                if documents_indexed % 10 == 0:
+                    logger.info(
+                        f"Committing batch: {documents_indexed} Google Calendar events processed so far"
+                    )
+                    await session.commit()
                 continue
 
             # Create new document
@@ -863,7 +1063,11 @@
             session.add(document)
             documents_indexed += 1
 
+            # Batch commit every 10 documents
             if documents_indexed % 10 == 0:
+                logger.info(
+                    f"Committing batch: {documents_indexed} Google Calendar events processed so far"
+                )
                 await session.commit()
 
         except Exception as e:
@@ -871,10 +1075,19 @@
             documents_skipped += 1
             continue
 
-    if documents_indexed > 0:
-        await update_connector_last_indexed(session, connector, update_last_indexed)
+    # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
+    # This ensures the UI shows "Last indexed" instead of "Never indexed"
+    await update_connector_last_indexed(session, connector, update_last_indexed)
 
+    # Final commit to ensure all documents are persisted (safety net)
+    # This matches the pattern used in non-Composio Gmail indexer
+    logger.info(
+        f"Final commit: Total {documents_indexed} Google Calendar events processed"
+    )
     await session.commit()
+    logger.info(
+        "Successfully committed all Composio Google Calendar document changes to database"
+    )
 
     await task_logger.log_task_success(
         log_entry,
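
Note (illustrative, not part of the patch): a minimal sketch of how a caller drains the paginated list_gmail_messages() API introduced above. The four-slot return shape (messages, next_page_token, result_size_estimate, error) matches the patch; the helper name fetch_all_gmail_messages, the 50-message page size, and raising RuntimeError on failure are assumptions made for the example, not code from this change.

    from typing import Any

    async def fetch_all_gmail_messages(
        connector, query: str = "", max_items: int = 1000
    ) -> list[dict[str, Any]]:
        """Collect pages from the paginated API until exhausted or max_items is hit."""
        collected: list[dict[str, Any]] = []
        page_token: str | None = None
        while len(collected) < max_items:
            messages, next_token, _size_estimate, error = await connector.list_gmail_messages(
                query=query,
                max_results=min(50, max_items - len(collected)),
                page_token=page_token,
            )
            if error:
                raise RuntimeError(f"Gmail listing failed: {error}")  # hypothetical handling
            if not messages:
                break  # empty page: nothing more to fetch
            collected.extend(messages)
            if not next_token:
                break  # no further pages advertised
            page_token = next_token
        return collected

The stop conditions here (no messages, no token, cap reached) are the same ones the indexer's while loop relies on, so a missing nextPageToken degrades to a single-page fetch rather than an infinite loop.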
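Note (illustrative, not part of the patch): the Gmail query construction in _index_composio_gmail converts ISO dates into Gmail's slash-separated search operators and treats the literal strings "undefined" and "" sent by the frontend as absent. A small pure-function sketch of that logic, assuming dates arrive as "YYYY-MM-DD" strings; the helper name build_gmail_date_query is hypothetical.

    def build_gmail_date_query(start_date: str | None, end_date: str | None) -> str:
        """Build an 'after:YYYY/MM/DD before:YYYY/MM/DD' Gmail search query."""

        def normalize(value: str | None) -> str | None:
            # The frontend may send "undefined" or "" for an unset date picker
            return None if value in (None, "", "undefined") else value

        parts: list[str] = []
        start, end = normalize(start_date), normalize(end_date)
        if start:
            parts.append(f"after:{start.replace('-', '/')}")
        if end:
            parts.append(f"before:{end.replace('-', '/')}")
        return " ".join(parts)

    # build_gmail_date_query("2024-01-01", "undefined") == "after:2024/01/01"
    # build_gmail_date_query(None, None) == ""

Extracting a helper like this would also make the normalization, which the patch currently repeats inline in both the Gmail and Calendar indexers, testable in isolation.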
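Note (illustrative, not part of the patch): list_gmail_messages probes several response shapes for the pagination token because, per the patch's own comments, Composio results have been observed camelCased and snake_cased, both at the top level and nested under "data". A defensive sketch of that lookup; extract_page_token is a hypothetical name, and the non-dict guard is an extra hardening step beyond the patch, whose chained data.get("data", {}).get(...) assumes any "data" value is a dict (a list there would raise, caught by the surrounding try/except).

    def extract_page_token(data: object) -> str | None:
        """Find a Gmail nextPageToken in any of the observed response layouts."""
        if not isinstance(data, dict):
            return None
        inner = data.get("data")
        candidates = [data, inner if isinstance(inner, dict) else {}]
        for source in candidates:
            for key in ("nextPageToken", "next_page_token"):
                token = source.get(key)
                if token:
                    return token
        return None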