diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index 70e8f28f9..747e02834 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -2127,6 +2127,7 @@ async def run_google_gmail_indexing(
     start_date: str | None,
     end_date: str | None,
     update_last_indexed: bool,
+    on_heartbeat_callback=None,
 ) -> tuple[int, str | None]:
     # Use a reasonable default for max_messages
     max_messages = 1000
@@ -2139,6 +2140,7 @@ async def run_google_gmail_indexing(
         end_date=end_date,
         update_last_indexed=update_last_indexed,
         max_messages=max_messages,
+        on_heartbeat_callback=on_heartbeat_callback,
     )
     # index_google_gmail_messages returns (int, str) but we need (int, str | None)
     return indexed_count, error_message if error_message else None
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
index 8d7b8b045..ad749e61c 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
@@ -1,5 +1,9 @@
 """
 Google Calendar connector indexer.
+
+Implements 2-phase document status updates for real-time UI feedback:
+- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
+- Phase 2: Process each document: pending → processing → ready/failed
 """

 import time
@@ -11,7 +15,7 @@ from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession

 from app.connectors.google_calendar_connector import GoogleCalendarConnector
-from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
 from app.utils.document_converters import (
@@ -27,6 +31,7 @@ from .base import (
     get_connector_by_id,
     get_current_timestamp,
     logger,
+    safe_set_chunks,
     update_connector_last_indexed,
 )
@@ -284,7 +289,7 @@ async def index_google_calendar_events(
         documents_indexed = 0
         documents_skipped = 0
-        skipped_events = []
+        documents_failed = 0  # Track events that failed processing
         duplicate_content_count = (
             0  # Track events skipped due to duplicate content_hash
         )

         # Heartbeat tracking - update notification periodically to prevent appearing stuck
         last_heartbeat_time = time.time()

+        # =======================================================================
+        # PHASE 1: Analyze all events, create pending documents
+        # This makes ALL documents visible in the UI immediately with pending status
+        # =======================================================================
+        events_to_process = []  # List of dicts with document and event data
+        new_documents_created = False
+
         for event in events:
-            # Check if it's time for a heartbeat update
-            if (
-                on_heartbeat_callback
-                and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
-            ):
-                await on_heartbeat_callback(documents_indexed)
-                last_heartbeat_time = time.time()
             try:
                 event_id = event.get("id")
                 event_summary = event.get("summary", "No Title")

                 if not event_id:
                     logger.warning(f"Skipping event with missing ID: {event_summary}")
-                    skipped_events.append(f"{event_summary} (missing ID)")
                     documents_skipped += 1
                     continue

                 event_markdown = calendar_client.format_event_to_markdown(event)

                 if not event_markdown.strip():
                     logger.warning(f"Skipping event with no content: {event_summary}")
-                    skipped_events.append(f"{event_summary} (no content)")
                     documents_skipped += 1
                     continue
@@ -341,82 +344,27 @@ async def index_google_calendar_events(
                 if existing_document:
                     # Document exists - check if content has changed
                     if existing_document.content_hash == content_hash:
-                        logger.info(
-                            f"Document for Google Calendar event {event_summary} unchanged. Skipping."
-                        )
+                        # Ensure status is ready (might have been stuck in processing/pending)
+                        if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
+                            existing_document.status = DocumentStatus.ready()
                         documents_skipped += 1
                         continue
-                    else:
-                        # Content has changed - update the existing document
-                        logger.info(
-                            f"Content changed for Google Calendar event {event_summary}. Updating document."
-                        )
-                        # Generate summary with metadata
-                        user_llm = await get_user_long_context_llm(
-                            session, user_id, search_space_id
-                        )
-
-                        if user_llm:
-                            document_metadata = {
-                                "event_id": event_id,
-                                "event_summary": event_summary,
-                                "calendar_id": calendar_id,
-                                "start_time": start_time,
-                                "end_time": end_time,
-                                "location": location or "No location",
-                                "document_type": "Google Calendar Event",
-                                "connector_type": "Google Calendar",
-                            }
-                            (
-                                summary_content,
-                                summary_embedding,
-                            ) = await generate_document_summary(
-                                event_markdown, user_llm, document_metadata
-                            )
-                        else:
-                            summary_content = (
-                                f"Google Calendar Event: {event_summary}\n\n"
-                            )
-                            summary_content += f"Calendar: {calendar_id}\n"
-                            summary_content += f"Start: {start_time}\n"
-                            summary_content += f"End: {end_time}\n"
-                            if location:
-                                summary_content += f"Location: {location}\n"
-                            if description:
-                                desc_preview = description[:1000]
-                                if len(description) > 1000:
-                                    desc_preview += "..."
- summary_content += f"Description: {desc_preview}\n" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - chunks = await create_document_chunks(event_markdown) - - # Update existing document - existing_document.title = event_summary - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "event_id": event_id, - "event_summary": event_summary, - "calendar_id": calendar_id, - "start_time": start_time, - "end_time": end_time, - "location": location, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info( - f"Successfully updated Google Calendar event {event_summary}" - ) - continue + # Queue existing document for update (will be set to processing in Phase 2) + events_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'event_markdown': event_markdown, + 'content_hash': content_hash, + 'event_id': event_id, + 'event_summary': event_summary, + 'calendar_id': calendar_id, + 'start_time': start_time, + 'end_time': end_time, + 'location': location, + 'description': description, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -434,52 +382,9 @@ async def index_google_calendar_events( ) duplicate_content_count += 1 documents_skipped += 1 - skipped_events.append( - f"{event_summary} (already indexed by another connector)" - ) continue - # Document doesn't exist - create new one - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "event_id": event_id, - "event_summary": event_summary, - "calendar_id": calendar_id, - "start_time": start_time, - "end_time": end_time, - "location": location or "No location", - "document_type": "Google Calendar Event", - "connector_type": "Google Calendar", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - event_markdown, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - summary_content = f"Google Calendar Event: {event_summary}\n\n" - summary_content += f"Calendar: {calendar_id}\n" - summary_content += f"Start: {start_time}\n" - summary_content += f"End: {end_time}\n" - if location: - summary_content += f"Location: {location}\n" - if description: - desc_preview = description[:1000] - if len(description) > 1000: - desc_preview += "..." 
- summary_content += f"Description: {desc_preview}\n" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - chunks = await create_document_chunks(event_markdown) - + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=event_summary, @@ -491,23 +396,124 @@ async def index_google_calendar_events( "start_time": start_time, "end_time": end_time, "location": location, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 - logger.info(f"Successfully indexed new event {event_summary}") + new_documents_created = True - # Batch commit every 10 documents + events_to_process.append({ + 'document': document, + 'is_new': True, + 'event_markdown': event_markdown, + 'content_hash': content_hash, + 'event_id': event_id, + 'event_summary': event_summary, + 'calendar_id': calendar_id, + 'start_time': start_time, + 'end_time': end_time, + 'location': location, + 'description': description, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(events_to_process)} documents") + + for item in events_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata_for_summary = { + "event_id": item['event_id'], + "event_summary": item['event_summary'], + "calendar_id": item['calendar_id'], + "start_time": item['start_time'], + "end_time": item['end_time'], + "location": item['location'] or "No location", + "document_type": "Google Calendar Event", + "connector_type": "Google Calendar", + } + summary_content, summary_embedding = await generate_document_summary( + item['event_markdown'], user_llm, document_metadata_for_summary + ) + else: + summary_content = f"Google Calendar Event: {item['event_summary']}\n\n" + summary_content += f"Calendar: 
{item['calendar_id']}\n" + summary_content += f"Start: {item['start_time']}\n" + summary_content += f"End: {item['end_time']}\n" + if item['location']: + summary_content += f"Location: {item['location']}\n" + if item['description']: + desc_preview = item['description'][:1000] + if len(item['description']) > 1000: + desc_preview += "..." + summary_content += f"Description: {desc_preview}\n" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(item['event_markdown']) + + # Update document to READY with actual content + document.title = item['event_summary'] + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "event_id": item['event_id'], + "event_summary": item['event_summary'], + "calendar_id": item['calendar_id'], + "start_time": item['start_time'], + "end_time": item['end_time'], + "location": item['location'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Google Calendar events processed so far" @@ -515,19 +521,18 @@ async def index_google_calendar_events( await session.commit() except Exception as e: - logger.error( - f"Error processing event {event.get('summary', 'Unknown')}: {e!s}", - exc_info=True, - ) - skipped_events.append( - f"{event.get('summary', 'Unknown')} (processing error)" - ) - documents_skipped += 1 + logger.error(f"Error processing Calendar event: {e!s}", exc_info=True) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 continue - total_processed = documents_indexed - if total_processed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches logger.info( @@ -535,6 +540,9 @@ async def index_google_calendar_events( ) try: await session.commit() + logger.info( + "Successfully committed all Google Calendar document changes to database" + ) except Exception as e: # Handle any remaining integrity errors gracefully (race conditions, etc.) 
             if (
@@ -551,10 +559,15 @@ async def index_google_calendar_events(
             else:
                 raise

-        # Build warning message if duplicates were found
-        warning_message = None
+        # Build warning message if there were issues
+        warning_parts = []
         if duplicate_content_count > 0:
-            warning_message = f"{duplicate_content_count} skipped (duplicate)"
+            warning_parts.append(f"{duplicate_content_count} duplicate")
+        if documents_failed > 0:
+            warning_parts.append(f"{documents_failed} failed")
+        warning_message = ", ".join(warning_parts) if warning_parts else None
+
+        total_processed = documents_indexed

         await task_logger.log_task_success(
             log_entry,
@@ -563,14 +576,15 @@ async def index_google_calendar_events(
                 "events_processed": total_processed,
                 "documents_indexed": documents_indexed,
                 "documents_skipped": documents_skipped,
+                "documents_failed": documents_failed,
                 "duplicate_content_count": duplicate_content_count,
-                "skipped_events_count": len(skipped_events),
             },
         )

         logger.info(
-            f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped "
-            f"({duplicate_content_count} due to duplicate content from other connectors)"
+            f"Google Calendar indexing completed: {documents_indexed} ready, "
+            f"{documents_skipped} skipped, {documents_failed} failed "
+            f"({duplicate_content_count} duplicate content)"
         )

         return total_processed, warning_message
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
index 151c1abbc..8eae35d00 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
@@ -1,4 +1,9 @@
-"""Google Drive indexer using Surfsense file processors."""
+"""Google Drive indexer using Surfsense file processors.
+
+Implements 2-phase document status updates for real-time UI feedback:
+- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
+- Phase 2: Process each document: pending → processing → ready/failed
+"""

 import logging
 import time
@@ -17,11 +22,12 @@ from app.connectors.google_drive import (
     get_files_in_folder,
     get_start_page_token,
 )
-from app.db import DocumentType, SearchSourceConnectorType
+from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
 from app.services.task_logging_service import TaskLoggingService
 from app.tasks.connector_indexers.base import (
     check_document_by_unique_identifier,
     get_connector_by_id,
+    get_current_timestamp,
     update_connector_last_indexed,
 )
 from app.utils.document_converters import generate_unique_identifier_hash
@@ -324,8 +330,29 @@ async def index_google_drive_single_file(
     display_name = file_name or file.get("name", "Unknown")
     logger.info(f"Indexing Google Drive file: {display_name} ({file_id})")

+    # Create pending document for status visibility
+    pending_doc, should_skip = await _create_pending_document_for_file(
+        session=session,
+        file=file,
+        connector_id=connector_id,
+        search_space_id=search_space_id,
+        user_id=user_id,
+    )
+
+    if should_skip:
+        await task_logger.log_task_progress(
+            log_entry,
+            f"File {display_name} is unchanged or not indexable",
+            {"status": "skipped"},
+        )
+        return 0, None
+
+    # Commit pending document so it appears in UI
+    if pending_doc and pending_doc.id is None:
+        await session.commit()
+
     # Process the file
-    indexed, skipped = await _process_single_file(
+    indexed, skipped, failed = await _process_single_file(
         drive_client=drive_client,
         session=session,
         file=file,
@@ -334,6 +361,7 @@ async def index_google_drive_single_file(
         user_id=user_id,
         task_logger=task_logger,
         log_entry=log_entry,
+        pending_document=pending_doc,
     )

     await session.commit()
     logger.info(
         "Successfully committed Google Drive file indexing changes to database"
     )

+    if failed > 0:
+        error_msg = f"Failed to index file {display_name}"
+        await task_logger.log_task_failure(
+            log_entry,
+            error_msg,
+            {"file_name": display_name, "file_id": file_id},
+        )
+        return 0, error_msg
+
     if indexed > 0:
         await task_logger.log_task_success(
             log_entry,
@@ -397,7 +434,12 @@ async def _index_full_scan(
     include_subfolders: bool = False,
     on_heartbeat_callback: HeartbeatCallbackType | None = None,
 ) -> tuple[int, int]:
-    """Perform full scan indexing of a folder."""
+    """Perform full scan indexing of a folder.
+
+    Implements 2-phase document status updates for real-time UI feedback:
+    - Phase 1: Collect all files and create pending documents (visible in UI immediately)
+    - Phase 2: Process each file: pending → processing → ready/failed
+    """
     await task_logger.log_task_progress(
         log_entry,
         f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})",
@@ -410,29 +452,31 @@ async def _index_full_scan(

     documents_indexed = 0
     documents_skipped = 0
+    documents_failed = 0
     files_processed = 0

     # Heartbeat tracking - update notification periodically to prevent appearing stuck
     last_heartbeat_time = time.time()

+    # =======================================================================
+    # PHASE 1: Collect all files and create pending documents
+    # This makes ALL documents visible in the UI immediately with pending status
+    # =======================================================================
+    files_to_process = []  # List of (file, pending_document or None)
+    new_documents_created = False
+
     # Queue of folders to process: (folder_id, folder_name)
     folders_to_process = [(folder_id, folder_name)]

+    logger.info("Phase 1: Collecting files and creating pending documents")
+
     while folders_to_process and files_processed < max_files:
-        # Check if it's time for a heartbeat update
-        if (
-            on_heartbeat_callback
-            and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
-        ):
-            await on_heartbeat_callback(documents_indexed)
-            last_heartbeat_time = time.time()
         current_folder_id, current_folder_name = folders_to_process.pop(0)
-        logger.info(f"Processing folder: {current_folder_name} ({current_folder_id})")
+        logger.info(f"Scanning folder: {current_folder_name} ({current_folder_id})")

         page_token = None
         while files_processed < max_files:
             # Get files and folders in current folder
-            # include_subfolders=True here so we get folder items to queue them
             files, next_token, error = await get_files_in_folder(
                 drive_client,
                 current_folder_id,
@@ -462,35 +506,74 @@ async def _index_full_scan(
                     logger.debug(f"Queued subfolder: {file.get('name', 'Unknown')}")
                     continue

-                # Process the file
                 files_processed += 1
-                indexed, skipped = await _process_single_file(
-                    drive_client=drive_client,
+
+                # Create pending document for this file
+                pending_doc, should_skip = await _create_pending_document_for_file(
                     session=session,
                     file=file,
                     connector_id=connector_id,
                     search_space_id=search_space_id,
                     user_id=user_id,
-                    task_logger=task_logger,
-                    log_entry=log_entry,
                 )

-                documents_indexed += indexed
-                documents_skipped += skipped
+                if should_skip:
+                    documents_skipped += 1
+                    continue

-                if documents_indexed % 10 == 0 and documents_indexed > 0:
-                    await session.commit()
-                    logger.info(
-                        f"Committed batch: {documents_indexed} files indexed so far"
-                    )
+                if pending_doc and pending_doc.id is None:
+                    # New document was created
+                    new_documents_created = True
+
+                files_to_process.append((file, pending_doc))

             page_token = next_token
             if not page_token:
                 break

+    # Commit all pending documents - they all appear in UI now
+    if new_documents_created:
+        logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f[1] and f[1].id is None])} pending documents")
+        await session.commit()
+
+    # =======================================================================
+    # PHASE 2: Process each file one by one
+    # Each document transitions: pending → processing → ready/failed
+    # =======================================================================
+    logger.info(f"Phase 2: Processing {len(files_to_process)} files")
+
+    for file, pending_doc in files_to_process:
+        # Check if it's time for a heartbeat update
+        if on_heartbeat_callback:
+            current_time = time.time()
+            if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
+                await on_heartbeat_callback(documents_indexed)
+                last_heartbeat_time = current_time
+
+        indexed, skipped, failed = await _process_single_file(
+            drive_client=drive_client,
+            session=session,
+            file=file,
+            connector_id=connector_id,
+            search_space_id=search_space_id,
+            user_id=user_id,
+            task_logger=task_logger,
+            log_entry=log_entry,
+            pending_document=pending_doc,
+        )
+
+        documents_indexed += indexed
+        documents_skipped += skipped
+        documents_failed += failed
+
+        if documents_indexed % 10 == 0 and documents_indexed > 0:
+            await session.commit()
+            logger.info(
+                f"Committed batch: {documents_indexed} files indexed so far"
+            )
+
     logger.info(
-        f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped"
+        f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, {documents_failed} failed"
     )

     return documents_indexed, documents_skipped
@@ -514,6 +597,10 @@ async def _index_with_delta_sync(

     Note: include_subfolders is accepted for API consistency but delta sync
     automatically tracks changes across all folders including subfolders.
+
+    Implements 2-phase document status updates for real-time UI feedback:
+    - Phase 1: Collect all changes and create pending documents (visible in UI immediately)
+    - Phase 2: Process each file: pending → processing → ready/failed
     """
     await task_logger.log_task_progress(
         log_entry,
@@ -537,19 +624,21 @@ async def _index_with_delta_sync(

     documents_indexed = 0
     documents_skipped = 0
+    documents_failed = 0
     files_processed = 0

     # Heartbeat tracking - update notification periodically to prevent appearing stuck
     last_heartbeat_time = time.time()

+    # =======================================================================
+    # PHASE 1: Analyze changes and create pending documents for new/modified files
+    # =======================================================================
+    changes_to_process = []  # List of (change, file, pending_document or None)
+    new_documents_created = False
+
+    logger.info("Phase 1: Analyzing changes and creating pending documents")
+
     for change in changes:
-        # Check if it's time for a heartbeat update
-        if (
-            on_heartbeat_callback
-            and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
-        ):
-            await on_heartbeat_callback(documents_indexed)
-            last_heartbeat_time = time.time()
         if files_processed >= max_files:
             break
@@ -566,7 +655,45 @@ async def _index_with_delta_sync(
         if not file:
             continue

-        indexed, skipped = await _process_single_file(
+        # Create pending document for this file
+        pending_doc, should_skip = await _create_pending_document_for_file(
+            session=session,
+            file=file,
+            connector_id=connector_id,
+            search_space_id=search_space_id,
+            user_id=user_id,
+        )
+
+        if should_skip:
+            documents_skipped += 1
+            continue
+
+        if pending_doc and pending_doc.id is None:
+            # New document was created
+            new_documents_created = True
+
+        changes_to_process.append((change, file, pending_doc))
+
+    # Commit all pending documents - they all appear in UI now
+    if new_documents_created:
+        logger.info("Phase 1: Committing pending documents")
+        await session.commit()
+
+    # =======================================================================
+    # PHASE 2: Process each file one by one
+    # Each document transitions: pending → processing → ready/failed
+    # =======================================================================
+    logger.info(f"Phase 2: Processing {len(changes_to_process)} changes")
+
+    for change, file, pending_doc in changes_to_process:
+        # Check if it's time for a heartbeat update
+        if on_heartbeat_callback:
+            current_time = time.time()
+            if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
+                await on_heartbeat_callback(documents_indexed)
+                last_heartbeat_time = current_time
+
+        indexed, skipped, failed = await _process_single_file(
             drive_client=drive_client,
             session=session,
             file=file,
@@ -575,21 +702,123 @@ async def _index_with_delta_sync(
             user_id=user_id,
             task_logger=task_logger,
             log_entry=log_entry,
+            pending_document=pending_doc,
         )

         documents_indexed += indexed
         documents_skipped += skipped
+        documents_failed += failed

         if documents_indexed % 10 == 0 and documents_indexed > 0:
             await session.commit()
             logger.info(f"Committed batch: {documents_indexed} changes processed")

     logger.info(
-        f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped"
+        f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped, {documents_failed} failed"
     )

     return documents_indexed, documents_skipped


+async def _create_pending_document_for_file(
+    session: AsyncSession,
+    file: dict,
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+) -> tuple[Document | None, bool]:
+    """
+    Create a pending document for a Google Drive file if it doesn't exist.
+
+    This is Phase 1 of the 2-phase document status update pattern.
+    Creates documents with 'pending' status so they appear in UI immediately.
+
+    Args:
+        session: Database session
+        file: File metadata from Google Drive API
+        connector_id: ID of the Drive connector
+        search_space_id: ID of the search space
+        user_id: ID of the user
+
+    Returns:
+        Tuple of (document, should_skip):
+        - (existing_doc, False): Existing document that needs update
+        - (new_pending_doc, False): New pending document created
+        - (None, True): File should be skipped (unchanged, rename-only, or folder)
+    """
+    from app.connectors.google_drive.file_types import should_skip_file
+
+    file_id = file.get("id")
+    file_name = file.get("name", "Unknown")
+    mime_type = file.get("mimeType", "")
+
+    # Skip folders and shortcuts
+    if should_skip_file(mime_type):
+        return None, True
+
+    if not file_id:
+        return None, True
+
+    # Generate unique identifier hash for this file
+    unique_identifier_hash = generate_unique_identifier_hash(
+        DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
+    )
+
+    # Check if document exists
+    existing_document = await check_document_by_unique_identifier(
+        session, unique_identifier_hash
+    )
+
+    if existing_document:
+        # Check if this is a rename-only update (content unchanged)
+        incoming_md5 = file.get("md5Checksum")
+        incoming_modified_time = file.get("modifiedTime")
+        doc_metadata = existing_document.document_metadata or {}
+        stored_md5 = doc_metadata.get("md5_checksum")
+        stored_modified_time = doc_metadata.get("modified_time")
+
+        # Determine if content changed
+        content_unchanged = False
+        if incoming_md5 and stored_md5:
+            content_unchanged = incoming_md5 == stored_md5
+        elif not incoming_md5 and incoming_modified_time and stored_modified_time:
+            # Google Workspace file - use modifiedTime as fallback
+            content_unchanged = incoming_modified_time == stored_modified_time
+
+        if content_unchanged:
+            # Ensure status is ready (might have been stuck in processing/pending)
+            if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
+                existing_document.status = DocumentStatus.ready()
+            return None, True
+
+        # Content changed - return existing document for update
+        return existing_document, False
+
+    # Create new pending document
+    document = Document(
+        search_space_id=search_space_id,
+        title=file_name,
+        document_type=DocumentType.GOOGLE_DRIVE_FILE,
+        document_metadata={
+            "google_drive_file_id": file_id,
+            "google_drive_file_name": file_name,
+            "google_drive_mime_type": mime_type,
+            "connector_id": connector_id,
+        },
+        content="Pending...",  # Placeholder until processed
+        content_hash=unique_identifier_hash,  # Temporary unique value - updated when ready
+        unique_identifier_hash=unique_identifier_hash,
+        embedding=None,
+        chunks=[],  # Empty at creation
+        status=DocumentStatus.pending(),  # Pending until processing starts
+        updated_at=get_current_timestamp(),
+        created_by_id=user_id,
+        connector_id=connector_id,
+    )
+    session.add(document)
+
+    return document, False
+
+
 async def _check_rename_only_update(
     session: AsyncSession,
     file: dict,
@@ -725,15 +954,31 @@ async def _process_single_file(
     user_id: str,
     task_logger: TaskLoggingService,
     log_entry: any,
-) -> tuple[int, int]:
+    pending_document: Document | None = None,
+) -> tuple[int, int, int]:
     """
     Process a single file by downloading and using Surfsense's file processor.
+
+    Implements Phase 2 of the 2-phase document status update pattern.
+    Updates document status: pending → processing → ready/failed
+
+    Args:
+        drive_client: Google Drive client
+        session: Database session
+        file: File metadata from Google Drive API
+        connector_id: ID of the connector
+        search_space_id: ID of the search space
+        user_id: ID of the user
+        task_logger: Task logging service
+        log_entry: Log entry for tracking
+        pending_document: Optional pending document created in Phase 1

     Returns:
-        Tuple of (indexed_count, skipped_count)
+        Tuple of (indexed_count, skipped_count, failed_count)
     """
     file_name = file.get("name", "Unknown")
     mime_type = file.get("mimeType", "")
+    file_id = file.get("id")

     try:
         logger.info(f"Processing file: {file_name} ({mime_type})")
@@ -756,10 +1001,15 @@ async def _process_single_file(
             # Return 1 for renamed files (they are "indexed" in the sense that they're updated)
             # Return 0 for unchanged files
             if "renamed" in (rename_message or "").lower():
-                return 1, 0
-            return 0, 1
+                return 1, 0, 0
+            return 0, 1, 0

-        _, error, _ = await download_and_process_file(
+        # Set document to PROCESSING status if we have a pending document
+        if pending_document:
+            pending_document.status = DocumentStatus.processing()
+            await session.commit()
+
+        _, error, metadata = await download_and_process_file(
             client=drive_client,
             file=file,
             search_space_id=search_space_id,
@@ -776,14 +1026,43 @@ async def _process_single_file(
                 f"Skipped {file_name}: {error}",
                 {"status": "skipped", "reason": error},
             )
-            return 0, 1
+            # Mark pending document as failed if it exists
+            if pending_document:
+                pending_document.status = DocumentStatus.failed(error)
+                pending_document.updated_at = get_current_timestamp()
+                await session.commit()
+            return 0, 1, 0
+
+        # The document was created/updated by download_and_process_file
+        # Find the document and ensure it has READY status
+        if file_id:
+            unique_identifier_hash = generate_unique_identifier_hash(
+                DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
+            )
+            processed_doc = await check_document_by_unique_identifier(
+                session, unique_identifier_hash
+            )
+            if processed_doc:
+                # Ensure status is READY
+                if not DocumentStatus.is_state(processed_doc.status, DocumentStatus.READY):
+                    processed_doc.status = DocumentStatus.ready()
+                    processed_doc.updated_at = get_current_timestamp()
+                    await session.commit()

         logger.info(f"Successfully indexed Google Drive file: {file_name}")
-        return 1, 0
+        return 1, 0, 0

     except Exception as e:
         logger.error(f"Error processing file {file_name}: {e!s}", exc_info=True)
-        return 0, 1
+        # Mark pending document as failed if it exists
+        if pending_document:
+            try:
+                pending_document.status = DocumentStatus.failed(str(e))
+                pending_document.updated_at = get_current_timestamp()
+                await session.commit()
+            except Exception as status_error:
+                logger.error(f"Failed to update document status to failed: {status_error}")
+        return 0, 0, 1


 async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
index 805be5781..89e8796d3 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
@@ -1,5 +1,9 @@
 """
 Google Gmail connector indexer.
+
+Implements 2-phase document status updates for real-time UI feedback:
+- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
+- Phase 2: Process each document: pending → processing → ready/failed
 """

 import time
@@ -13,6 +17,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.connectors.google_gmail_connector import GoogleGmailConnector
 from app.db import (
     Document,
+    DocumentStatus,
     DocumentType,
     SearchSourceConnectorType,
 )
@@ -32,6 +37,7 @@ from .base import (
     get_connector_by_id,
     get_current_timestamp,
     logger,
+    safe_set_chunks,
     update_connector_last_indexed,
 )
@@ -220,20 +226,21 @@ async def index_google_gmail_messages(
         logger.info(f"Found {len(messages)} Google gmail messages to index")

         documents_indexed = 0
-        skipped_messages = []
         documents_skipped = 0
+        documents_failed = 0  # Track messages that failed processing
+        duplicate_content_count = 0  # Track messages skipped due to duplicate content_hash

         # Heartbeat tracking - update notification periodically to prevent appearing stuck
         last_heartbeat_time = time.time()

+        # =======================================================================
+        # PHASE 1: Analyze all messages, create pending documents
+        # This makes ALL documents visible in the UI immediately with pending status
+        # =======================================================================
+        messages_to_process = []  # List of dicts with document and message data
+        new_documents_created = False
+
         for message in messages:
-            # Check if it's time for a heartbeat update
-            if (
-                on_heartbeat_callback
-                and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
-            ):
-                await on_heartbeat_callback(documents_indexed)
-                last_heartbeat_time = time.time()
             try:
                 # Extract message information
                 message_id = message.get("id", "")
@@ -259,7 +266,6 @@ async def index_google_gmail_messages(
                 if not message_id:
                     logger.warning(f"Skipping message with missing ID: {subject}")
-                    skipped_messages.append(f"{subject} (missing ID)")
                     documents_skipped += 1
                     continue
@@ -268,7 +274,6 @@ async def index_google_gmail_messages(
                 if not markdown_content.strip():
                     logger.warning(f"Skipping message with no content: {subject}")
-                    skipped_messages.append(f"{subject} (no content)")
                     documents_skipped += 1
                     continue
@@ -288,68 +293,25 @@ async def index_google_gmail_messages(
                 if existing_document:
                     # Document exists - check if content has changed
                     if existing_document.content_hash == content_hash:
-                        logger.info(
-                            f"Document for Gmail message {subject} unchanged. Skipping."
-                        )
+                        # Ensure status is ready (might have been stuck in processing/pending)
+                        if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
+                            existing_document.status = DocumentStatus.ready()
                         documents_skipped += 1
                         continue
-                    else:
-                        # Content has changed - update the existing document
-                        logger.info(
-                            f"Content changed for Gmail message {subject}. Updating document."
-                        )
-                        # Generate summary with metadata
-                        user_llm = await get_user_long_context_llm(
-                            session, user_id, search_space_id
-                        )
-
-                        if user_llm:
-                            document_metadata = {
-                                "message_id": message_id,
-                                "thread_id": thread_id,
-                                "subject": subject,
-                                "sender": sender,
-                                "date": date_str,
-                                "document_type": "Gmail Message",
-                                "connector_type": "Google Gmail",
-                            }
-                            (
-                                summary_content,
-                                summary_embedding,
-                            ) = await generate_document_summary(
-                                markdown_content, user_llm, document_metadata
-                            )
-                        else:
-                            summary_content = f"Google Gmail Message: {subject}\n\n"
-                            summary_content += f"Sender: {sender}\n"
-                            summary_content += f"Date: {date_str}\n"
-                            summary_embedding = config.embedding_model_instance.embed(
-                                summary_content
-                            )
-
-                        # Process chunks
-                        chunks = await create_document_chunks(markdown_content)
-
-                        # Update existing document
-                        existing_document.title = subject
-                        existing_document.content = summary_content
-                        existing_document.content_hash = content_hash
-                        existing_document.embedding = summary_embedding
-                        existing_document.document_metadata = {
-                            "message_id": message_id,
-                            "thread_id": thread_id,
-                            "subject": subject,
-                            "sender": sender,
-                            "date": date_str,
-                            "connector_id": connector_id,
-                        }
-                        existing_document.chunks = chunks
-                        existing_document.updated_at = get_current_timestamp()
-
-                        documents_indexed += 1
-                        logger.info(f"Successfully updated Gmail message {subject}")
-                        continue
+                    # Queue existing document for update (will be set to processing in Phase 2)
+                    messages_to_process.append({
+                        'document': existing_document,
+                        'is_new': False,
+                        'markdown_content': markdown_content,
+                        'content_hash': content_hash,
+                        'message_id': message_id,
+                        'thread_id': thread_id,
+                        'subject': subject,
+                        'sender': sender,
+                        'date_str': date_str,
+                    })
+                    continue

                 # Document doesn't exist by unique_identifier_hash
                 # Check if a document with the same content_hash exists (from another connector)
@@ -364,45 +326,11 @@ async def index_google_gmail_messages(
                         f"(existing document ID: {duplicate_by_content.id}, "
                         f"type: {duplicate_by_content.document_type}). Skipping."
                     )
+                    duplicate_content_count += 1
                     documents_skipped += 1
                     continue

-                # Document doesn't exist - create new one
-                # Generate summary with metadata
-                user_llm = await get_user_long_context_llm(
-                    session, user_id, search_space_id
-                )
-
-                if user_llm:
-                    document_metadata = {
-                        "message_id": message_id,
-                        "thread_id": thread_id,
-                        "subject": subject,
-                        "sender": sender,
-                        "date": date_str,
-                        "document_type": "Gmail Message",
-                        "connector_type": "Google Gmail",
-                    }
-                    (
-                        summary_content,
-                        summary_embedding,
-                    ) = await generate_document_summary(
-                        markdown_content, user_llm, document_metadata
-                    )
-                else:
-                    # Fallback to simple summary if no LLM configured
-                    summary_content = f"Google Gmail Message: {subject}\n\n"
-                    summary_content += f"Sender: {sender}\n"
-                    summary_content += f"Date: {date_str}\n"
-                    summary_embedding = config.embedding_model_instance.embed(
-                        summary_content
-                    )
-
-                # Process chunks
-                chunks = await create_document_chunks(markdown_content)
-
-                # Create and store new document
-                logger.info(f"Creating new document for Gmail message: {subject}")
+                # Create new document with PENDING status (visible in UI immediately)
                 document = Document(
                     search_space_id=search_space_id,
                     title=subject,
@@ -413,21 +341,111 @@ async def index_google_gmail_messages(
                         "subject": subject,
                         "sender": sender,
                         "date": date_str,
+                        "connector_id": connector_id,
                     },
-                    content=summary_content,
-                    content_hash=content_hash,
+                    content="Pending...",  # Placeholder until processed
+                    content_hash=unique_identifier_hash,  # Temporary unique value - updated when ready
                     unique_identifier_hash=unique_identifier_hash,
-                    embedding=summary_embedding,
-                    chunks=chunks,
+                    embedding=None,
+                    chunks=[],  # Empty at creation - safe for async
+                    status=DocumentStatus.pending(),  # Pending until processing starts
                     updated_at=get_current_timestamp(),
                     created_by_id=user_id,
                     connector_id=connector_id,
                 )

                 session.add(document)
-                documents_indexed += 1
-                logger.info(f"Successfully indexed new email {summary_content}")
+                new_documents_created = True

-                # Batch commit every 10 documents
+                messages_to_process.append({
+                    'document': document,
+                    'is_new': True,
+                    'markdown_content': markdown_content,
+                    'content_hash': content_hash,
+                    'message_id': message_id,
+                    'thread_id': thread_id,
+                    'subject': subject,
+                    'sender': sender,
+                    'date_str': date_str,
+                })
+
+            except Exception as e:
+                logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True)
+                documents_failed += 1
+                continue
+
+        # Commit all pending documents - they all appear in UI now
+        if new_documents_created:
+            logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
+            await session.commit()
+
+        # =======================================================================
+        # PHASE 2: Process each document one by one
+        # Each document transitions: pending → processing → ready/failed
+        # =======================================================================
+        logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
+
+        for item in messages_to_process:
+            # Send heartbeat periodically
+            if on_heartbeat_callback:
+                current_time = time.time()
+                if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
+                    await on_heartbeat_callback(documents_indexed)
+                    last_heartbeat_time = current_time
+
+            document = item['document']
+            try:
+                # Set to PROCESSING and commit - shows "processing" in UI for THIS document only
+                document.status = DocumentStatus.processing()
+                await session.commit()
+
+                # Heavy processing (LLM, embeddings, chunks)
+                user_llm = await get_user_long_context_llm(
+                    session, user_id, search_space_id
+                )
+
+                if user_llm:
+                    document_metadata_for_summary = {
+                        "message_id": item['message_id'],
+                        "thread_id": item['thread_id'],
+                        "subject": item['subject'],
+                        "sender": item['sender'],
+                        "date": item['date_str'],
+                        "document_type": "Gmail Message",
+                        "connector_type": "Google Gmail",
+                    }
+                    summary_content, summary_embedding = await generate_document_summary(
+                        item['markdown_content'], user_llm, document_metadata_for_summary
+                    )
+                else:
+                    summary_content = f"Google Gmail Message: {item['subject']}\n\n"
+                    summary_content += f"Sender: {item['sender']}\n"
+                    summary_content += f"Date: {item['date_str']}\n"
+                    summary_embedding = config.embedding_model_instance.embed(
+                        summary_content
+                    )
+
+                chunks = await create_document_chunks(item['markdown_content'])
+
+                # Update document to READY with actual content
+                document.title = item['subject']
+                document.content = summary_content
+                document.content_hash = item['content_hash']
+                document.embedding = summary_embedding
+                document.document_metadata = {
+                    "message_id": item['message_id'],
+                    "thread_id": item['thread_id'],
+                    "subject": item['subject'],
+                    "sender": item['sender'],
+                    "date": item['date_str'],
+                    "connector_id": connector_id,
+                }
+                safe_set_chunks(document, chunks)
+                document.updated_at = get_current_timestamp()
+                document.status = DocumentStatus.ready()
+
+                documents_indexed += 1
+
+                # Batch commit every 10 documents (for ready status updates)
                 if documents_indexed % 10 == 0:
                     logger.info(
                         f"Committing batch: {documents_indexed} Gmail messages processed so far"
@@ -435,45 +453,74 @@ async def index_google_gmail_messages(
                     )
                     await session.commit()

             except Exception as e:
-                logger.error(
-                    f"Error processing the email {message_id}: {e!s}",
-                    exc_info=True,
-                )
-                skipped_messages.append(f"{subject} (processing error)")
-                documents_skipped += 1
-                continue  # Skip this message and continue with others
+                logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
+                # Mark document as failed with reason (visible in UI)
+                try:
+                    document.status = DocumentStatus.failed(str(e))
+                    document.updated_at = get_current_timestamp()
+                except Exception as status_error:
+                    logger.error(f"Failed to update document status to failed: {status_error}")
+                documents_failed += 1
+                continue

-        # Update the last_indexed_at timestamp for the connector only if requested
-        total_processed = documents_indexed
-        if total_processed > 0:
-            await update_connector_last_indexed(session, connector, update_last_indexed)
+        # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
+        await update_connector_last_indexed(session, connector, update_last_indexed)

         # Final commit for any remaining documents not yet committed in batches
         logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed")
-        await session.commit()
-        logger.info(
-            "Successfully committed all Google gmail document changes to database"
-        )
+        try:
+            await session.commit()
+            logger.info(
+                "Successfully committed all Google Gmail document changes to database"
+            )
+        except Exception as e:
+            # Handle any remaining integrity errors gracefully (race conditions, etc.)
+            if (
+                "duplicate key value violates unique constraint" in str(e).lower()
+                or "uniqueviolationerror" in str(e).lower()
+            ):
+                logger.warning(
+                    f"Duplicate content_hash detected during final commit. "
+                    f"This may occur if the same message was indexed by multiple connectors. "
+                    f"Rolling back and continuing. Error: {e!s}"
+                )
+                await session.rollback()
+                # Don't fail the entire task - some documents may have been successfully indexed
+            else:
+                raise
+
+        # Build warning message if there were issues
+        warning_parts = []
+        if duplicate_content_count > 0:
+            warning_parts.append(f"{duplicate_content_count} duplicate")
+        if documents_failed > 0:
+            warning_parts.append(f"{documents_failed} failed")
+        warning_message = ", ".join(warning_parts) if warning_parts else None
+
+        total_processed = documents_indexed

         # Log success
         await task_logger.log_task_success(
             log_entry,
-            f"Successfully completed Google gmail indexing for connector {connector_id}",
+            f"Successfully completed Google Gmail indexing for connector {connector_id}",
             {
                 "events_processed": total_processed,
                 "documents_indexed": documents_indexed,
                 "documents_skipped": documents_skipped,
-                "skipped_messages_count": len(skipped_messages),
+                "documents_failed": documents_failed,
+                "duplicate_content_count": duplicate_content_count,
             },
         )

         logger.info(
-            f"Google gmail indexing completed: {documents_indexed} new emails, {documents_skipped} skipped"
+            f"Google Gmail indexing completed: {documents_indexed} ready, "
+            f"{documents_skipped} skipped, {documents_failed} failed "
+            f"({duplicate_content_count} duplicate content)"
         )

         return (
             total_processed,
-            None,
-        )  # Return None as the error message to indicate success
+            warning_message,
+        )  # Return warning_message (None on success)

     except SQLAlchemyError as db_error:
         await session.rollback()
diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py
index 674773463..4433cb11e 100644
--- a/surfsense_backend/app/tasks/document_processors/file_processors.py
+++ b/surfsense_backend/app/tasks/document_processors/file_processors.py
@@ -17,7 +17,7 @@ from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession

 from app.config import config as app_config
-from app.db import Document, DocumentType, Log, Notification
+from app.db import Document, DocumentStatus, DocumentType, Log, Notification
 from app.services.llm_service import get_user_long_context_llm
 from app.services.notification_service import NotificationService
 from app.services.task_logging_service import TaskLoggingService
@@ -499,6 +499,7 @@ async def add_received_file_document_using_unstructured(
         existing_document.blocknote_document = blocknote_json
         existing_document.content_needs_reindexing = False
         existing_document.updated_at = get_current_timestamp()
+        existing_document.status = DocumentStatus.ready()  # Mark as ready

         await session.commit()
         await session.refresh(existing_document)
@@ -528,6 +529,7 @@ async def add_received_file_document_using_unstructured(
         updated_at=get_current_timestamp(),
         created_by_id=user_id,
         connector_id=connector.get("connector_id") if connector else None,
+        status=DocumentStatus.ready(),  # Mark as ready
     )

     session.add(document)
@@ -640,6 +642,7 @@ async def add_received_file_document_using_llamacloud(
         existing_document.blocknote_document = blocknote_json
         existing_document.content_needs_reindexing = False
         existing_document.updated_at = get_current_timestamp()
+        existing_document.status = DocumentStatus.ready()  # Mark as ready

         await session.commit()
         await session.refresh(existing_document)
@@ -669,6 +672,7 @@ async def add_received_file_document_using_llamacloud(
         updated_at=get_current_timestamp(),
         created_by_id=user_id,
         connector_id=connector.get("connector_id") if connector else None,
+        status=DocumentStatus.ready(),  # Mark as ready
     )

     session.add(document)
@@ -806,6 +810,7 @@ async def add_received_file_document_using_docling(
         existing_document.blocknote_document = blocknote_json
         existing_document.content_needs_reindexing = False
         existing_document.updated_at = get_current_timestamp()
+        existing_document.status = DocumentStatus.ready()  # Mark as ready

         await session.commit()
         await session.refresh(existing_document)
@@ -835,6 +840,7 @@ async def add_received_file_document_using_docling(
         updated_at=get_current_timestamp(),
         created_by_id=user_id,
         connector_id=connector.get("connector_id") if connector else None,
+        status=DocumentStatus.ready(),  # Mark as ready
     )

     session.add(document)
diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py
index ff85d962e..8ecbb1370 100644
--- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py
+++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py
@@ -7,7 +7,7 @@ import logging
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession

-from app.db import Document, DocumentType
+from app.db import Document, DocumentStatus, DocumentType
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
 from app.utils.document_converters import (
@@ -270,6 +270,7 @@ async def add_received_markdown_file_document(
         existing_document.chunks = chunks
         existing_document.blocknote_document = blocknote_json
         existing_document.updated_at = get_current_timestamp()
+        existing_document.status = DocumentStatus.ready()  # Mark as ready

         await session.commit()
         await session.refresh(existing_document)
@@ -297,6 +298,7 @@ async def add_received_markdown_file_document(
         updated_at=get_current_timestamp(),
         created_by_id=user_id,
         connector_id=connector.get("connector_id") if connector else None,
+        status=DocumentStatus.ready(),  # Mark as ready
     )

     session.add(document)
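
Reviewer note: the sketch below is a minimal, runnable illustration of the 2-phase status flow this PR applies to the Calendar, Gmail, and Drive indexers. It is an assumption-laden sketch, not the implementation: `Doc`, `commit`, and `heavy_process` are hypothetical stand-ins for the ORM `Document` model, `session.commit()`, and the LLM-summary/embedding/chunking step, while the real status helpers are `DocumentStatus.pending()/processing()/ready()/failed()` from `app.db`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Doc:
    """Hypothetical stand-in for the ORM Document model."""

    title: str
    status: str = "pending"      # pending -> processing -> ready/failed
    content: str = "Pending..."  # placeholder until Phase 2 fills it
    error: str | None = None


async def commit(docs: list[Doc]) -> None:
    """Stand-in for session.commit(); prints what the UI would sync."""
    print({d.title: d.status for d in docs})


async def heavy_process(doc: Doc) -> str:
    """Stand-in for summary generation, embedding, and chunking."""
    if doc.title == "broken":
        raise ValueError("simulated processing error")
    return f"summary of {doc.title}"


async def index(titles: list[str]) -> None:
    # Phase 1: create every document as 'pending' and commit once,
    # so the whole batch becomes visible in the UI immediately.
    docs = [Doc(t) for t in titles]
    await commit(docs)

    # Phase 2: process one document at a time; each status transition
    # is committed, so only the active document shows 'processing'.
    for doc in docs:
        doc.status = "processing"
        await commit(docs)
        try:
            doc.content = await heavy_process(doc)
            doc.status = "ready"
        except Exception as e:
            doc.status, doc.error = "failed", str(e)  # failure reason surfaces in UI
    await commit(docs)


asyncio.run(index(["a", "broken", "b"]))
```

The placeholder content in the sketch plays the same role as the diff's temporary `content_hash=unique_identifier_hash`: it keeps the row valid and unique while the document is visible in the UI but not yet processed, and the real values land with the ready transition in Phase 2.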