diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py index 05395bfba..870053c7f 100644 --- a/surfsense_backend/app/connectors/composio_gmail_connector.py +++ b/surfsense_backend/app/connectors/composio_gmail_connector.py @@ -16,11 +16,15 @@ from sqlalchemy.orm import selectinload from app.config import config from app.connectors.composio_connector import ComposioConnector -from app.db import Document, DocumentType +from app.db import Document, DocumentStatus, DocumentType from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService -from app.tasks.connector_indexers.base import calculate_date_range +from app.tasks.connector_indexers.base import ( + calculate_date_range, + check_duplicate_document_by_hash, + safe_set_chunks, +) from app.utils.document_converters import ( create_document_chunks, generate_content_hash, @@ -206,26 +210,24 @@ class ComposioGmailConnector(ComposioConnector): # ============ Indexer Functions ============ -async def _process_gmail_message_batch( +async def _analyze_gmail_messages_phase1( session: AsyncSession, messages: list[dict[str, Any]], composio_connector: ComposioGmailConnector, connector_id: int, search_space_id: int, user_id: str, - total_documents_indexed: int = 0, -) -> tuple[int, int]: +) -> tuple[list[dict[str, Any]], int, int]: """ - Process a batch of Gmail messages and index them. - - Args: - total_documents_indexed: Running total of documents indexed so far (for batch commits). + Phase 1: Analyze all messages, create pending documents. + Makes ALL documents visible in the UI immediately with pending status. Returns: - Tuple of (documents_indexed, documents_skipped) + Tuple of (messages_to_process, documents_skipped, duplicate_content_count) """ - documents_indexed = 0 + messages_to_process = [] documents_skipped = 0 + duplicate_content_count = 0 for message in messages: try: @@ -235,11 +237,7 @@ async def _process_gmail_message_batch( documents_skipped += 1 continue - # Composio's GMAIL_FETCH_EMAILS already returns full message content - # No need for a separate detail API call - # Extract message info from Composio response - # Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds payload = message.get("payload", {}) headers = payload.get("headers", []) @@ -262,7 +260,7 @@ async def _process_gmail_message_batch( message ) - # Check for empty content (defensive parsing per Composio best practices) + # Check for empty content if not markdown_content.strip(): logger.warning(f"Skipping Gmail message with no content: {subject}") documents_skipped += 1 @@ -280,99 +278,51 @@ async def _process_gmail_message_batch( session, unique_identifier_hash ) - # Get label IDs from Composio response + # Get label IDs and thread_id from Composio response label_ids = message.get("labelIds", []) - # Extract thread_id if available (for consistency with non-Composio implementation) thread_id = message.get("threadId", "") or message.get("thread_id", "") if existing_document: if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue - # Update existing - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "document_type": "Gmail Message (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - existing_document.title = subject - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date": date_str, - "labels": label_ids, - "connector_id": connector_id, - "source": "composio", - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - - # Batch commit every 10 documents - current_total = total_documents_indexed + documents_indexed - if current_total % 10 == 0: - logger.info( - f"Committing batch: {current_total} Gmail messages processed so far" - ) - await session.commit() + # Queue existing document for update (will be set to processing in Phase 2) + messages_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'markdown_content': markdown_content, + 'content_hash': content_hash, + 'message_id': message_id, + 'thread_id': thread_id, + 'subject': subject, + 'sender': sender, + 'date_str': date_str, + 'label_ids': label_ids, + }) continue - # Create new document - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "document_type": "Gmail Message (Composio)", - } - summary_content, summary_embedding = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from standard connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash ) - chunks = await create_document_chunks(markdown_content) + if duplicate_by_content: + logger.info( + f"Message {subject} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + duplicate_content_count += 1 + documents_skipped += 1 + continue + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=subject, @@ -388,39 +338,138 @@ async def _process_gmail_message_batch( "toolkit_id": "gmail", "source": "composio", }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) session.add(document) + + messages_to_process.append({ + 'document': document, + 'is_new': True, + 'markdown_content': markdown_content, + 'content_hash': content_hash, + 'message_id': message_id, + 'thread_id': thread_id, + 'subject': subject, + 'sender': sender, + 'date_str': date_str, + 'label_ids': label_ids, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True) + documents_skipped += 1 + continue + + return messages_to_process, documents_skipped, duplicate_content_count + + +async def _process_gmail_messages_phase2( + session: AsyncSession, + messages_to_process: list[dict[str, Any]], + connector_id: int, + search_space_id: int, + user_id: str, + on_heartbeat_callback: HeartbeatCallbackType | None = None, +) -> tuple[int, int]: + """ + Phase 2: Process each document one by one. + Each document transitions: pending → processing → ready/failed + + Returns: + Tuple of (documents_indexed, documents_failed) + """ + documents_indexed = 0 + documents_failed = 0 + last_heartbeat_time = time.time() + + for item in messages_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata_for_summary = { + "message_id": item['message_id'], + "thread_id": item['thread_id'], + "subject": item['subject'], + "sender": item['sender'], + "document_type": "Gmail Message (Composio)", + } + summary_content, summary_embedding = await generate_document_summary( + item['markdown_content'], user_llm, document_metadata_for_summary + ) + else: + summary_content = ( + f"Gmail: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(item['markdown_content']) + + # Update document to READY with actual content + document.title = item['subject'] + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "message_id": item['message_id'], + "thread_id": item['thread_id'], + "subject": item['subject'], + "sender": item['sender'], + "date": item['date_str'], + "labels": item['label_ids'], + "connector_id": connector_id, + "source": "composio", + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + documents_indexed += 1 - # Batch commit every 10 documents - current_total = total_documents_indexed + documents_indexed - if current_total % 10 == 0: + # Batch commit every 10 documents (for ready status updates) + if documents_indexed % 10 == 0: logger.info( - f"Committing batch: {current_total} Gmail messages processed so far" + f"Committing batch: {documents_indexed} Gmail messages processed so far" ) await session.commit() except Exception as e: logger.error(f"Error processing Gmail message: {e!s}", exc_info=True) - documents_skipped += 1 - # Rollback on error to avoid partial state (per Composio best practices) + # Mark document as failed with reason (visible in UI) try: - await session.rollback() - except Exception as rollback_error: - logger.error( - f"Error during rollback: {rollback_error!s}", exc_info=True - ) + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 continue - return documents_indexed, documents_skipped + return documents_indexed, documents_failed async def index_composio_gmail( @@ -437,7 +486,7 @@ async def index_composio_gmail( max_items: int = 1000, on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str]: - """Index Gmail messages via Composio with pagination and incremental processing.""" + """Index Gmail messages via Composio with real-time document status updates.""" try: composio_connector = ComposioGmailConnector(session, connector_id) @@ -448,14 +497,10 @@ async def index_composio_gmail( end_date = None # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at - # This ensures user-selected dates are respected (matching non-Composio Gmail connector behavior) if start_date is not None and end_date is not None: - # User provided both dates - use them directly start_date_str = start_date end_date_str = end_date else: - # Calculate date range with defaults (uses last_indexed_at or 365 days back) - # This ensures indexing works even when user doesn't specify dates start_date_str, end_date_str = calculate_date_range( connector, start_date, end_date, default_days_back=365 ) @@ -473,48 +518,32 @@ async def index_composio_gmail( f"(start_date={start_date_str}, end_date={end_date_str})" ) - # Use smaller batch size to avoid 413 payload too large errors + await task_logger.log_task_progress( + log_entry, + f"Fetching Gmail messages via Composio for connector {connector_id}", + {"stage": "fetching_messages"}, + ) + + # ======================================================================= + # FETCH ALL MESSAGES FIRST + # ======================================================================= batch_size = 50 page_token = None - total_documents_indexed = 0 - total_documents_skipped = 0 - total_messages_fetched = 0 - result_size_estimate = None # Will be set from first API response + all_messages = [] + result_size_estimate = None last_heartbeat_time = time.time() - while total_messages_fetched < max_items: - # Send heartbeat periodically to indicate task is still alive + while len(all_messages) < max_items: + # Send heartbeat periodically if on_heartbeat_callback: current_time = time.time() if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(total_documents_indexed) + await on_heartbeat_callback(len(all_messages)) last_heartbeat_time = current_time - # Calculate how many messages to fetch in this batch - remaining = max_items - total_messages_fetched + remaining = max_items - len(all_messages) current_batch_size = min(batch_size, remaining) - # Use result_size_estimate if available, otherwise fall back to max_items - estimated_total = ( - result_size_estimate if result_size_estimate is not None else max_items - ) - # Cap estimated_total at max_items to avoid showing misleading progress - estimated_total = min(estimated_total, max_items) - - await task_logger.log_task_progress( - log_entry, - f"Fetching Gmail messages batch via Composio for connector {connector_id} " - f"({total_messages_fetched}/{estimated_total} fetched, {total_documents_indexed} indexed)", - { - "stage": "fetching_messages", - "batch_size": current_batch_size, - "total_fetched": total_messages_fetched, - "total_indexed": total_documents_indexed, - "estimated_total": estimated_total, - }, - ) - - # Fetch batch of messages ( messages, next_token, @@ -533,97 +562,136 @@ async def index_composio_gmail( return 0, f"Failed to fetch Gmail messages: {error}" if not messages: - # No more messages available break - # Update result_size_estimate from first response (Gmail provides this estimate) if result_size_estimate is None and result_size_estimate_batch is not None: result_size_estimate = result_size_estimate_batch logger.info( f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'" ) - total_messages_fetched += len(messages) - # Recalculate estimated_total after potentially updating result_size_estimate - estimated_total = ( - result_size_estimate if result_size_estimate is not None else max_items - ) - estimated_total = min(estimated_total, max_items) + all_messages.extend(messages) + logger.info(f"Fetched {len(messages)} messages (total: {len(all_messages)})") - logger.info( - f"Fetched batch of {len(messages)} Gmail messages " - f"(total: {total_messages_fetched}/{estimated_total})" - ) - - # Process batch incrementally - batch_indexed, batch_skipped = await _process_gmail_message_batch( - session=session, - messages=messages, - composio_connector=composio_connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - total_documents_indexed=total_documents_indexed, - ) - - total_documents_indexed += batch_indexed - total_documents_skipped += batch_skipped - - logger.info( - f"Processed batch: {batch_indexed} indexed, {batch_skipped} skipped " - f"(total: {total_documents_indexed} indexed, {total_documents_skipped} skipped)" - ) - - # Batch commits happen in _process_gmail_message_batch every 10 documents - # This ensures progress is saved incrementally, preventing data loss on crashes - - # Check if we should continue - if not next_token: - # No more pages available + if not next_token or len(messages) < current_batch_size: break - if len(messages) < current_batch_size: - # Last page had fewer items than requested, we're done - break - - # Continue with next page page_token = next_token - if total_messages_fetched == 0: + if not all_messages: success_msg = "No Gmail messages found in the specified date range" await task_logger.log_task_success( log_entry, success_msg, {"messages_count": 0} ) - # CRITICAL: Update timestamp even when no messages found so Electric SQL syncs and UI shows indexed status await update_connector_last_indexed(session, connector, update_last_indexed) await session.commit() - return 0, None # Return None (not error) when no items found + return ( + 0, + None, + ) # Return None (not error) when no items found - this is success with 0 items - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs - # This ensures the UI shows "Last indexed" instead of "Never indexed" + logger.info(f"Found {len(all_messages)} Gmail messages to index via Composio") + + # ======================================================================= + # PHASE 1: Analyze all messages, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + await task_logger.log_task_progress( + log_entry, + f"Phase 1: Creating pending documents for {len(all_messages)} messages", + {"stage": "phase1_pending"}, + ) + + ( + messages_to_process, + documents_skipped, + duplicate_content_count, + ) = await _analyze_gmail_messages_phase1( + session=session, + messages=all_messages, + composio_connector=composio_connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + ) + + # Commit all pending documents - they all appear in UI now + new_documents_count = len([m for m in messages_to_process if m['is_new']]) + if new_documents_count > 0: + logger.info(f"Phase 1: Committing {new_documents_count} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + await task_logger.log_task_progress( + log_entry, + f"Phase 2: Processing {len(messages_to_process)} documents", + {"stage": "phase2_processing"}, + ) + + documents_indexed, documents_failed = await _process_gmail_messages_phase2( + session=session, + messages_to_process=messages_to_process, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + on_heartbeat_callback=on_heartbeat_callback, + ) + + # CRITICAL: Always update timestamp so Electric SQL syncs await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit to ensure all documents are persisted (safety net) - # This matches the pattern used in non-Composio Gmail indexer + # Final commit to ensure all documents are persisted logger.info( - f"Final commit: Total {total_documents_indexed} Gmail messages processed" - ) - await session.commit() - logger.info( - "Successfully committed all Composio Gmail document changes to database" + f"Final commit: Total {documents_indexed} Gmail messages processed" ) + try: + await session.commit() + logger.info( + "Successfully committed all Composio Gmail document changes to database" + ) + except Exception as e: + # Handle any remaining integrity errors gracefully + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None await task_logger.log_task_success( log_entry, f"Successfully completed Gmail indexing via Composio for connector {connector_id}", { - "documents_indexed": total_documents_indexed, - "documents_skipped": total_documents_skipped, - "messages_fetched": total_messages_fetched, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, }, ) - return total_documents_indexed, None + logger.info( + f"Composio Gmail indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed " + f"({duplicate_content_count} duplicate content)" + ) + return documents_indexed, warning_message except Exception as e: logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True) diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index d7299fbfe..26cfd3020 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -21,10 +21,14 @@ from sqlalchemy.orm.attributes import flag_modified from app.config import config from app.connectors.composio_connector import ComposioConnector -from app.db import Document, DocumentType, Log +from app.db import Document, DocumentStatus, DocumentType, Log from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService +from app.tasks.connector_indexers.base import ( + check_duplicate_document_by_hash, + safe_set_chunks, +) from app.utils.document_converters import ( create_document_chunks, generate_content_hash, @@ -537,22 +541,6 @@ async def check_document_by_unique_identifier( return existing_doc_result.scalars().first() -async def check_document_by_content_hash( - session: AsyncSession, content_hash: str -) -> Document | None: - """Check if a document with the given content hash already exists. - - This is used to prevent duplicate content from being indexed, regardless - of which connector originally indexed it. - """ - from sqlalchemy.future import select - - existing_doc_result = await session.execute( - select(Document).where(Document.content_hash == content_hash) - ) - return existing_doc_result.scalars().first() - - async def check_document_by_google_drive_file_id( session: AsyncSession, file_id: str, search_space_id: int ) -> Document | None: @@ -843,14 +831,16 @@ async def _index_composio_drive_delta_sync( log_entry, on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int, list[str]]: - """Index Google Drive files using delta sync (only changed files). + """Index Google Drive files using delta sync with real-time document status updates. Uses GOOGLEDRIVE_LIST_CHANGES to fetch only files that changed since last sync. Handles: new files, modified files, and deleted files. """ documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 processing_errors = [] + duplicate_content_count = 0 last_heartbeat_time = time.time() # Fetch all changes with pagination @@ -881,14 +871,13 @@ async def _index_composio_drive_delta_sync( logger.info(f"Processing {len(all_changes)} changes from delta sync") - for change in all_changes[:max_items]: - # Send heartbeat periodically to indicate task is still alive - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time + # ======================================================================= + # PHASE 1: Analyze all changes, handle deletions, create pending documents + # ======================================================================= + files_to_process = [] + new_documents_created = False + for change in all_changes[:max_items]: try: # Handle removed files is_removed = change.get("removed", False) @@ -899,9 +888,8 @@ async def _index_composio_drive_delta_sync( documents_skipped += 1 continue - # Check if file was trashed or removed + # Check if file was trashed or removed - handle deletions immediately if is_removed or file_info.get("trashed", False): - # Remove document from database document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) unique_identifier_hash = generate_unique_identifier_hash( document_type, f"drive_{file_id}", search_space_id @@ -923,37 +911,219 @@ async def _index_composio_drive_delta_sync( if mime_type == "application/vnd.google-apps.folder": continue - # Process the file - indexed, skipped, errors = await _process_single_drive_file( - session=session, - composio_connector=composio_connector, - file_id=file_id, - file_name=file_name, - mime_type=mime_type, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, + # Check for existing document by file ID (from any connector) + existing_by_file_id = await check_document_by_google_drive_file_id( + session, file_id, search_space_id ) - documents_indexed += indexed - documents_skipped += skipped - processing_errors.extend(errors) + # Generate unique identifier hash + document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) + unique_identifier_hash = generate_unique_identifier_hash( + document_type, f"drive_{file_id}", search_space_id + ) + + # Check if document exists by unique identifier + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_by_file_id and not existing_document: + # File already indexed by different connector - skip + logger.info( + f"Skipping file {file_name} (file_id={file_id}): already indexed " + f"by {existing_by_file_id.document_type.value}" + ) + documents_skipped += 1 + continue + + if existing_document: + # Queue existing document for update + files_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'file_id': file_id, + 'file_name': file_name, + 'mime_type': mime_type, + }) + continue + + # Create new document with PENDING status + document = Document( + search_space_id=search_space_id, + title=file_name, + document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), + document_metadata={ + "file_id": file_id, + "file_name": file_name, + "FILE_NAME": file_name, + "mime_type": mime_type, + "connector_id": connector_id, + "toolkit_id": "googledrive", + "source": "composio", + }, + content="Pending...", + content_hash=unique_identifier_hash, + unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], + status=DocumentStatus.pending(), + updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, + ) + session.add(document) + new_documents_created = True + + files_to_process.append({ + 'document': document, + 'is_new': True, + 'file_id': file_id, + 'file_name': file_name, + 'mime_type': mime_type, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for change: {e!s}", exc_info=True) + documents_skipped += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # ======================================================================= + logger.info(f"Phase 2: Processing {len(files_to_process)} documents") + + for item in files_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit + document.status = DocumentStatus.processing() + await session.commit() + + # Get file content + content, content_error = await composio_connector.get_drive_file_content( + item['file_id'], original_mime_type=item['mime_type'] + ) + + if content_error or not content: + logger.warning(f"Could not get content for file {item['file_name']}: {content_error}") + markdown_content = f"# {item['file_name']}\n\n" + markdown_content += f"**File ID:** {item['file_id']}\n" + markdown_content += f"**Type:** {item['mime_type']}\n" + elif isinstance(content, dict): + error_msg = f"Unexpected dict content format for file {item['file_name']}: {list(content.keys())}" + logger.error(error_msg) + processing_errors.append(error_msg) + markdown_content = f"# {item['file_name']}\n\n" + markdown_content += f"**File ID:** {item['file_id']}\n" + markdown_content += f"**Type:** {item['mime_type']}\n" + else: + markdown_content = await _process_file_content( + content=content, + file_name=item['file_name'], + file_id=item['file_id'], + mime_type=item['mime_type'], + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + processing_errors=processing_errors, + ) + + content_hash = generate_content_hash(markdown_content, search_space_id) + + # For existing documents, check if content changed + if not item['is_new'] and document.content_hash == content_hash: + if not DocumentStatus.is_state(document.status, DocumentStatus.READY): + document.status = DocumentStatus.ready() + documents_skipped += 1 + continue + + # Check for duplicate content hash (for new documents) + if item['is_new']: + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + if duplicate_by_content: + logger.info( + f"File {item['file_name']} already indexed by another connector. Skipping." + ) + await session.delete(document) + duplicate_content_count += 1 + documents_skipped += 1 + continue + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm(session, user_id, search_space_id) + + if user_llm: + document_metadata_for_summary = { + "file_id": item['file_id'], + "file_name": item['file_name'], + "mime_type": item['mime_type'], + "document_type": "Google Drive File (Composio)", + } + summary_content, summary_embedding = await generate_document_summary( + markdown_content, user_llm, document_metadata_for_summary + ) + else: + summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}" + summary_embedding = config.embedding_model_instance.embed(summary_content) + + chunks = await create_document_chunks(markdown_content) + + # Update document to READY + document.title = item['file_name'] + document.content = summary_content + document.content_hash = content_hash + document.embedding = summary_embedding + document.document_metadata = { + "file_id": item['file_id'], + "file_name": item['file_name'], + "FILE_NAME": item['file_name'], + "mime_type": item['mime_type'], + "connector_id": connector_id, + "source": "composio", + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 # Batch commit every 10 documents - if documents_indexed > 0 and documents_indexed % 10 == 0: + if documents_indexed % 10 == 0: await session.commit() logger.info(f"Committed batch: {documents_indexed} changes processed") except Exception as e: - error_msg = f"Error processing change for file {file_id}: {e!s}" + error_msg = f"Error processing change for file {item['file_id']}: {e!s}" logger.error(error_msg, exc_info=True) processing_errors.append(error_msg) - documents_skipped += 1 + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 + continue logger.info( - f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped" + f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped, " + f"{documents_failed} failed ({duplicate_content_count} duplicate content)" ) return documents_indexed, documents_skipped, processing_errors @@ -973,10 +1143,12 @@ async def _index_composio_drive_full_scan( log_entry, on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int, list[str]]: - """Index Google Drive files using full scan (first sync or when no delta token).""" + """Index Google Drive files using full scan with real-time document status updates.""" documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 processing_errors = [] + duplicate_content_count = 0 last_heartbeat_time = time.time() all_files = [] @@ -1108,14 +1280,14 @@ async def _index_composio_drive_full_scan( f"Found {len(all_files)} Google Drive files to index via Composio (full scan)" ) - for file_info in all_files: - # Send heartbeat periodically to indicate task is still alive - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time + # ======================================================================= + # PHASE 1: Analyze all files, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + files_to_process = [] # List of dicts with document and file data + new_documents_created = False + for file_info in all_files: try: # Handle both standard Google API and potential Composio variations file_id = file_info.get("id", "") or file_info.get("fileId", "") @@ -1132,227 +1304,228 @@ async def _index_composio_drive_full_scan( if mime_type == "application/vnd.google-apps.folder": continue - # Process the file - indexed, skipped, errors = await _process_single_drive_file( - session=session, - composio_connector=composio_connector, - file_id=file_id, - file_name=file_name, - mime_type=mime_type, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, + # ========== EARLY DUPLICATE CHECK BY FILE ID ========== + existing_by_file_id = await check_document_by_google_drive_file_id( + session, file_id, search_space_id + ) + if existing_by_file_id: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): already indexed " + f"by {existing_by_file_id.document_type.value}" + ) + documents_skipped += 1 + continue + + # Generate unique identifier hash + document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) + unique_identifier_hash = generate_unique_identifier_hash( + document_type, f"drive_{file_id}", search_space_id ) - documents_indexed += indexed - documents_skipped += skipped - processing_errors.extend(errors) + # Check if document exists by unique identifier + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + # Queue existing document for update (will be set to processing in Phase 2) + files_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'file_id': file_id, + 'file_name': file_name, + 'mime_type': mime_type, + }) + continue + + # Create new document with PENDING status (visible in UI immediately) + document = Document( + search_space_id=search_space_id, + title=file_name, + document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), + document_metadata={ + "file_id": file_id, + "file_name": file_name, + "FILE_NAME": file_name, + "mime_type": mime_type, + "connector_id": connector_id, + "toolkit_id": "googledrive", + "source": "composio", + }, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready + unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts + updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, + ) + session.add(document) + new_documents_created = True + + files_to_process.append({ + 'document': document, + 'is_new': True, + 'file_id': file_id, + 'file_name': file_name, + 'mime_type': mime_type, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for file: {e!s}", exc_info=True) + documents_skipped += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(files_to_process)} documents") + + for item in files_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Get file content (pass mime_type for Google Workspace export handling) + content, content_error = await composio_connector.get_drive_file_content( + item['file_id'], original_mime_type=item['mime_type'] + ) + + if content_error or not content: + logger.warning(f"Could not get content for file {item['file_name']}: {content_error}") + markdown_content = f"# {item['file_name']}\n\n" + markdown_content += f"**File ID:** {item['file_id']}\n" + markdown_content += f"**Type:** {item['mime_type']}\n" + elif isinstance(content, dict): + error_msg = f"Unexpected dict content format for file {item['file_name']}: {list(content.keys())}" + logger.error(error_msg) + processing_errors.append(error_msg) + markdown_content = f"# {item['file_name']}\n\n" + markdown_content += f"**File ID:** {item['file_id']}\n" + markdown_content += f"**Type:** {item['mime_type']}\n" + else: + # Process content based on file type + markdown_content = await _process_file_content( + content=content, + file_name=item['file_name'], + file_id=item['file_id'], + mime_type=item['mime_type'], + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + processing_errors=processing_errors, + ) + + content_hash = generate_content_hash(markdown_content, search_space_id) + + # For existing documents, check if content changed + if not item['is_new'] and document.content_hash == content_hash: + # Ensure status is ready + if not DocumentStatus.is_state(document.status, DocumentStatus.READY): + document.status = DocumentStatus.ready() + documents_skipped += 1 + continue + + # Check for duplicate content hash (for new documents) + if item['is_new']: + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + if duplicate_by_content: + logger.info( + f"File {item['file_name']} already indexed by another connector. Skipping." + ) + # Remove the pending document we created + await session.delete(document) + duplicate_content_count += 1 + documents_skipped += 1 + continue + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm(session, user_id, search_space_id) + + if user_llm: + document_metadata_for_summary = { + "file_id": item['file_id'], + "file_name": item['file_name'], + "mime_type": item['mime_type'], + "document_type": "Google Drive File (Composio)", + } + summary_content, summary_embedding = await generate_document_summary( + markdown_content, user_llm, document_metadata_for_summary + ) + else: + summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}" + summary_embedding = config.embedding_model_instance.embed(summary_content) + + chunks = await create_document_chunks(markdown_content) + + # Update document to READY with actual content + document.title = item['file_name'] + document.content = summary_content + document.content_hash = content_hash + document.embedding = summary_embedding + document.document_metadata = { + "file_id": item['file_id'], + "file_name": item['file_name'], + "FILE_NAME": item['file_name'], + "mime_type": item['mime_type'], + "connector_id": connector_id, + "source": "composio", + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 # Batch commit every 10 documents - if documents_indexed > 0 and documents_indexed % 10 == 0: + if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Google Drive files processed so far" ) await session.commit() except Exception as e: - error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}" + error_msg = f"Error processing Drive file {item['file_name']}: {e!s}" logger.error(error_msg, exc_info=True) processing_errors.append(error_msg) - documents_skipped += 1 + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 + continue logger.info( - f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped" + f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, " + f"{documents_failed} failed ({duplicate_content_count} duplicate content)" ) return documents_indexed, documents_skipped, processing_errors -async def _process_single_drive_file( - session: AsyncSession, - composio_connector: ComposioGoogleDriveConnector, - file_id: str, - file_name: str, - mime_type: str, - connector_id: int, - search_space_id: int, - user_id: str, - task_logger: TaskLoggingService, - log_entry, -) -> tuple[int, int, list[str]]: - """Process a single Google Drive file for indexing. - - Returns: - Tuple of (documents_indexed, documents_skipped, processing_errors) - """ - processing_errors = [] - - # ========== EARLY DUPLICATE CHECK BY FILE ID ========== - # Check if this Google Drive file was already indexed by ANY connector - # This happens BEFORE download/ETL to save expensive API calls - existing_by_file_id = await check_document_by_google_drive_file_id( - session, file_id, search_space_id - ) - if existing_by_file_id: - logger.info( - f"Skipping file {file_name} (file_id={file_id}): already indexed " - f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' " - f"(saved download & ETL cost)" - ) - return 0, 1, processing_errors # Skip - NO download, NO ETL! - # ====================================================== - - # Generate unique identifier hash - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"drive_{file_id}", search_space_id - ) - - # Check if document exists by unique identifier (same connector, same file) - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Get file content (pass mime_type for Google Workspace export handling) - content, content_error = await composio_connector.get_drive_file_content( - file_id, original_mime_type=mime_type - ) - - if content_error or not content: - logger.warning(f"Could not get content for file {file_name}: {content_error}") - # Use metadata as content fallback - markdown_content = f"# {file_name}\n\n" - markdown_content += f"**File ID:** {file_id}\n" - markdown_content += f"**Type:** {mime_type}\n" - elif isinstance(content, dict): - # Safety check: if content is still a dict, log error and use fallback - error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}" - logger.error(error_msg) - processing_errors.append(error_msg) - markdown_content = f"# {file_name}\n\n" - markdown_content += f"**File ID:** {file_id}\n" - markdown_content += f"**Type:** {mime_type}\n" - else: - # Process content based on file type - markdown_content = await _process_file_content( - content=content, - file_name=file_name, - file_id=file_id, - mime_type=mime_type, - search_space_id=search_space_id, - user_id=user_id, - session=session, - task_logger=task_logger, - log_entry=log_entry, - processing_errors=processing_errors, - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - if existing_document: - if existing_document.content_hash == content_hash: - return 0, 1, processing_errors # Skipped - unchanged - - # Update existing document - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - - if user_llm: - document_metadata = { - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - "document_type": "Google Drive File (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}" - summary_embedding = config.embedding_model_instance.embed(summary_content) - - chunks = await create_document_chunks(markdown_content) - - existing_document.title = file_name - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "file_id": file_id, - "file_name": file_name, - "FILE_NAME": file_name, # For compatibility - "mime_type": mime_type, - "connector_id": connector_id, - "source": "composio", - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - return 1, 0, processing_errors # Indexed - updated - - # Check if content_hash already exists (from any connector) - # This prevents duplicate content and avoids IntegrityError on unique constraint - existing_by_content_hash = await check_document_by_content_hash( - session, content_hash - ) - if existing_by_content_hash: - logger.info( - f"Skipping file {file_name} (file_id={file_id}): identical content " - f"already indexed as '{existing_by_content_hash.title}'" - ) - return 0, 1, processing_errors # Skipped - duplicate content - - # Create new document - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - - if user_llm: - document_metadata = { - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - "document_type": "Google Drive File (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}" - summary_embedding = config.embedding_model_instance.embed(summary_content) - - chunks = await create_document_chunks(markdown_content) - - document = Document( - search_space_id=search_space_id, - title=file_name, - document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), - document_metadata={ - "file_id": file_id, - "file_name": file_name, - "FILE_NAME": file_name, # For compatibility - "mime_type": mime_type, - "toolkit_id": "googledrive", - "source": "composio", - }, - content=summary_content, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - - return 1, 0, processing_errors # Indexed - new - - async def _fetch_folder_files_recursively( composio_connector: ComposioGoogleDriveConnector, folder_id: str, diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx index 0bd8189b8..d579fe677 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx @@ -56,7 +56,7 @@ function StatusIndicator({ status }: { status?: DocumentStatus }) { - Processing... + Syncing ); case "failed": diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx index 867fdc916..4133f2960 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx @@ -119,7 +119,7 @@ export function RowActions({