diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index f1338564e..fbf90b345 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -1,5 +1,9 @@ """ BookStack connector indexer. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Collect all pages and create pending documents (visible in UI immediately) +- Phase 2: Process each page: pending → processing → ready/failed """ import time @@ -11,7 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.bookstack_connector import BookStackConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -28,6 +32,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -184,22 +189,22 @@ async def index_bookstack_pages( logger.error(f"Error fetching BookStack pages: {e!s}", exc_info=True) return 0, f"Error fetching BookStack pages: {e!s}" - # Process and index each page + # ======================================================================= + # PHASE 1: Analyze all pages, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= documents_indexed = 0 skipped_pages = [] documents_skipped = 0 + documents_failed = 0 # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() + pages_to_process = [] # List of dicts with document and page data + new_documents_created = False + for page in pages: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() try: page_id = page.get("id") page_name = page.get("name", "") @@ -218,7 +223,7 @@ async def index_bookstack_pages( # Fetch full page content (Markdown preferred) try: - page_detail, page_content = bookstack_client.get_page_with_content( + _, page_content = bookstack_client.get_page_with_content( page_id, use_markdown=True ) except Exception as e: @@ -252,82 +257,34 @@ async def index_bookstack_pages( # Build page URL page_url = f"{bookstack_base_url}/books/{book_slug}/page/{page_slug}" - # Build document metadata - doc_metadata = { - "page_id": page_id, - "page_name": page_name, - "page_slug": page_slug, - "book_id": book_id, - "book_slug": book_slug, - "chapter_id": chapter_id, - "base_url": bookstack_base_url, - "page_url": page_url, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() logger.info( f"Document for BookStack page {page_name} unchanged. Skipping." ) documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for BookStack page {page_name}. Updating document." - ) - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - summary_metadata = { - "page_name": page_name, - "page_id": page_id, - "book_id": book_id, - "document_type": "BookStack Page", - "connector_type": "BookStack", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - full_content, user_llm, summary_metadata - ) - else: - summary_content = ( - f"BookStack Page: {page_name}\n\nBook ID: {book_id}\n\n" - ) - if page_content: - content_preview = page_content[:1000] - if len(page_content) > 1000: - content_preview += "..." - summary_content += ( - f"Content Preview: {content_preview}\n\n" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - chunks = await create_document_chunks(full_content) - - # Update existing document - existing_document.title = page_name - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = doc_metadata - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info(f"Successfully updated BookStack page {page_name}") - continue + # Queue existing document for update (will be set to processing in Phase 2) + pages_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'page_id': page_id, + 'page_name': page_name, + 'page_slug': page_slug, + 'book_id': book_id, + 'book_slug': book_slug, + 'chapter_id': chapter_id, + 'page_url': page_url, + 'page_content': page_content, + 'full_content': full_content, + 'content_hash': content_hash, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -345,17 +302,104 @@ async def index_bookstack_pages( documents_skipped += 1 continue - # Document doesn't exist - create new one - # Generate summary with metadata + # Create new document with PENDING status (visible in UI immediately) + document = Document( + search_space_id=search_space_id, + title=page_name, + document_type=DocumentType.BOOKSTACK_CONNECTOR, + document_metadata={ + "page_id": page_id, + "page_name": page_name, + "page_slug": page_slug, + "book_id": book_id, + "book_slug": book_slug, + "chapter_id": chapter_id, + "base_url": bookstack_base_url, + "page_url": page_url, + "connector_id": connector_id, + }, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready + unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts + updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, + ) + session.add(document) + new_documents_created = True + + pages_to_process.append({ + 'document': document, + 'is_new': True, + 'page_id': page_id, + 'page_name': page_name, + 'page_slug': page_slug, + 'book_id': book_id, + 'book_slug': book_slug, + 'chapter_id': chapter_id, + 'page_url': page_url, + 'page_content': page_content, + 'full_content': full_content, + 'content_hash': content_hash, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(pages_to_process)} documents") + + for item in pages_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) user_llm = await get_user_long_context_llm( session, user_id, search_space_id ) + # Build document metadata + doc_metadata = { + "page_id": item['page_id'], + "page_name": item['page_name'], + "page_slug": item['page_slug'], + "book_id": item['book_id'], + "book_slug": item['book_slug'], + "chapter_id": item['chapter_id'], + "base_url": bookstack_base_url, + "page_url": item['page_url'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + if user_llm: summary_metadata = { - "page_name": page_name, - "page_id": page_id, - "book_id": book_id, + "page_name": item['page_name'], + "page_id": item['page_id'], + "book_id": item['book_id'], "document_type": "BookStack Page", "connector_type": "BookStack", } @@ -363,17 +407,17 @@ async def index_bookstack_pages( summary_content, summary_embedding, ) = await generate_document_summary( - full_content, user_llm, summary_metadata + item['full_content'], user_llm, summary_metadata ) else: # Fallback to simple summary if no LLM configured summary_content = ( - f"BookStack Page: {page_name}\n\nBook ID: {book_id}\n\n" + f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n" ) - if page_content: + if item['page_content']: # Take first 1000 characters of content for summary - content_preview = page_content[:1000] - if len(page_content) > 1000: + content_preview = item['page_content'][:1000] + if len(item['page_content']) > 1000: content_preview += "..." summary_content += f"Content Preview: {content_preview}\n\n" summary_embedding = config.embedding_model_instance.embed( @@ -381,30 +425,21 @@ async def index_bookstack_pages( ) # Process chunks - using the full page content - chunks = await create_document_chunks(full_content) + chunks = await create_document_chunks(item['full_content']) - # Create and store new document - logger.info(f"Creating new document for page {page_name}") - document = Document( - search_space_id=search_space_id, - title=page_name, - document_type=DocumentType.BOOKSTACK_CONNECTOR, - document_metadata=doc_metadata, - content=summary_content, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) + # Update document to READY with actual content + document.title = item['page_name'] + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = doc_metadata + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() - session.add(document) documents_indexed += 1 - logger.info(f"Successfully indexed new page {page_name}") - # Batch commit every 10 documents + # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} BookStack pages processed so far" @@ -413,46 +448,72 @@ async def index_bookstack_pages( except Exception as e: logger.error( - f"Error processing page {page.get('name', 'Unknown')}: {e!s}", + f"Error processing page {item.get('page_name', 'Unknown')}: {e!s}", exc_info=True, ) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") skipped_pages.append( - f"{page.get('name', 'Unknown')} (processing error)" + f"{item.get('page_name', 'Unknown')} (processing error)" ) - documents_skipped += 1 - continue # Skip this page and continue with others + documents_failed += 1 + continue - # Update the last_indexed_at timestamp for the connector only if requested - total_processed = documents_indexed - if update_last_indexed: - await update_connector_last_indexed(session, connector, update_last_indexed) + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + # This ensures the UI shows "Last indexed" instead of "Never indexed" + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches logger.info( f"Final commit: Total {documents_indexed} BookStack pages processed" ) - await session.commit() - logger.info("Successfully committed all BookStack document changes to database") + try: + await session.commit() + logger.info("Successfully committed all BookStack document changes to database") + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same page was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None # Log success await task_logger.log_task_success( log_entry, f"Successfully completed BookStack indexing for connector {connector_id}", { - "pages_processed": total_processed, + "pages_processed": documents_indexed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "documents_failed": documents_failed, "skipped_pages_count": len(skipped_pages), }, ) logger.info( - f"BookStack indexing completed: {documents_indexed} new pages, {documents_skipped} skipped" + f"BookStack indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed" ) - return ( - total_processed, - None, - ) # Return None as the error message to indicate success + return documents_indexed, warning_message except SQLAlchemyError as db_error: await session.rollback() diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index fb6487474..97cd31a09 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -1,5 +1,9 @@ """ Elasticsearch indexer for SurfSense + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Collect all documents and create pending documents (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import json @@ -13,7 +17,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from app.connectors.elasticsearch_connector import ElasticsearchConnector -from app.db import Document, DocumentType, SearchSourceConnector +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnector from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( create_document_chunks, @@ -25,6 +29,7 @@ from .base import ( check_document_by_unique_identifier, check_duplicate_document_by_hash, get_current_timestamp, + safe_set_chunks, ) # Type hint for heartbeat callback @@ -164,6 +169,8 @@ async def index_elasticsearch_documents( ) documents_processed = 0 + documents_skipped = 0 + documents_failed = 0 # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() @@ -178,23 +185,22 @@ async def index_elasticsearch_documents( "max_documents": max_documents, }, ) - # Use scroll search for large result sets + + # ======================================================================= + # PHASE 1: Collect all documents from Elasticsearch and create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + docs_to_process = [] # List of dicts with document and ES data + new_documents_created = False + hits_collected = 0 + async for hit in es_connector.scroll_search( index=index_name, query=query, size=min(max_documents, 100), # Scroll in batches fields=config.get("ELASTICSEARCH_FIELDS"), ): - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) - >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_processed) - last_heartbeat_time = time.time() - - if documents_processed >= max_documents: + if hits_collected >= max_documents: break try: @@ -220,26 +226,12 @@ async def index_elasticsearch_documents( if not content.strip(): logger.warning(f"Skipping document {doc_id} - no content found") + documents_skipped += 1 continue # Create content hash content_hash = generate_content_hash(content, search_space_id) - # Build metadata - metadata = { - "elasticsearch_id": doc_id, - "elasticsearch_index": hit.get("_index", index_name), - "elasticsearch_score": hit.get("_score"), - "indexed_at": datetime.now().isoformat(), - "source": "ELASTICSEARCH_CONNECTOR", - } - - # Add any additional metadata fields specified in config - if "ELASTICSEARCH_METADATA_FIELDS" in config: - for field in config["ELASTICSEARCH_METADATA_FIELDS"]: - if field in source: - metadata[f"es_{field}"] = source[field] - # Build source-unique identifier and hash (prefer source id dedupe) source_identifier = f"{hit.get('_index', index_name)}:{doc_id}" unique_identifier_hash = generate_unique_identifier_hash( @@ -258,98 +250,209 @@ async def index_elasticsearch_documents( ) if existing_doc: - # If content is unchanged, skip. Otherwise update the existing document. + # If content is unchanged, skip. Otherwise queue for update. if existing_doc.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_doc.status, DocumentStatus.READY): + existing_doc.status = DocumentStatus.ready() logger.info( f"Skipping ES doc {doc_id} — already indexed (doc id {existing_doc.id})" ) - continue - else: - logger.info( - f"Updating existing document {existing_doc.id} for ES doc {doc_id}" - ) - existing_doc.title = title - existing_doc.content = content - existing_doc.content_hash = content_hash - existing_doc.document_metadata = metadata - existing_doc.unique_identifier_hash = unique_identifier_hash - chunks = await create_document_chunks(content) - existing_doc.chunks = chunks - existing_doc.updated_at = get_current_timestamp() - await session.flush() - documents_processed += 1 - if documents_processed % 10 == 0: - await session.commit() + documents_skipped += 1 continue - # Create document + # Queue existing document for update (will be set to processing in Phase 2) + docs_to_process.append({ + 'document': existing_doc, + 'is_new': False, + 'doc_id': doc_id, + 'title': title, + 'content': content, + 'content_hash': content_hash, + 'unique_identifier_hash': unique_identifier_hash, + 'hit': hit, + 'source': source, + }) + hits_collected += 1 + continue + + # Build metadata for new document + metadata = { + "elasticsearch_id": doc_id, + "elasticsearch_index": hit.get("_index", index_name), + "elasticsearch_score": hit.get("_score"), + "source": "ELASTICSEARCH_CONNECTOR", + "connector_id": connector_id, + } + + # Add any additional metadata fields specified in config + if "ELASTICSEARCH_METADATA_FIELDS" in config: + for field in config["ELASTICSEARCH_METADATA_FIELDS"]: + if field in source: + metadata[f"es_{field}"] = source[field] + + # Create new document with PENDING status (visible in UI immediately) document = Document( title=title, - content=content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, document_type=DocumentType.ELASTICSEARCH_CONNECTOR, document_metadata=metadata, search_space_id=search_space_id, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - - # Create chunks and attach to document (persist via relationship) - chunks = await create_document_chunks(content) - document.chunks = chunks session.add(document) - await session.flush() + new_documents_created = True + + docs_to_process.append({ + 'document': document, + 'is_new': True, + 'doc_id': doc_id, + 'title': title, + 'content': content, + 'content_hash': content_hash, + 'unique_identifier_hash': unique_identifier_hash, + 'hit': hit, + 'source': source, + }) + hits_collected += 1 + + except Exception as e: + logger.error(f"Error in Phase 1 for ES doc: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([d for d in docs_to_process if d['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(docs_to_process)} documents") + + for item in docs_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_processed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Build metadata + metadata = { + "elasticsearch_id": item['doc_id'], + "elasticsearch_index": item['hit'].get("_index", index_name), + "elasticsearch_score": item['hit'].get("_score"), + "indexed_at": datetime.now().isoformat(), + "source": "ELASTICSEARCH_CONNECTOR", + "connector_id": connector_id, + } + + # Add any additional metadata fields specified in config + if "ELASTICSEARCH_METADATA_FIELDS" in config: + for field in config["ELASTICSEARCH_METADATA_FIELDS"]: + if field in item['source']: + metadata[f"es_{field}"] = item['source'][field] + + # Create chunks + chunks = await create_document_chunks(item['content']) + + # Update document to READY with actual content + document.title = item['title'] + document.content = item['content'] + document.content_hash = item['content_hash'] + document.unique_identifier_hash = item['unique_identifier_hash'] + document.document_metadata = metadata + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() documents_processed += 1 + # Batch commit every 10 documents (for ready status updates) if documents_processed % 10 == 0: logger.info( - f"Processed {documents_processed} Elasticsearch documents" + f"Committing batch: {documents_processed} Elasticsearch documents processed so far" ) await session.commit() except Exception as e: - msg = f"Error processing Elasticsearch document {hit.get('_id', 'unknown')}: {e}" + msg = f"Error processing Elasticsearch document {item.get('doc_id', 'unknown')}: {e}" logger.error(msg) - await task_logger.log_task_failure( - log_entry, - "Document processing error", - msg, - { - "document_id": hit.get("_id", "unknown"), - "error_type": type(e).__name__, - }, - ) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 continue - # Final commit - await session.commit() + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + # This ensures the UI shows "Last indexed" instead of "Never indexed" + if update_last_indexed: + connector.last_indexed_at = ( + datetime.now(UTC).isoformat().replace("+00:00", "Z") + ) + + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_processed} Elasticsearch documents processed") + try: + await session.commit() + logger.info("Successfully committed all Elasticsearch document changes to database") + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same document was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None await task_logger.log_task_success( log_entry, f"Successfully indexed {documents_processed} documents from Elasticsearch", - {"documents_indexed": documents_processed, "index": index_name}, + { + "documents_indexed": documents_processed, + "documents_skipped": documents_skipped, + "documents_failed": documents_failed, + "index": index_name, + }, ) logger.info( - f"Successfully indexed {documents_processed} documents from Elasticsearch" + f"Elasticsearch indexing completed: {documents_processed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed" ) - # Update last indexed timestamp if requested - if update_last_indexed and documents_processed > 0: - # connector.last_indexed_at = datetime.now() - connector.last_indexed_at = ( - datetime.now(UTC).isoformat().replace("+00:00", "Z") - ) - await session.commit() - await task_logger.log_task_progress( - log_entry, - "Updated connector.last_indexed_at", - {"last_indexed_at": connector.last_indexed_at}, - ) - - return documents_processed, None + return documents_processed, warning_message finally: # Clean up Elasticsearch connection diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index f4527843c..80d4ef3cf 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -1,5 +1,9 @@ """ Luma connector indexer. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Collect all events and create pending documents (visible in UI immediately) +- Phase 2: Process each event: pending → processing → ready/failed """ import time @@ -11,7 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.luma_connector import LumaConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -27,6 +31,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -227,21 +232,22 @@ async def index_luma_events( logger.error(f"Error fetching Luma events: {e!s}", exc_info=True) return 0, f"Error fetching Luma events: {e!s}" + # ======================================================================= + # PHASE 1: Analyze all events, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 skipped_events = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() + events_to_process = [] # List of dicts with document and event data + new_documents_created = False + for event in events: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() try: # Luma event structure fields - events have nested 'event' field event_data = event.get("event", {}) @@ -298,91 +304,34 @@ async def index_luma_events( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() logger.info( f"Document for Luma event {event_name} unchanged. Skipping." ) documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for Luma event {event_name}. Updating document." - ) - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "event_id": event_id, - "event_name": event_name, - "event_url": event_url, - "start_at": start_at, - "end_at": end_at, - "timezone": timezone, - "location": location or "No location", - "city": city, - "hosts": host_names, - "document_type": "Luma Event", - "connector_type": "Luma", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - event_markdown, user_llm, document_metadata - ) - else: - summary_content = f"Luma Event: {event_name}\n\n" - if event_url: - summary_content += f"URL: {event_url}\n" - summary_content += f"Start: {start_at}\n" - summary_content += f"End: {end_at}\n" - if timezone: - summary_content += f"Timezone: {timezone}\n" - if location: - summary_content += f"Location: {location}\n" - if city: - summary_content += f"City: {city}\n" - if host_names: - summary_content += f"Hosts: {host_names}\n" - if description: - desc_preview = description[:1000] - if len(description) > 1000: - desc_preview += "..." - summary_content += f"Description: {desc_preview}\n" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - chunks = await create_document_chunks(event_markdown) - - # Update existing document - existing_document.title = event_name - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "event_id": event_id, - "event_name": event_name, - "event_url": event_url, - "start_at": start_at, - "end_at": end_at, - "timezone": timezone, - "location": location, - "city": city, - "hosts": host_names, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info(f"Successfully updated Luma event {event_name}") - continue + # Queue existing document for update (will be set to processing in Phase 2) + events_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'event_id': event_id, + 'event_name': event_name, + 'event_url': event_url, + 'event_markdown': event_markdown, + 'content_hash': content_hash, + 'start_at': start_at, + 'end_at': end_at, + 'timezone': timezone, + 'location': location, + 'city': city, + 'host_names': host_names, + 'description': description, + 'cover_url': cover_url, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -400,59 +349,7 @@ async def index_luma_events( documents_skipped += 1 continue - # Document doesn't exist - create new one - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "event_id": event_id, - "event_name": event_name, - "event_url": event_url, - "start_at": start_at, - "end_at": end_at, - "timezone": timezone, - "location": location or "No location", - "city": city, - "hosts": host_names, - "document_type": "Luma Event", - "connector_type": "Luma", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - event_markdown, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - summary_content = f"Luma Event: {event_name}\n\n" - if event_url: - summary_content += f"URL: {event_url}\n" - summary_content += f"Start: {start_at}\n" - summary_content += f"End: {end_at}\n" - if timezone: - summary_content += f"Timezone: {timezone}\n" - if location: - summary_content += f"Location: {location}\n" - if city: - summary_content += f"City: {city}\n" - if host_names: - summary_content += f"Hosts: {host_names}\n" - if description: - desc_preview = description[:1000] - if len(description) > 1000: - desc_preview += "..." - summary_content += f"Description: {desc_preview}\n" - - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(event_markdown) - + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=event_name, @@ -468,23 +365,147 @@ async def index_luma_events( "city": city, "hosts": host_names, "cover_url": cover_url, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 - logger.info(f"Successfully indexed new event {event_name}") + new_documents_created = True - # Batch commit every 10 documents + events_to_process.append({ + 'document': document, + 'is_new': True, + 'event_id': event_id, + 'event_name': event_name, + 'event_url': event_url, + 'event_markdown': event_markdown, + 'content_hash': content_hash, + 'start_at': start_at, + 'end_at': end_at, + 'timezone': timezone, + 'location': location, + 'city': city, + 'host_names': host_names, + 'description': description, + 'cover_url': cover_url, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(events_to_process)} documents") + + for item in events_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata_for_summary = { + "event_id": item['event_id'], + "event_name": item['event_name'], + "event_url": item['event_url'], + "start_at": item['start_at'], + "end_at": item['end_at'], + "timezone": item['timezone'], + "location": item['location'] or "No location", + "city": item['city'], + "hosts": item['host_names'], + "document_type": "Luma Event", + "connector_type": "Luma", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item['event_markdown'], user_llm, document_metadata_for_summary + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Luma Event: {item['event_name']}\n\n" + if item['event_url']: + summary_content += f"URL: {item['event_url']}\n" + summary_content += f"Start: {item['start_at']}\n" + summary_content += f"End: {item['end_at']}\n" + if item['timezone']: + summary_content += f"Timezone: {item['timezone']}\n" + if item['location']: + summary_content += f"Location: {item['location']}\n" + if item['city']: + summary_content += f"City: {item['city']}\n" + if item['host_names']: + summary_content += f"Hosts: {item['host_names']}\n" + if item['description']: + desc_preview = item['description'][:1000] + if len(item['description']) > 1000: + desc_preview += "..." + summary_content += f"Description: {desc_preview}\n" + + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(item['event_markdown']) + + # Update document to READY with actual content + document.title = item['event_name'] + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "event_id": item['event_id'], + "event_name": item['event_name'], + "event_url": item['event_url'], + "start_at": item['start_at'], + "end_at": item['end_at'], + "timezone": item['timezone'], + "location": item['location'], + "city": item['city'], + "hosts": item['host_names'], + "cover_url": item['cover_url'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Luma events processed so far" @@ -493,38 +514,69 @@ async def index_luma_events( except Exception as e: logger.error( - f"Error processing event {event.get('name', 'Unknown')}: {e!s}", + f"Error processing event {item.get('event_name', 'Unknown')}: {e!s}", exc_info=True, ) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") skipped_events.append( - f"{event.get('name', 'Unknown')} (processing error)" + f"{item.get('event_name', 'Unknown')} (processing error)" ) - documents_skipped += 1 + documents_failed += 1 continue - total_processed = documents_indexed - if total_processed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + # This ensures the UI shows "Last indexed" instead of "Never indexed" + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches logger.info(f"Final commit: Total {documents_indexed} Luma events processed") - await session.commit() + try: + await session.commit() + logger.info("Successfully committed all Luma document changes to database") + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same event was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None await task_logger.log_task_success( log_entry, f"Successfully completed Luma indexing for connector {connector_id}", { - "events_processed": total_processed, + "events_processed": documents_indexed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "documents_failed": documents_failed, "skipped_events_count": len(skipped_events), }, ) logger.info( - f"Luma indexing completed: {documents_indexed} new events, {documents_skipped} skipped" + f"Luma indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed" ) - return total_processed, None + return documents_indexed, warning_message except SQLAlchemyError as db_error: await session.rollback()