diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index c28f151ca..45e1e357a 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -1,5 +1,9 @@ """ Linear connector indexer. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create all documents with 'pending' status (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import time @@ -11,7 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.linear_connector import LinearConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -28,6 +32,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -196,6 +201,7 @@ async def index_linear_issues( # Track the number of documents indexed documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 # Track issues that failed processing skipped_issues = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -207,16 +213,14 @@ async def index_linear_issues( {"stage": "process_issues", "total_issues": len(issues)}, ) - # Process each issue - for issue in issues: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() + # ======================================================================= + # PHASE 1: Analyze all issues, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + issues_to_process = [] # List of dicts with document and issue data + new_documents_created = False + for issue in issues: try: issue_id = issue.get("id", "") issue_identifier = issue.get("identifier", "") @@ -262,78 +266,35 @@ async def index_linear_issues( state = formatted_issue.get("state", "Unknown") description = formatted_issue.get("description", "") comment_count = len(formatted_issue.get("comments", [])) + priority = formatted_issue.get("priority", "Unknown") if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() logger.info( f"Document for Linear issue {issue_identifier} unchanged. Skipping." ) documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for Linear issue {issue_identifier}. Updating document." - ) - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "issue_id": issue_identifier, - "issue_title": issue_title, - "state": state, - "priority": formatted_issue.get("priority", "Unknown"), - "comment_count": comment_count, - "document_type": "Linear Issue", - "connector_type": "Linear", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - issue_content, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - if description and len(description) > 1000: - description = description[:997] + "..." - summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n" - if description: - summary_content += f"Description: {description}\n\n" - summary_content += f"Comments: {comment_count}" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - chunks = await create_document_chunks(issue_content) - - # Update existing document - existing_document.title = f"{issue_identifier}: {issue_title}" - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": state, - "comment_count": comment_count, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info( - f"Successfully updated Linear issue {issue_identifier}" - ) - continue + # Queue existing document for update (will be set to processing in Phase 2) + issues_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'issue_content': issue_content, + 'content_hash': content_hash, + 'issue_id': issue_id, + 'issue_identifier': issue_identifier, + 'issue_title': issue_title, + 'state': state, + 'description': description, + 'comment_count': comment_count, + 'priority': priority, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -351,48 +312,7 @@ async def index_linear_issues( documents_skipped += 1 continue - # Document doesn't exist - create new one - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "issue_id": issue_identifier, - "issue_title": issue_title, - "state": state, - "priority": formatted_issue.get("priority", "Unknown"), - "comment_count": comment_count, - "document_type": "Linear Issue", - "connector_type": "Linear", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - issue_content, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - # Truncate description if it's too long for the summary - if description and len(description) > 1000: - description = description[:997] + "..." - summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n" - if description: - summary_content += f"Description: {description}\n\n" - summary_content += f"Comments: {comment_count}" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - using the full issue content with comments - chunks = await create_document_chunks(issue_content) - - # Create and store new document - logger.info( - f"Creating new document for issue {issue_identifier} - {issue_title}" - ) + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=f"{issue_identifier}: {issue_title}", @@ -403,25 +323,119 @@ async def index_linear_issues( "issue_title": issue_title, "state": state, "comment_count": comment_count, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 - logger.info( - f"Successfully indexed new issue {issue_identifier} - {issue_title}" + new_documents_created = True + + issues_to_process.append({ + 'document': document, + 'is_new': True, + 'issue_content': issue_content, + 'content_hash': content_hash, + 'issue_id': issue_id, + 'issue_identifier': issue_identifier, + 'issue_title': issue_title, + 'state': state, + 'description': description, + 'comment_count': comment_count, + 'priority': priority, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(issues_to_process)} documents") + + for item in issues_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id ) - # Batch commit every 10 documents + if user_llm: + document_metadata_for_summary = { + "issue_id": item['issue_identifier'], + "issue_title": item['issue_title'], + "state": item['state'], + "priority": item['priority'], + "comment_count": item['comment_count'], + "document_type": "Linear Issue", + "connector_type": "Linear", + } + summary_content, summary_embedding = await generate_document_summary( + item['issue_content'], user_llm, document_metadata_for_summary + ) + else: + # Fallback to simple summary if no LLM configured + description = item['description'] + if description and len(description) > 1000: + description = description[:997] + "..." + summary_content = f"Linear Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['state']}\n\n" + if description: + summary_content += f"Description: {description}\n\n" + summary_content += f"Comments: {item['comment_count']}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(item['issue_content']) + + # Update document to READY with actual content + document.title = f"{item['issue_identifier']}: {item['issue_title']}" + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "issue_id": item['issue_id'], + "issue_identifier": item['issue_identifier'], + "issue_title": item['issue_title'], + "state": item['state'], + "comment_count": item['comment_count'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Linear issues processed so far" @@ -430,44 +444,68 @@ async def index_linear_issues( except Exception as e: logger.error( - f"Error processing issue {issue.get('identifier', 'Unknown')}: {e!s}", + f"Error processing issue {item.get('issue_identifier', 'Unknown')}: {e!s}", exc_info=True, ) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") skipped_issues.append( - f"{issue.get('identifier', 'Unknown')} (processing error)" + f"{item.get('issue_identifier', 'Unknown')} (processing error)" ) - documents_skipped += 1 - continue # Skip this issue and continue with others + documents_failed += 1 + continue - # Update the last_indexed_at timestamp for the connector only if requested - total_processed = documents_indexed - if update_last_indexed: - await update_connector_last_indexed(session, connector, update_last_indexed) + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches logger.info(f"Final commit: Total {documents_indexed} Linear issues processed") - await session.commit() - logger.info("Successfully committed all Linear document changes to database") + try: + await session.commit() + logger.info("Successfully committed all Linear document changes to database") + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same issue was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Linear indexing for connector {connector_id}", { - "issues_processed": total_processed, + "issues_processed": documents_indexed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "documents_failed": documents_failed, "skipped_issues_count": len(skipped_issues), }, ) logger.info( - f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped" + f"Linear indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed" ) - return ( - total_processed, - None, - ) # Return None as the error message to indicate success + return documents_indexed, warning_message except SQLAlchemyError as db_error: await session.rollback() diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index 010d1eff4..61faa39b3 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -1,5 +1,9 @@ """ Slack connector indexer. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create all documents with 'pending' status (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import time @@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.slack_history import SlackHistory -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( create_document_chunks, @@ -28,6 +32,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -168,11 +173,15 @@ async def index_slack_messages( f"No Slack channels found for connector {connector_id}", {"channels_found": 0}, ) - return 0, "No Slack channels found" + # CRITICAL: Update timestamp even when no channels found so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) + await session.commit() + return 0, None # Return None (not error) when no channels found # Track the number of documents indexed documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 # Track messages that failed processing skipped_channels = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -184,15 +193,14 @@ async def index_slack_messages( {"stage": "process_channels", "total_channels": len(channels)}, ) - # Process each channel + # ======================================================================= + # PHASE 1: Collect all messages from all channels, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + messages_to_process = [] # List of dicts with document and message data + new_documents_created = False + for channel_obj in channels: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() channel_id = channel_obj["id"] channel_name = channel_obj["name"] is_private = channel_obj["is_private"] @@ -305,47 +313,29 @@ async def index_slack_messages( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() logger.info( f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping." ) documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for Slack message {msg_ts} in channel {channel_name}. Updating document." - ) - # Update chunks and embedding - chunks = await create_document_chunks( - combined_document_string - ) - doc_embedding = config.embedding_model_instance.embed( - combined_document_string - ) - - # Update existing document - existing_document.content = combined_document_string - existing_document.content_hash = content_hash - existing_document.embedding = doc_embedding - existing_document.document_metadata = { - "channel_name": channel_name, - "channel_id": channel_id, - "start_date": start_date_str, - "end_date": end_date_str, - "message_count": len(formatted_messages), - "indexed_at": datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ), - } - - # Delete old chunks and add new ones - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info(f"Successfully updated Slack message {msg_ts}") - continue + # Queue existing document for update (will be set to processing in Phase 2) + messages_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'combined_document_string': combined_document_string, + 'content_hash': content_hash, + 'channel_name': channel_name, + 'channel_id': channel_id, + 'msg_ts': msg_ts, + 'start_date': start_date_str, + 'end_date': end_date_str, + 'message_count': len(formatted_messages), + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -363,14 +353,7 @@ async def index_slack_messages( documents_skipped += 1 continue - # Document doesn't exist - create new one - # Process chunks - chunks = await create_document_chunks(combined_document_string) - doc_embedding = config.embedding_model_instance.embed( - combined_document_string - ) - - # Create and store new document + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=channel_name, @@ -378,33 +361,37 @@ async def index_slack_messages( document_metadata={ "channel_name": channel_name, "channel_id": channel_id, - "start_date": start_date_str, - "end_date": end_date_str, - "message_count": len(formatted_messages), - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "msg_ts": msg_ts, + "connector_id": connector_id, }, - content=combined_document_string, - embedding=doc_embedding, - chunks=chunks, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 + new_documents_created = True - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Slack channels processed so far" - ) - await session.commit() + messages_to_process.append({ + 'document': document, + 'is_new': True, + 'combined_document_string': combined_document_string, + 'content_hash': content_hash, + 'channel_name': channel_name, + 'channel_id': channel_id, + 'msg_ts': msg_ts, + 'start_date': start_date_str, + 'end_date': end_date_str, + 'message_count': len(formatted_messages), + }) logger.info( - f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages" + f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}" ) except SlackApiError as slack_error: @@ -420,43 +407,125 @@ async def index_slack_messages( documents_skipped += 1 continue # Skip this channel and continue with others - # Update the last_indexed_at timestamp for the connector only if requested - # and if we successfully indexed at least one channel - total_processed = documents_indexed - if total_processed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + + for item in messages_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (embeddings, chunks) + chunks = await create_document_chunks(item['combined_document_string']) + doc_embedding = config.embedding_model_instance.embed( + item['combined_document_string'] + ) + + # Update document to READY with actual content + document.title = item['channel_name'] + document.content = item['combined_document_string'] + document.content_hash = item['content_hash'] + document.embedding = doc_embedding + document.document_metadata = { + "channel_name": item['channel_name'], + "channel_id": item['channel_id'], + "start_date": item['start_date'], + "end_date": item['end_date'], + "message_count": item['message_count'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) + if documents_indexed % 10 == 0: + logger.info( + f"Committing batch: {documents_indexed} Slack messages processed so far" + ) + await session.commit() + + except Exception as e: + logger.error( + f"Error processing Slack message {item.get('msg_ts', 'Unknown')}: {e!s}", + exc_info=True, + ) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 + continue + + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches - logger.info(f"Final commit: Total {documents_indexed} Slack channels processed") - await session.commit() + logger.info(f"Final commit: Total {documents_indexed} Slack messages processed") + try: + await session.commit() + logger.info("Successfully committed all Slack document changes to database") + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same message was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + else: + raise - # Prepare result message - result_message = None - if skipped_channels: - result_message = f"Processed {total_processed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}" - else: - result_message = f"Processed {total_processed} channels." + # Build warning message if there were issues + warning_parts = [] + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Slack indexing for connector {connector_id}", { - "channels_processed": total_processed, + "channels_processed": len(channels), "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "documents_failed": documents_failed, "skipped_channels_count": len(skipped_channels), - "result_message": result_message, }, ) logger.info( - f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped" + f"Slack indexing completed: {documents_indexed} ready, " + f"{documents_skipped} skipped, {documents_failed} failed" ) - return ( - total_processed, - None, - ) # Return None on success (result_message is for logging only) + return documents_indexed, warning_message except SQLAlchemyError as db_error: await session.rollback()