diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 74b4cc23d..7fd842996 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -1,5 +1,9 @@ """ Confluence connector indexer. + +Provides real-time document status updates during indexing using a two-phase approach: +- Phase 1: Create all documents with PENDING status (visible in UI immediately) +- Phase 2: Process each document one by one (PENDING → PROCESSING → READY/FAILED) """ import contextlib @@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.confluence_history import ConfluenceHistoryConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -29,6 +33,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -180,22 +185,22 @@ async def index_confluence_pages( await confluence_client.close() return 0, f"Error fetching Confluence pages: {e!s}" - # Process and index each page + # ======================================================================= + # PHASE 1: Analyze all pages, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= documents_indexed = 0 - skipped_pages = [] documents_skipped = 0 + documents_failed = 0 + duplicate_content_count = 0 # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() + pages_to_process = [] # List of dicts with document and page data + new_documents_created = False + for page in pages: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() try: page_id = page.get("id") page_title = page.get("title", "") @@ -205,7 +210,6 @@ async def index_confluence_pages( logger.warning( f"Skipping page with missing ID or title: {page_id or 'Unknown'}" ) - skipped_pages.append(f"{page_title or 'Unknown'} (missing data)") documents_skipped += 1 continue @@ -236,7 +240,6 @@ async def index_confluence_pages( if not full_content.strip(): logger.warning(f"Skipping page with no content: {page_title}") - skipped_pages.append(f"{page_title} (no content)") documents_skipped += 1 continue @@ -258,74 +261,25 @@ async def index_confluence_pages( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - logger.info( - f"Document for Confluence page {page_title} unchanged. Skipping." - ) + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for Confluence page {page_title}. Updating document." - ) - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "page_title": page_title, - "page_id": page_id, - "space_id": space_id, - "comment_count": comment_count, - "document_type": "Confluence Page", - "connector_type": "Confluence", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - full_content, user_llm, document_metadata - ) - else: - summary_content = f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n" - if page_content: - content_preview = page_content[:1000] - if len(page_content) > 1000: - content_preview += "..." - summary_content += ( - f"Content Preview: {content_preview}\n\n" - ) - summary_content += f"Comments: {comment_count}" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - chunks = await create_document_chunks(full_content) - - # Update existing document - existing_document.title = page_title - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "page_id": page_id, - "page_title": page_title, - "space_id": space_id, - "comment_count": comment_count, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info( - f"Successfully updated Confluence page {page_title}" - ) - continue + # Queue existing document for update (will be set to processing in Phase 2) + pages_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'full_content': full_content, + 'page_content': page_content, + 'content_hash': content_hash, + 'page_id': page_id, + 'page_title': page_title, + 'space_id': space_id, + 'comment_count': comment_count, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -340,51 +294,11 @@ async def index_confluence_pages( f"(existing document ID: {duplicate_by_content.id}, " f"type: {duplicate_by_content.document_type}). Skipping." ) + duplicate_content_count += 1 documents_skipped += 1 continue - # Document doesn't exist - create new one - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "page_title": page_title, - "page_id": page_id, - "space_id": space_id, - "comment_count": comment_count, - "document_type": "Confluence Page", - "connector_type": "Confluence", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - full_content, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - summary_content = ( - f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n" - ) - if page_content: - # Take first 500 characters of content for summary - content_preview = page_content[:1000] - if len(page_content) > 1000: - content_preview += "..." - summary_content += f"Content Preview: {content_preview}\n\n" - summary_content += f"Comments: {comment_count}" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - using the full page content with comments - chunks = await create_document_chunks(full_content) - - # Create and store new document - logger.info(f"Creating new document for page {page_title}") + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=page_title, @@ -394,23 +308,122 @@ async def index_confluence_pages( "page_title": page_title, "space_id": space_id, "comment_count": comment_count, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 - logger.info(f"Successfully indexed new page {page_title}") + new_documents_created = True - # Batch commit every 10 documents + pages_to_process.append({ + 'document': document, + 'is_new': True, + 'full_content': full_content, + 'page_content': page_content, + 'content_hash': content_hash, + 'page_id': page_id, + 'page_title': page_title, + 'space_id': space_id, + 'comment_count': comment_count, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(pages_to_process)} documents") + + for item in pages_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "page_title": item['page_title'], + "page_id": item['page_id'], + "space_id": item['space_id'], + "comment_count": item['comment_count'], + "document_type": "Confluence Page", + "connector_type": "Confluence", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item['full_content'], user_llm, document_metadata + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = ( + f"Confluence Page: {item['page_title']}\n\nSpace ID: {item['space_id']}\n\n" + ) + if item['page_content']: + # Take first 1000 characters of content for summary + content_preview = item['page_content'][:1000] + if len(item['page_content']) > 1000: + content_preview += "..." + summary_content += f"Content Preview: {content_preview}\n\n" + summary_content += f"Comments: {item['comment_count']}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks - using the full page content with comments + chunks = await create_document_chunks(item['full_content']) + + # Update document to READY with actual content + document.title = item['page_title'] + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "page_id": item['page_id'], + "page_title": item['page_title'], + "space_id": item['space_id'], + "comment_count": item['comment_count'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Confluence pages processed so far" @@ -419,53 +432,78 @@ async def index_confluence_pages( except Exception as e: logger.error( - f"Error processing page {page.get('title', 'Unknown')}: {e!s}", + f"Error processing page {item.get('page_title', 'Unknown')}: {e!s}", exc_info=True, ) - skipped_pages.append( - f"{page.get('title', 'Unknown')} (processing error)" - ) - documents_skipped += 1 + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 continue # Skip this page and continue with others - # Update the last_indexed_at timestamp for the connector only if requested - total_processed = documents_indexed - if update_last_indexed: - await update_connector_last_indexed(session, connector, update_last_indexed) + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + # This ensures the UI shows "Last indexed" instead of "Never indexed" + await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit for any remaining documents not yet committed in batches + # Final commit to ensure all documents are persisted (safety net) logger.info( f"Final commit: Total {documents_indexed} Confluence pages processed" ) - await session.commit() - logger.info( - "Successfully committed all Confluence document changes to database" - ) + try: + await session.commit() + logger.info( + "Successfully committed all Confluence document changes to database" + ) + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same page was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Confluence indexing for connector {connector_id}", { - "pages_processed": total_processed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, - "skipped_pages_count": len(skipped_pages), + "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, }, ) logger.info( - f"Confluence indexing completed: {documents_indexed} new pages, {documents_skipped} skipped" + f"Confluence indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed " + f"({duplicate_content_count} duplicate content)" ) # Close the client connection if confluence_client: await confluence_client.close() - return ( - total_processed, - None, - ) # Return None as the error message to indicate success + return documents_indexed, warning_message except SQLAlchemyError as db_error: await session.rollback() diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 508834b4f..038df0f46 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -1,5 +1,9 @@ """ Jira connector indexer. + +Provides real-time document status updates during indexing using a two-phase approach: +- Phase 1: Create all documents with PENDING status (visible in UI immediately) +- Phase 2: Process each document one by one (PENDING → PROCESSING → READY/FAILED) """ import contextlib @@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.jira_history import JiraHistoryConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -29,6 +33,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -174,22 +179,22 @@ async def index_jira_issues( logger.error(f"Error fetching Jira issues: {e!s}", exc_info=True) return 0, f"Error fetching Jira issues: {e!s}" - # Process and index each issue + # ======================================================================= + # PHASE 1: Analyze all issues, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= documents_indexed = 0 - skipped_issues = [] documents_skipped = 0 + documents_failed = 0 + duplicate_content_count = 0 # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() + issues_to_process = [] # List of dicts with document and issue data + new_documents_created = False + for issue in issues: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() try: issue_id = issue.get("key") issue_identifier = issue.get("key", "") @@ -199,9 +204,6 @@ async def index_jira_issues( logger.warning( f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}" ) - skipped_issues.append( - f"{issue_identifier or 'Unknown'} (missing data)" - ) documents_skipped += 1 continue @@ -215,7 +217,6 @@ async def index_jira_issues( logger.warning( f"Skipping issue with no content: {issue_identifier} - {issue_title}" ) - skipped_issues.append(f"{issue_identifier} (no content)") documents_skipped += 1 continue @@ -237,71 +238,25 @@ async def index_jira_issues( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - logger.info( - f"Document for Jira issue {issue_identifier} unchanged. Skipping." - ) + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for Jira issue {issue_identifier}. Updating document." - ) - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "issue_key": issue_identifier, - "issue_title": issue_title, - "status": formatted_issue.get("status", "Unknown"), - "priority": formatted_issue.get("priority", "Unknown"), - "comment_count": comment_count, - "document_type": "Jira Issue", - "connector_type": "Jira", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - issue_content, user_llm, document_metadata - ) - else: - summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n" - if formatted_issue.get("description"): - summary_content += f"Description: {formatted_issue.get('description')}\n\n" - summary_content += f"Comments: {comment_count}" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - chunks = await create_document_chunks(issue_content) - - # Update existing document - existing_document.title = f"{issue_identifier}: {issue_title}" - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": formatted_issue.get("status", "Unknown"), - "comment_count": comment_count, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info( - f"Successfully updated Jira issue {issue_identifier}" - ) - continue + # Queue existing document for update (will be set to processing in Phase 2) + issues_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'issue_content': issue_content, + 'content_hash': content_hash, + 'issue_id': issue_id, + 'issue_identifier': issue_identifier, + 'issue_title': issue_title, + 'formatted_issue': formatted_issue, + 'comment_count': comment_count, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -316,50 +271,11 @@ async def index_jira_issues( f"(existing document ID: {duplicate_by_content.id}, " f"type: {duplicate_by_content.document_type}). Skipping." ) + duplicate_content_count += 1 documents_skipped += 1 continue - # Document doesn't exist - create new one - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "issue_key": issue_identifier, - "issue_title": issue_title, - "status": formatted_issue.get("status", "Unknown"), - "priority": formatted_issue.get("priority", "Unknown"), - "comment_count": comment_count, - "document_type": "Jira Issue", - "connector_type": "Jira", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - issue_content, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n" - if formatted_issue.get("description"): - summary_content += ( - f"Description: {formatted_issue.get('description')}\n\n" - ) - summary_content += f"Comments: {comment_count}" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - using the full issue content with comments - chunks = await create_document_chunks(issue_content) - - # Create and store new document - logger.info( - f"Creating new document for issue {issue_identifier} - {issue_title}" - ) + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=f"{issue_identifier}: {issue_title}", @@ -370,25 +286,120 @@ async def index_jira_issues( "issue_title": issue_title, "state": formatted_issue.get("status", "Unknown"), "comment_count": comment_count, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 - logger.info( - f"Successfully indexed new issue {issue_identifier} - {issue_title}" + new_documents_created = True + + issues_to_process.append({ + 'document': document, + 'is_new': True, + 'issue_content': issue_content, + 'content_hash': content_hash, + 'issue_id': issue_id, + 'issue_identifier': issue_identifier, + 'issue_title': issue_title, + 'formatted_issue': formatted_issue, + 'comment_count': comment_count, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(issues_to_process)} documents") + + for item in issues_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id ) - # Batch commit every 10 documents + if user_llm: + document_metadata = { + "issue_key": item['issue_identifier'], + "issue_title": item['issue_title'], + "status": item['formatted_issue'].get("status", "Unknown"), + "priority": item['formatted_issue'].get("priority", "Unknown"), + "comment_count": item['comment_count'], + "document_type": "Jira Issue", + "connector_type": "Jira", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item['issue_content'], user_llm, document_metadata + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Jira Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['formatted_issue'].get('status', 'Unknown')}\n\n" + if item['formatted_issue'].get("description"): + summary_content += ( + f"Description: {item['formatted_issue'].get('description')}\n\n" + ) + summary_content += f"Comments: {item['comment_count']}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks - using the full issue content with comments + chunks = await create_document_chunks(item['issue_content']) + + # Update document to READY with actual content + document.title = f"{item['issue_identifier']}: {item['issue_title']}" + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "issue_id": item['issue_id'], + "issue_identifier": item['issue_identifier'], + "issue_title": item['issue_title'], + "state": item['formatted_issue'].get("status", "Unknown"), + "comment_count": item['comment_count'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Jira issues processed so far" @@ -397,48 +408,73 @@ async def index_jira_issues( except Exception as e: logger.error( - f"Error processing issue {issue.get('identifier', 'Unknown')}: {e!s}", + f"Error processing issue {item.get('issue_identifier', 'Unknown')}: {e!s}", exc_info=True, ) - skipped_issues.append( - f"{issue.get('identifier', 'Unknown')} (processing error)" - ) - documents_skipped += 1 + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 continue # Skip this issue and continue with others - # Update the last_indexed_at timestamp for the connector only if requested - total_processed = documents_indexed - if update_last_indexed: - await update_connector_last_indexed(session, connector, update_last_indexed) + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + # This ensures the UI shows "Last indexed" instead of "Never indexed" + await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit for any remaining documents not yet committed in batches + # Final commit to ensure all documents are persisted (safety net) logger.info(f"Final commit: Total {documents_indexed} Jira issues processed") - await session.commit() - logger.info("Successfully committed all JIRA document changes to database") + try: + await session.commit() + logger.info("Successfully committed all JIRA document changes to database") + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same issue was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None # Log success await task_logger.log_task_success( log_entry, f"Successfully completed JIRA indexing for connector {connector_id}", { - "issues_processed": total_processed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, - "skipped_issues_count": len(skipped_issues), + "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, }, ) logger.info( - f"JIRA indexing completed: {documents_indexed} new issues, {documents_skipped} skipped" + f"JIRA indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed " + f"({duplicate_content_count} duplicate content)" ) # Clean up the connector await jira_client.close() - return ( - total_processed, - None, - ) # Return None as the error message to indicate success + return documents_indexed, warning_message except SQLAlchemyError as db_error: await session.rollback()