diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 54b1afd26..05a4007ae 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -1,5 +1,9 @@ """ Airtable connector indexer. + +Implements real-time document status updates using a two-phase approach: +- Phase 1: Create all documents with PENDING status (visible in UI immediately) +- Phase 2: Process each document one by one (pending → processing → ready/failed) """ import time @@ -10,7 +14,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.airtable_history import AirtableHistoryConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -27,6 +31,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -134,24 +139,30 @@ async def index_airtable_records( await task_logger.log_task_success( log_entry, success_msg, {"bases_count": 0} ) - return 0, success_msg + # CRITICAL: Update timestamp even when no bases found so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) + await session.commit() + return 0, None # Return None (not error) when no items found logger.info(f"Found {len(bases)} Airtable bases to process") # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() - total_documents_indexed = 0 - # Process each base + # Track overall statistics + documents_indexed = 0 + documents_skipped = 0 + documents_failed = 0 + duplicate_content_count = 0 + + # ======================================================================= + # PHASE 1: Collect all records and create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + records_to_process = [] # List of dicts with document and record data + new_documents_created = False + for base in bases: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) - >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(total_documents_indexed) - last_heartbeat_time = time.time() base_id = base.get("id") base_name = base.get("name", "Unknown Base") @@ -201,7 +212,6 @@ async def index_airtable_records( max_records=max_records, ) ) - else: # Fetch all records records, records_error = airtable_connector.get_all_records( @@ -222,21 +232,14 @@ async def index_airtable_records( logger.info(f"Found {len(records)} records in table {table_name}") - documents_indexed = 0 - skipped_messages = [] - documents_skipped = 0 - # Process each record + # Phase 1: Analyze each record and create pending documents for record in records: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) - >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(total_documents_indexed) - last_heartbeat_time = time.time() - try: + record_id = record.get("id", "") + if not record_id: + documents_skipped += 1 + continue + # Generate markdown content markdown_content = ( airtable_connector.format_record_to_markdown( @@ -246,16 +249,11 @@ async def index_airtable_records( if not markdown_content.strip(): logger.warning( - f"Skipping message with no content: {record.get('id')}" - ) - skipped_messages.append( - f"{record.get('id')} (no content)" + f"Skipping record with no content: {record_id}" ) documents_skipped += 1 continue - record_id = record.get("id", "Unknown") - # Generate unique identifier hash for this Airtable record unique_identifier_hash = generate_unique_identifier_hash( DocumentType.AIRTABLE_CONNECTOR, @@ -278,75 +276,24 @@ async def index_airtable_records( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - logger.info( - f"Document for Airtable record {record_id} unchanged. Skipping." - ) + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for Airtable record {record_id}. Updating document." - ) - # Generate document summary - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "record_id": record_id, - "created_time": record.get( - "CREATED_TIME()", "" - ), - "document_type": "Airtable Record", - "connector_type": "Airtable", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, - user_llm, - document_metadata, - ) - else: - summary_content = ( - f"Airtable Record: {record_id}\n\n" - ) - summary_embedding = ( - config.embedding_model_instance.embed( - summary_content - ) - ) - - # Process chunks - chunks = await create_document_chunks( - markdown_content - ) - - # Update existing document - existing_document.title = record_id - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "record_id": record_id, - "created_time": record.get( - "CREATED_TIME()", "" - ), - } - existing_document.chunks = chunks - existing_document.updated_at = ( - get_current_timestamp() - ) - - documents_indexed += 1 - logger.info( - f"Successfully updated Airtable record {record_id}" - ) - continue + # Queue existing document for update (will be set to processing in Phase 2) + records_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'markdown_content': markdown_content, + 'content_hash': content_hash, + 'record_id': record_id, + 'record': record, + 'base_name': base_name, + 'table_name': table_name, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -363,44 +310,11 @@ async def index_airtable_records( f"(existing document ID: {duplicate_by_content.id}, " f"type: {duplicate_by_content.document_type}). Skipping." ) + duplicate_content_count += 1 documents_skipped += 1 continue - # Document doesn't exist - create new one - # Generate document summary - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "record_id": record_id, - "created_time": record.get("CREATED_TIME()", ""), - "document_type": "Airtable Record", - "connector_type": "Airtable", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - summary_content = f"Airtable Record: {record_id}\n\n" - summary_embedding = ( - config.embedding_model_instance.embed( - summary_content - ) - ) - - # Process chunks - chunks = await create_document_chunks(markdown_content) - - # Create and store new document - logger.info( - f"Creating new document for Airtable record: {record_id}" - ) + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=record_id, @@ -408,78 +322,181 @@ async def index_airtable_records( document_metadata={ "record_id": record_id, "created_time": record.get("CREATED_TIME()", ""), + "base_name": base_name, + "table_name": table_name, + "connector_id": connector_id, }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 - logger.info( - f"Successfully indexed new Airtable record {summary_content}" - ) + new_documents_created = True - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Airtable records processed so far" - ) - await session.commit() + records_to_process.append({ + 'document': document, + 'is_new': True, + 'markdown_content': markdown_content, + 'content_hash': content_hash, + 'record_id': record_id, + 'record': record, + 'base_name': base_name, + 'table_name': table_name, + }) except Exception as e: - logger.error( - f"Error processing the Airtable record {record.get('id', 'Unknown')}: {e!s}", - exc_info=True, - ) - skipped_messages.append( - f"{record.get('id', 'Unknown')} (processing error)" - ) - documents_skipped += 1 - continue # Skip this message and continue with others + logger.error(f"Error in Phase 1 for record: {e!s}", exc_info=True) + documents_failed += 1 + continue - # Accumulate total processed across all tables - total_processed += documents_indexed + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([r for r in records_to_process if r['is_new']])} pending documents") + await session.commit() - # Final commit for any remaining documents not yet committed in batches - if documents_indexed > 0: + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(records_to_process)} documents") + + for item in records_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata_for_summary = { + "record_id": item['record_id'], + "created_time": item['record'].get("CREATED_TIME()", ""), + "document_type": "Airtable Record", + "connector_type": "Airtable", + } + summary_content, summary_embedding = await generate_document_summary( + item['markdown_content'], user_llm, document_metadata_for_summary + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Airtable Record: {item['record_id']}\n\n" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(item['markdown_content']) + + # Update document to READY with actual content + document.title = item['record_id'] + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "record_id": item['record_id'], + "created_time": item['record'].get("CREATED_TIME()", ""), + "base_name": item['base_name'], + "table_name": item['table_name'], + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) + if documents_indexed % 10 == 0: logger.info( - f"Final commit for table {table_name}: {documents_indexed} Airtable records processed" + f"Committing batch: {documents_indexed} Airtable records processed so far" ) await session.commit() - logger.info( - f"Successfully committed all Airtable document changes for table {table_name}" - ) - # Update the last_indexed_at timestamp for the connector only if requested - # (after all tables in all bases are processed) - if total_processed > 0: - await update_connector_last_indexed( - session, connector, update_last_indexed + except Exception as e: + logger.error(f"Error processing Airtable record: {e!s}", exc_info=True) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 + continue + + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) + + total_processed = documents_indexed + + # Final commit to ensure all documents are persisted (safety net) + logger.info(f"Final commit: Total {documents_indexed} Airtable records processed") + try: + await session.commit() + logger.info( + "Successfully committed all Airtable document changes to database" ) + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same record was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None # Log success after processing all bases and tables await task_logger.log_task_success( log_entry, f"Successfully completed Airtable indexing for connector {connector_id}", { - "events_processed": total_processed, - "documents_indexed": total_processed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, }, ) logger.info( - f"Airtable indexing completed: {total_processed} total records processed" + f"Airtable indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed " + f"({duplicate_content_count} duplicate content)" ) return ( total_processed, - None, - ) # Return None as the error message to indicate success + warning_message, + ) except Exception as e: logger.error( diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 8d4d7650a..37927b779 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -1,5 +1,9 @@ """ Notion connector indexer. + +Implements real-time document status updates using a two-phase approach: +- Phase 1: Create all documents with PENDING status (visible in UI immediately) +- Phase 2: Process each document one by one (pending → processing → ready/failed) """ import time @@ -9,8 +13,9 @@ from datetime import datetime from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession +from app.config import config from app.connectors.notion_history import NotionHistoryConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -28,6 +33,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -214,12 +220,17 @@ async def index_notion_pages( {"pages_found": 0}, ) logger.info("No Notion pages found to index") + # CRITICAL: Update timestamp even when no pages found so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) + await session.commit() await notion_client.close() return 0, None # Success with 0 pages, not an error # Track the number of documents indexed documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 + duplicate_content_count = 0 skipped_pages = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -231,22 +242,69 @@ async def index_notion_pages( {"stage": "process_pages", "total_pages": len(pages)}, ) - # Process each page - for page in pages: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() + # ======================================================================= + # PHASE 1: Analyze all pages, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + pages_to_process = [] # List of dicts with document and page data + new_documents_created = False + # Helper function to convert page content to markdown + def process_blocks(blocks, level=0): + result = "" + for block in blocks: + block_type = block.get("type") + block_content = block.get("content", "") + children = block.get("children", []) + + # Add indentation based on level + indent = " " * level + + # Format based on block type + if block_type in ["paragraph", "text"]: + result += f"{indent}{block_content}\n\n" + elif block_type in ["heading_1", "header"]: + result += f"{indent}# {block_content}\n\n" + elif block_type == "heading_2": + result += f"{indent}## {block_content}\n\n" + elif block_type == "heading_3": + result += f"{indent}### {block_content}\n\n" + elif block_type == "bulleted_list_item": + result += f"{indent}* {block_content}\n" + elif block_type == "numbered_list_item": + result += f"{indent}1. {block_content}\n" + elif block_type == "to_do": + result += f"{indent}- [ ] {block_content}\n" + elif block_type == "toggle": + result += f"{indent}> {block_content}\n" + elif block_type == "code": + result += f"{indent}```\n{block_content}\n```\n\n" + elif block_type == "quote": + result += f"{indent}> {block_content}\n\n" + elif block_type == "callout": + result += f"{indent}> **Note:** {block_content}\n\n" + elif block_type == "image": + result += f"{indent}![Image]({block_content})\n\n" + else: + # Default for other block types + if block_content: + result += f"{indent}{block_content}\n\n" + + # Process children recursively + if children: + result += process_blocks(children, level + 1) + + return result + + for page in pages: try: page_id = page.get("page_id") page_title = page.get("title", f"Untitled page ({page_id})") page_content = page.get("content", []) - logger.info(f"Processing Notion page: {page_title} ({page_id})") + if not page_id: + documents_skipped += 1 + continue if not page_content: logger.info(f"No content found in page {page_title}. Skipping.") @@ -256,57 +314,6 @@ async def index_notion_pages( # Convert page content to markdown format markdown_content = f"# Notion Page: {page_title}\n\n" - - # Process blocks recursively - def process_blocks(blocks, level=0): - result = "" - for block in blocks: - block_type = block.get("type") - block_content = block.get("content", "") - children = block.get("children", []) - - # Add indentation based on level - indent = " " * level - - # Format based on block type - if block_type in ["paragraph", "text"]: - result += f"{indent}{block_content}\n\n" - elif block_type in ["heading_1", "header"]: - result += f"{indent}# {block_content}\n\n" - elif block_type == "heading_2": - result += f"{indent}## {block_content}\n\n" - elif block_type == "heading_3": - result += f"{indent}### {block_content}\n\n" - elif block_type == "bulleted_list_item": - result += f"{indent}* {block_content}\n" - elif block_type == "numbered_list_item": - result += f"{indent}1. {block_content}\n" - elif block_type == "to_do": - result += f"{indent}- [ ] {block_content}\n" - elif block_type == "toggle": - result += f"{indent}> {block_content}\n" - elif block_type == "code": - result += f"{indent}```\n{block_content}\n```\n\n" - elif block_type == "quote": - result += f"{indent}> {block_content}\n\n" - elif block_type == "callout": - result += f"{indent}> **Note:** {block_content}\n\n" - elif block_type == "image": - result += f"{indent}![Image]({block_content})\n\n" - else: - # Default for other block types - if block_content: - result += f"{indent}{block_content}\n\n" - - # Process children recursively - if children: - result += process_blocks(children, level + 1) - - return result - - logger.debug( - f"Converting {len(page_content)} blocks to markdown for page {page_title}" - ) markdown_content += process_blocks(page_content) # Format document metadata @@ -346,71 +353,22 @@ async def index_notion_pages( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - logger.info( - f"Document for Notion page {page_title} unchanged. Skipping." - ) + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for Notion page {page_title}. Updating document." - ) - # Get user's long context LLM - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - if not user_llm: - logger.error( - f"No long context LLM configured for user {user_id}" - ) - skipped_pages.append(f"{page_title} (no LLM configured)") - documents_skipped += 1 - continue - - # Generate summary with metadata - document_metadata = { - "page_title": page_title, - "page_id": page_id, - "document_type": "Notion Page", - "connector_type": "Notion", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - - # Process chunks - chunks = await create_document_chunks(markdown_content) - - # Update existing document - existing_document.title = page_title - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "page_title": page_title, - "page_id": page_id, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - existing_document.connector_id = connector_id - - documents_indexed += 1 - logger.info(f"Successfully updated Notion page: {page_title}") - - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} documents processed so far" - ) - await session.commit() - - continue + # Queue existing document for update (will be set to processing in Phase 2) + pages_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'markdown_content': markdown_content, + 'content_hash': content_hash, + 'page_id': page_id, + 'page_title': page_title, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -425,37 +383,11 @@ async def index_notion_pages( f"(existing document ID: {duplicate_by_content.id}, " f"type: {duplicate_by_content.document_type}). Skipping." ) + duplicate_content_count += 1 documents_skipped += 1 continue - # Document doesn't exist - create new one - # Get user's long context LLM - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - if not user_llm: - logger.error(f"No long context LLM configured for user {user_id}") - skipped_pages.append(f"{page_title} (no LLM configured)") - documents_skipped += 1 - continue - - # Generate summary with metadata - logger.debug(f"Generating summary for page {page_title}") - document_metadata = { - "page_title": page_title, - "page_id": page_id, - "document_type": "Notion Page", - "connector_type": "Notion", - } - summary_content, summary_embedding = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - - # Process chunks - logger.debug(f"Chunking content for page {page_title}") - chunks = await create_document_chunks(markdown_content) - - # Create and store new document + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=page_title, @@ -463,53 +395,159 @@ async def index_notion_pages( document_metadata={ "page_title": page_title, "page_id": page_id, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 - logger.info(f"Successfully indexed new Notion page: {page_title}") + new_documents_created = True - # Batch commit every 10 documents + pages_to_process.append({ + 'document': document, + 'is_new': True, + 'markdown_content': markdown_content, + 'content_hash': content_hash, + 'page_id': page_id, + 'page_title': page_title, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(pages_to_process)} documents") + + for item in pages_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata_for_summary = { + "page_title": item['page_title'], + "page_id": item['page_id'], + "document_type": "Notion Page", + "connector_type": "Notion", + } + summary_content, summary_embedding = await generate_document_summary( + item['markdown_content'], user_llm, document_metadata_for_summary + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Notion Page: {item['page_title']}\n\n{item['markdown_content'][:500]}..." + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(item['markdown_content']) + + # Update document to READY with actual content + document.title = item['page_title'] + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "page_title": item['page_title'], + "page_id": item['page_id'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( - f"Committing batch: {documents_indexed} documents processed so far" + f"Committing batch: {documents_indexed} Notion pages processed so far" ) await session.commit() except Exception as e: - logger.error( - f"Error processing Notion page {page.get('title', 'Unknown')}: {e!s}", - exc_info=True, - ) - skipped_pages.append( - f"{page.get('title', 'Unknown')} (processing error)" - ) - documents_skipped += 1 - continue # Skip this page and continue with others + logger.error(f"Error processing Notion page: {e!s}", exc_info=True) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + skipped_pages.append(f"{item['page_title']} (processing error)") + documents_failed += 1 + continue + + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) - # Update the last_indexed_at timestamp for the connector only if requested - # and if we successfully indexed at least one page total_processed = documents_indexed - if total_processed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit for any remaining documents not yet committed in batches + # Final commit to ensure all documents are persisted (safety net) logger.info(f"Final commit: Total {documents_indexed} documents processed") - await session.commit() + try: + await session.commit() + logger.info( + "Successfully committed all Notion document changes to database" + ) + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same page was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise # Get final count of pages with skipped Notion AI content pages_with_skipped_ai_content = notion_client.get_skipped_content_count() + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None + # Prepare result message with user-friendly notification about skipped content result_message = None if skipped_pages: @@ -532,6 +570,8 @@ async def index_notion_pages( "pages_processed": total_processed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, "skipped_pages_count": len(skipped_pages), "pages_with_skipped_ai_content": pages_with_skipped_ai_content, "result_message": result_message, @@ -539,7 +579,9 @@ async def index_notion_pages( ) logger.info( - f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped" + f"Notion indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed " + f"({duplicate_content_count} duplicate content)" ) # Clean up the async client @@ -559,6 +601,10 @@ async def index_notion_pages( "Using legacy token. Reconnect with OAuth for better reliability." ) + # Include warning message if there were issues + if warning_message: + notification_parts.append(warning_message) + user_notification_message = ( " ".join(notification_parts) if notification_parts else None )