diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 3b46b6437..8f447b842 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -1,49 +1,74 @@ -""" -Confluence connector indexer. - -Provides real-time document status updates during indexing using a two-phase approach: -- Phase 1: Create all documents with PENDING status (visible in UI immediately) -- Phase 2: Process each document one by one (PENDING → PROCESSING → READY/FAILED) -""" +"""Confluence connector indexer using the unified parallel indexing pipeline.""" import contextlib -import time from collections.abc import Awaitable, Callable -from datetime import datetime from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.connectors.confluence_history import ConfluenceHistoryConnector -from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType +from app.db import DocumentType, SearchSourceConnectorType +from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.document_hashing import compute_content_hash +from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) from .base import ( calculate_date_range, - check_document_by_unique_identifier, check_duplicate_document_by_hash, get_connector_by_id, - get_current_timestamp, logger, - safe_set_chunks, update_connector_last_indexed, ) -# Type hint for heartbeat callback HeartbeatCallbackType = Callable[[int], Awaitable[None]] - -# Heartbeat interval in seconds HEARTBEAT_INTERVAL_SECONDS = 30 +def _build_connector_doc( + page: dict, + full_content: str, + *, + connector_id: int, + search_space_id: int, + user_id: str, + enable_summary: bool, +) -> ConnectorDocument: + """Map a raw Confluence page dict to a ConnectorDocument.""" + page_id = page.get("id", "") + page_title = page.get("title", "") + space_id = page.get("spaceId", "") + comment_count = len(page.get("comments", [])) + + metadata = { + "page_id": page_id, + "page_title": page_title, + "space_id": space_id, + "comment_count": comment_count, + "connector_id": connector_id, + "document_type": "Confluence Page", + "connector_type": "Confluence", + } + + fallback_summary = ( + f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n{full_content}" + ) + + return ConnectorDocument( + title=page_title, + source_markdown=full_content, + unique_id=page_id, + document_type=DocumentType.CONFLUENCE_CONNECTOR, + search_space_id=search_space_id, + connector_id=connector_id, + created_by_id=user_id, + should_summarize=enable_summary, + fallback_summary=fallback_summary, + metadata=metadata, + ) + + async def index_confluence_pages( session: AsyncSession, connector_id: int, @@ -53,26 +78,9 @@ async def index_confluence_pages( end_date: str | None = None, update_last_indexed: bool = True, on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, str | None]: - """ - Index Confluence pages and comments. 
- - Args: - session: Database session - connector_id: ID of the Confluence connector - search_space_id: ID of the search space to store documents in - user_id: User ID - start_date: Start date for indexing (YYYY-MM-DD format) - end_date: End date for indexing (YYYY-MM-DD format) - update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - on_heartbeat_callback: Optional callback to update notification during long-running indexing. - - Returns: - Tuple containing (number of documents indexed, error message or None) - """ +) -> tuple[int, int, str | None]: + """Index Confluence pages and comments.""" task_logger = TaskLoggingService(session, search_space_id) - - # Log task start log_entry = await task_logger.log_task_start( task_name="confluence_pages_indexing", source="connector_indexing_task", @@ -86,7 +94,6 @@ async def index_confluence_pages( ) try: - # Get the connector from the database connector = await get_connector_by_id( session, connector_id, SearchSourceConnectorType.CONFLUENCE_CONNECTOR ) @@ -98,9 +105,8 @@ async def index_confluence_pages( "Connector not found", {"error_type": "ConnectorNotFound"}, ) - return 0, f"Connector with ID {connector_id} not found" + return 0, 0, f"Connector with ID {connector_id} not found" - # Initialize Confluence OAuth client await task_logger.log_task_progress( log_entry, f"Initializing Confluence OAuth client for connector {connector_id}", @@ -114,7 +120,6 @@ async def index_confluence_pages( ) ) - # Calculate date range start_date_str, end_date_str = calculate_date_range( connector, start_date, end_date, default_days_back=365 ) @@ -129,19 +134,14 @@ async def index_confluence_pages( }, ) - # Get pages within date range try: pages, error = await confluence_client.get_pages_by_date_range( start_date=start_date_str, end_date=end_date_str, include_comments=True ) if error: - # Don't treat "No pages found" as an error that should stop indexing if "No pages found" in error: logger.info(f"No Confluence pages found: {error}") - logger.info( - "No pages found is not a critical error, continuing with update" - ) if update_last_indexed: await update_connector_last_indexed( session, connector, update_last_indexed @@ -156,11 +156,10 @@ async def index_confluence_pages( f"No Confluence pages found in date range {start_date_str} to {end_date_str}", {"pages_found": 0}, ) - # Close client before returning if confluence_client: with contextlib.suppress(Exception): await confluence_client.close() - return 0, None + return 0, 0, None else: logger.error(f"Failed to get Confluence pages: {error}") await task_logger.log_task_failure( @@ -169,36 +168,35 @@ async def index_confluence_pages( "API Error", {"error_type": "APIError"}, ) - # Close client on error if confluence_client: with contextlib.suppress(Exception): await confluence_client.close() - return 0, f"Failed to get Confluence pages: {error}" + return 0, 0, f"Failed to get Confluence pages: {error}" logger.info(f"Retrieved {len(pages)} pages from Confluence API") except Exception as e: logger.error(f"Error fetching Confluence pages: {e!s}", exc_info=True) - # Close client on error if confluence_client: with contextlib.suppress(Exception): await confluence_client.close() - return 0, f"Error fetching Confluence pages: {e!s}" + return 0, 0, f"Error fetching Confluence pages: {e!s}" + + if not pages: + logger.info("No Confluence pages found for the specified date range") + if update_last_indexed: + await update_connector_last_indexed( + session, connector, update_last_indexed + ) + await 
session.commit() + if confluence_client: + with contextlib.suppress(Exception): + await confluence_client.close() + return 0, 0, None - # ======================================================================= - # PHASE 1: Analyze all pages, create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - documents_indexed = 0 documents_skipped = 0 - documents_failed = 0 duplicate_content_count = 0 - - # Heartbeat tracking - update notification periodically to prevent appearing stuck - last_heartbeat_time = time.time() - - pages_to_process = [] # List of dicts with document and page data - new_documents_created = False + connector_docs: list[ConnectorDocument] = [] for page in pages: try: @@ -213,12 +211,10 @@ async def index_confluence_pages( documents_skipped += 1 continue - # Extract page content page_content = "" if page.get("body") and page["body"].get("storage"): page_content = page["body"]["storage"].get("value", "") - # Add comments to content comments = page.get("comments", []) comments_content = "" if comments: @@ -235,61 +231,25 @@ async def index_confluence_pages( comments_content += f"**Comment by {comment_author}** ({comment_date}):\n{comment_body}\n\n" - # Combine page content with comments full_content = f"# {page_title}\n\n{page_content}{comments_content}" - if not full_content.strip(): + if not page_content.strip() and not comments: logger.warning(f"Skipping page with no content: {page_title}") documents_skipped += 1 continue - # Generate unique identifier hash for this Confluence page - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.CONFLUENCE_CONNECTOR, page_id, search_space_id + doc = _build_connector_doc( + page, + full_content, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + enable_summary=connector.enable_summary, ) - # Generate content hash - content_hash = generate_content_hash(full_content, search_space_id) - - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - comment_count = len(comments) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Queue existing document for update (will be set to processing in Phase 2) - pages_to_process.append( - { - "document": existing_document, - "is_new": False, - "full_content": full_content, - "page_content": page_content, - "content_hash": content_hash, - "page_id": page_id, - "page_title": page_title, - "space_id": space_id, - "comment_count": comment_count, - } - ) - continue - - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) with session.no_autoflush: duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash + session, compute_content_hash(doc) ) if duplicate_by_content: @@ -302,151 +262,29 @@ async def index_confluence_pages( documents_skipped += 1 continue - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - 
title=page_title, - document_type=DocumentType.CONFLUENCE_CONNECTOR, - document_metadata={ - "page_id": page_id, - "page_title": page_title, - "space_id": space_id, - "comment_count": comment_count, - "connector_id": connector_id, - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - new_documents_created = True - - pages_to_process.append( - { - "document": document, - "is_new": True, - "full_content": full_content, - "page_content": page_content, - "content_hash": content_hash, - "page_id": page_id, - "page_title": page_title, - "space_id": space_id, - "comment_count": comment_count, - } - ) + connector_docs.append(doc) except Exception as e: - logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True) - documents_failed += 1 + logger.error(f"Error building ConnectorDocument for page: {e!s}", exc_info=True) + documents_skipped += 1 continue - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents" - ) - await session.commit() + pipeline = IndexingPipelineService(session) + await pipeline.migrate_legacy_docs(connector_docs) - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(pages_to_process)} documents") + async def _get_llm(s: AsyncSession): + return await get_user_long_context_llm(s, user_id, search_space_id) - for item in pages_to_process: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time + _, documents_indexed, documents_failed = await pipeline.index_batch_parallel( + connector_docs, + _get_llm, + max_concurrency=3, + on_heartbeat=on_heartbeat_callback, + heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, + ) - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() - - # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm and connector.enable_summary: - document_metadata = { - "page_title": item["page_title"], - "page_id": item["page_id"], - "space_id": item["space_id"], - "comment_count": item["comment_count"], - "document_type": "Confluence Page", - "connector_type": "Confluence", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - item["full_content"], user_llm, document_metadata - ) - else: - summary_content = f"Confluence Page: {item['page_title']}\n\nSpace ID: {item['space_id']}\n\n{item['full_content']}" - summary_embedding = embed_text(summary_content) - - # Process chunks - using the full page content with comments - chunks = await 
create_document_chunks(item["full_content"]) - - # Update document to READY with actual content - document.title = item["page_title"] - document.content = summary_content - document.content_hash = item["content_hash"] - document.embedding = summary_embedding - document.document_metadata = { - "page_id": item["page_id"], - "page_title": item["page_title"], - "space_id": item["space_id"], - "comment_count": item["comment_count"], - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "connector_id": connector_id, - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - - documents_indexed += 1 - - # Batch commit every 10 documents (for ready status updates) - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Confluence pages processed so far" - ) - await session.commit() - - except Exception as e: - logger.error( - f"Error processing page {item.get('page_title', 'Unknown')}: {e!s}", - exc_info=True, - ) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - documents_failed += 1 - continue # Skip this page and continue with others - - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs - # This ensures the UI shows "Last indexed" instead of "Never indexed" await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit to ensure all documents are persisted (safety net) logger.info( f"Final commit: Total {documents_indexed} Confluence pages processed" ) @@ -456,7 +294,6 @@ async def index_confluence_pages( "Successfully committed all Confluence document changes to database" ) except Exception as e: - # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower() @@ -467,11 +304,9 @@ async def index_confluence_pages( f"Rolling back and continuing. 
Error: {e!s}" ) await session.rollback() - # Don't fail the entire task - some documents may have been successfully indexed else: raise - # Build warning message if there were issues warning_parts = [] if duplicate_content_count > 0: warning_parts.append(f"{duplicate_content_count} duplicate") @@ -479,7 +314,6 @@ async def index_confluence_pages( warning_parts.append(f"{documents_failed} failed") warning_message = ", ".join(warning_parts) if warning_parts else None - # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Confluence indexing for connector {connector_id}", @@ -490,22 +324,19 @@ async def index_confluence_pages( "duplicate_content_count": duplicate_content_count, }, ) - logger.info( f"Confluence indexing completed: {documents_indexed} ready, " f"{documents_skipped} skipped, {documents_failed} failed " f"({duplicate_content_count} duplicate content)" ) - # Close the client connection if confluence_client: await confluence_client.close() - return documents_indexed, warning_message + return documents_indexed, documents_skipped, warning_message except SQLAlchemyError as db_error: await session.rollback() - # Close client if it exists if confluence_client: with contextlib.suppress(Exception): await confluence_client.close() @@ -516,10 +347,9 @@ async def index_confluence_pages( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() - # Close client if it exists if confluence_client: with contextlib.suppress(Exception): await confluence_client.close() @@ -530,4 +360,4 @@ async def index_confluence_pages( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Confluence pages: {e!s}", exc_info=True) - return 0, f"Failed to index Confluence pages: {e!s}" + return 0, 0, f"Failed to index Confluence pages: {e!s}" diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 25491a8f6..6e370fc35 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -1,49 +1,80 @@ -""" -Jira connector indexer. 
- -Provides real-time document status updates during indexing using a two-phase approach: -- Phase 1: Create all documents with PENDING status (visible in UI immediately) -- Phase 2: Process each document one by one (PENDING → PROCESSING → READY/FAILED) -""" +"""Jira connector indexer using the unified parallel indexing pipeline.""" import contextlib -import time from collections.abc import Awaitable, Callable -from datetime import datetime from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.connectors.jira_history import JiraHistoryConnector -from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType +from app.db import DocumentType, SearchSourceConnectorType +from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.document_hashing import compute_content_hash +from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) from .base import ( calculate_date_range, - check_document_by_unique_identifier, check_duplicate_document_by_hash, get_connector_by_id, - get_current_timestamp, logger, - safe_set_chunks, update_connector_last_indexed, ) -# Type hint for heartbeat callback HeartbeatCallbackType = Callable[[int], Awaitable[None]] - -# Heartbeat interval in seconds - update notification every 30 seconds HEARTBEAT_INTERVAL_SECONDS = 30 +def _build_connector_doc( + issue: dict, + formatted_issue: dict, + issue_content: str, + *, + connector_id: int, + search_space_id: int, + user_id: str, + enable_summary: bool, +) -> ConnectorDocument: + """Map a raw Jira issue dict to a ConnectorDocument.""" + issue_id = issue.get("key", "") + issue_identifier = issue.get("key", "") + issue_title = issue.get("id", "") + state = formatted_issue.get("status", "Unknown") + priority = formatted_issue.get("priority", "Unknown") + comment_count = len(formatted_issue.get("comments", [])) + + metadata = { + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "state": state, + "priority": priority, + "comment_count": comment_count, + "connector_id": connector_id, + "document_type": "Jira Issue", + "connector_type": "Jira", + } + + fallback_summary = ( + f"Jira Issue {issue_identifier}: {issue_title}\n\n" + f"Status: {state}\n\n{issue_content}" + ) + + return ConnectorDocument( + title=f"{issue_identifier}: {issue_title}", + source_markdown=issue_content, + unique_id=issue_id, + document_type=DocumentType.JIRA_CONNECTOR, + search_space_id=search_space_id, + connector_id=connector_id, + created_by_id=user_id, + should_summarize=enable_summary, + fallback_summary=fallback_summary, + metadata=metadata, + ) + + async def index_jira_issues( session: AsyncSession, connector_id: int, @@ -53,26 +84,9 @@ async def index_jira_issues( end_date: str | None = None, update_last_indexed: bool = True, on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, str | None]: - """ - Index Jira issues and comments. 
- - Args: - session: Database session - connector_id: ID of the Jira connector - search_space_id: ID of the search space to store documents in - user_id: User ID - start_date: Start date for indexing (YYYY-MM-DD format) - end_date: End date for indexing (YYYY-MM-DD format) - update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - on_heartbeat_callback: Optional callback to update notification during long-running indexing. - - Returns: - Tuple containing (number of documents indexed, error message or None) - """ +) -> tuple[int, int, str | None]: + """Index Jira issues and comments.""" task_logger = TaskLoggingService(session, search_space_id) - - # Log task start log_entry = await task_logger.log_task_start( task_name="jira_issues_indexing", source="connector_indexing_task", @@ -86,7 +100,6 @@ async def index_jira_issues( ) try: - # Get the connector from the database connector = await get_connector_by_id( session, connector_id, SearchSourceConnectorType.JIRA_CONNECTOR ) @@ -98,24 +111,15 @@ async def index_jira_issues( "Connector not found", {"error_type": "ConnectorNotFound"}, ) - return 0, f"Connector with ID {connector_id} not found" + return 0, 0, f"Connector with ID {connector_id} not found" - # Initialize Jira client with internal refresh capability - # Token refresh will happen automatically when needed await task_logger.log_task_progress( log_entry, f"Initializing Jira client for connector {connector_id}", {"stage": "client_initialization"}, ) - - logger.info(f"Initializing Jira client for connector {connector_id}") - - # Create connector with session and connector_id for internal refresh - # Token refresh will happen automatically when needed jira_client = JiraHistoryConnector(session=session, connector_id=connector_id) - # Calculate date range - # Handle "undefined" strings from frontend if start_date == "undefined" or start_date == "": start_date = None if end_date == "undefined" or end_date == "": @@ -135,19 +139,14 @@ async def index_jira_issues( }, ) - # Get issues within date range try: issues, error = await jira_client.get_issues_by_date_range( start_date=start_date_str, end_date=end_date_str, include_comments=True ) if error: - # Don't treat "No issues found" as an error that should stop indexing if "No issues found" in error: logger.info(f"No Jira issues found: {error}") - logger.info( - "No issues found is not a critical error, continuing with update" - ) if update_last_indexed: await update_connector_last_indexed( session, connector, update_last_indexed @@ -162,7 +161,8 @@ async def index_jira_issues( f"No Jira issues found in date range {start_date_str} to {end_date_str}", {"issues_found": 0}, ) - return 0, None + await jira_client.close() + return 0, 0, None else: logger.error(f"Failed to get Jira issues: {error}") await task_logger.log_task_failure( @@ -171,29 +171,30 @@ async def index_jira_issues( "API Error", {"error_type": "APIError"}, ) - return 0, f"Failed to get Jira issues: {error}" + await jira_client.close() + return 0, 0, f"Failed to get Jira issues: {error}" logger.info(f"Retrieved {len(issues)} issues from Jira API") except Exception as e: logger.error(f"Error fetching Jira issues: {e!s}", exc_info=True) - return 0, f"Error fetching Jira issues: {e!s}" + await jira_client.close() + return 0, 0, f"Error fetching Jira issues: {e!s}" - # ======================================================================= - # PHASE 1: Analyze all issues, create pending documents - # This makes ALL documents visible in the UI immediately 
with pending status - # ======================================================================= - documents_indexed = 0 + if not issues: + logger.info("No Jira issues found for the specified date range") + if update_last_indexed: + await update_connector_last_indexed( + session, connector, update_last_indexed + ) + await session.commit() + await jira_client.close() + return 0, 0, None + + connector_docs: list[ConnectorDocument] = [] documents_skipped = 0 - documents_failed = 0 duplicate_content_count = 0 - # Heartbeat tracking - update notification periodically to prevent appearing stuck - last_heartbeat_time = time.time() - - issues_to_process = [] # List of dicts with document and issue data - new_documents_created = False - for issue in issues: try: issue_id = issue.get("key") @@ -207,10 +208,7 @@ async def index_jira_issues( documents_skipped += 1 continue - # Format the issue for better readability formatted_issue = jira_client.format_issue(issue) - - # Convert to markdown issue_content = jira_client.format_issue_to_markdown(formatted_issue) if not issue_content: @@ -220,53 +218,19 @@ async def index_jira_issues( documents_skipped += 1 continue - # Generate unique identifier hash for this Jira issue - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.JIRA_CONNECTOR, issue_id, search_space_id + doc = _build_connector_doc( + issue, + formatted_issue, + issue_content, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + enable_summary=connector.enable_summary, ) - # Generate content hash - content_hash = generate_content_hash(issue_content, search_space_id) - - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - comment_count = len(formatted_issue.get("comments", [])) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Queue existing document for update (will be set to processing in Phase 2) - issues_to_process.append( - { - "document": existing_document, - "is_new": False, - "issue_content": issue_content, - "content_hash": content_hash, - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "formatted_issue": formatted_issue, - "comment_count": comment_count, - } - ) - continue - - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) with session.no_autoflush: duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash + session, compute_content_hash(doc) ) if duplicate_by_content: @@ -279,160 +243,37 @@ async def index_jira_issues( documents_skipped += 1 continue - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - title=f"{issue_identifier}: {issue_title}", - document_type=DocumentType.JIRA_CONNECTOR, - document_metadata={ - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": formatted_issue.get("status", "Unknown"), - "comment_count": comment_count, - "connector_id": connector_id, - }, - content="Pending...", # 
Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - new_documents_created = True - - issues_to_process.append( - { - "document": document, - "is_new": True, - "issue_content": issue_content, - "content_hash": content_hash, - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "formatted_issue": formatted_issue, - "comment_count": comment_count, - } - ) - - except Exception as e: - logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True) - documents_failed += 1 - continue - - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents" - ) - await session.commit() - - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(issues_to_process)} documents") - - for item in issues_to_process: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() - - # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm and connector.enable_summary: - document_metadata = { - "issue_key": item["issue_identifier"], - "issue_title": item["issue_title"], - "status": item["formatted_issue"].get("status", "Unknown"), - "priority": item["formatted_issue"].get("priority", "Unknown"), - "comment_count": item["comment_count"], - "document_type": "Jira Issue", - "connector_type": "Jira", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - item["issue_content"], user_llm, document_metadata - ) - else: - summary_content = f"Jira Issue {item['issue_identifier']}: {item['issue_title']}\n\n{item['issue_content']}" - summary_embedding = embed_text(summary_content) - - # Process chunks - using the full issue content with comments - chunks = await create_document_chunks(item["issue_content"]) - - # Update document to READY with actual content - document.title = f"{item['issue_identifier']}: {item['issue_title']}" - document.content = summary_content - document.content_hash = item["content_hash"] - document.embedding = summary_embedding - document.document_metadata = { - "issue_id": item["issue_id"], - "issue_identifier": item["issue_identifier"], - "issue_title": item["issue_title"], - "state": item["formatted_issue"].get("status", "Unknown"), - "comment_count": item["comment_count"], - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "connector_id": connector_id, - } - await safe_set_chunks(session, document, chunks) - 
document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - - documents_indexed += 1 - - # Batch commit every 10 documents (for ready status updates) - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Jira issues processed so far" - ) - await session.commit() + connector_docs.append(doc) except Exception as e: logger.error( - f"Error processing issue {item.get('issue_identifier', 'Unknown')}: {e!s}", + f"Error building ConnectorDocument for issue {issue_identifier}: {e!s}", exc_info=True, ) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - documents_failed += 1 - continue # Skip this issue and continue with others + documents_skipped += 1 + continue + + pipeline = IndexingPipelineService(session) + await pipeline.migrate_legacy_docs(connector_docs) + + async def _get_llm(s: AsyncSession): + return await get_user_long_context_llm(s, user_id, search_space_id) + + _, documents_indexed, documents_failed = await pipeline.index_batch_parallel( + connector_docs, + _get_llm, + max_concurrency=3, + on_heartbeat=on_heartbeat_callback, + heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, + ) - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs - # This ensures the UI shows "Last indexed" instead of "Never indexed" await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit to ensure all documents are persisted (safety net) logger.info(f"Final commit: Total {documents_indexed} Jira issues processed") try: await session.commit() logger.info("Successfully committed all JIRA document changes to database") except Exception as e: - # Handle any remaining integrity errors gracefully (race conditions, etc.) 
if ( "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower() @@ -447,7 +288,6 @@ async def index_jira_issues( else: raise - # Build warning message if there were issues warning_parts = [] if duplicate_content_count > 0: warning_parts.append(f"{duplicate_content_count} duplicate") @@ -455,7 +295,6 @@ async def index_jira_issues( warning_parts.append(f"{documents_failed} failed") warning_message = ", ".join(warning_parts) if warning_parts else None - # Log success await task_logger.log_task_success( log_entry, f"Successfully completed JIRA indexing for connector {connector_id}", @@ -466,17 +305,13 @@ async def index_jira_issues( "duplicate_content_count": duplicate_content_count, }, ) - logger.info( f"JIRA indexing completed: {documents_indexed} ready, " f"{documents_skipped} skipped, {documents_failed} failed " f"({duplicate_content_count} duplicate content)" ) - - # Clean up the connector await jira_client.close() - - return documents_indexed, warning_message + return documents_indexed, documents_skipped, warning_message except SQLAlchemyError as db_error: await session.rollback() @@ -487,11 +322,10 @@ async def index_jira_issues( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - # Clean up the connector in case of error if "jira_client" in locals(): with contextlib.suppress(Exception): await jira_client.close() - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -501,8 +335,7 @@ async def index_jira_issues( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True) - # Clean up the connector in case of error if "jira_client" in locals(): with contextlib.suppress(Exception): await jira_client.close() - return 0, f"Failed to index JIRA issues: {e!s}" + return 0, 0, f"Failed to index JIRA issues: {e!s}" diff --git a/surfsense_backend/tests/unit/connector_indexers/test_confluence_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_confluence_parallel.py new file mode 100644 index 000000000..d11c88b64 --- /dev/null +++ b/surfsense_backend/tests/unit/connector_indexers/test_confluence_parallel.py @@ -0,0 +1,373 @@ +"""Tests for Confluence indexer migrated to the unified parallel pipeline.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +import app.tasks.connector_indexers.confluence_indexer as _mod +from app.db import DocumentType +from app.tasks.connector_indexers.confluence_indexer import ( + _build_connector_doc, + index_confluence_pages, +) + +pytestmark = pytest.mark.unit + +_USER_ID = "00000000-0000-0000-0000-000000000001" +_CONNECTOR_ID = 42 +_SEARCH_SPACE_ID = 1 + + +def _make_page( + page_id: str = "p1", + title: str = "Home", + space_id: str = "S1", + body: str = "
<p>Hello</p>
", + comments=None, +): + return { + "id": page_id, + "title": title, + "spaceId": space_id, + "body": {"storage": {"value": body}}, + "comments": comments or [], + } + + +def _to_markdown(page: dict) -> str: + page_title = page.get("title", "") + page_content = page.get("body", {}).get("storage", {}).get("value", "") + comments = page.get("comments", []) + comments_content = "" + if comments: + comments_content = "\n\n## Comments\n\n" + for comment in comments: + comment_body = ( + comment.get("body", {}).get("storage", {}).get("value", "") + ) + comment_author = comment.get("version", {}).get("authorId", "Unknown") + comment_date = comment.get("version", {}).get("createdAt", "") + comments_content += ( + f"**Comment by {comment_author}** ({comment_date}):\n" + f"{comment_body}\n\n" + ) + return f"# {page_title}\n\n{page_content}{comments_content}" + + +# --------------------------------------------------------------------------- +# Slice 1: _build_connector_doc tracer bullet +# --------------------------------------------------------------------------- + + +async def test_build_connector_doc_produces_correct_fields(): + page = _make_page( + page_id="abc-123", + title="Engineering Handbook", + space_id="ENG", + comments=[{"id": "c1"}], + ) + markdown = _to_markdown(page) + + doc = _build_connector_doc( + page, + markdown, + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=True, + ) + + assert doc.title == "Engineering Handbook" + assert doc.unique_id == "abc-123" + assert doc.document_type == DocumentType.CONFLUENCE_CONNECTOR + assert doc.source_markdown == markdown + assert doc.search_space_id == _SEARCH_SPACE_ID + assert doc.connector_id == _CONNECTOR_ID + assert doc.created_by_id == _USER_ID + assert doc.should_summarize is True + assert doc.metadata["page_id"] == "abc-123" + assert doc.metadata["page_title"] == "Engineering Handbook" + assert doc.metadata["space_id"] == "ENG" + assert doc.metadata["comment_count"] == 1 + assert doc.metadata["connector_id"] == _CONNECTOR_ID + assert doc.metadata["document_type"] == "Confluence Page" + assert doc.metadata["connector_type"] == "Confluence" + assert doc.fallback_summary is not None + assert "Engineering Handbook" in doc.fallback_summary + assert markdown in doc.fallback_summary + + +async def test_build_connector_doc_summary_disabled(): + doc = _build_connector_doc( + _make_page(), + _to_markdown(_make_page()), + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=False, + ) + assert doc.should_summarize is False + + +# --------------------------------------------------------------------------- +# Shared fixtures for Slices 2-7 +# --------------------------------------------------------------------------- + + +def _mock_connector(enable_summary: bool = True): + c = MagicMock() + c.config = {"access_token": "tok"} + c.enable_summary = enable_summary + c.last_indexed_at = None + return c + + +def _mock_confluence_client(pages=None, error=None): + client = MagicMock() + client.get_pages_by_date_range = AsyncMock( + return_value=(pages if pages is not None else [], error), + ) + client.close = AsyncMock() + return client + + +@pytest.fixture +def confluence_mocks(monkeypatch): + mock_session = AsyncMock() + mock_session.no_autoflush = MagicMock() + + mock_connector = _mock_connector() + monkeypatch.setattr( + _mod, "get_connector_by_id", AsyncMock(return_value=mock_connector), + ) + + confluence_client = 
_mock_confluence_client(pages=[_make_page()]) + monkeypatch.setattr( + _mod, "ConfluenceHistoryConnector", MagicMock(return_value=confluence_client), + ) + + monkeypatch.setattr( + _mod, "check_duplicate_document_by_hash", AsyncMock(return_value=None), + ) + monkeypatch.setattr( + _mod, "update_connector_last_indexed", AsyncMock(), + ) + monkeypatch.setattr( + _mod, "calculate_date_range", MagicMock(return_value=("2025-01-01", "2025-12-31")), + ) + + mock_task_logger = MagicMock() + mock_task_logger.log_task_start = AsyncMock(return_value=MagicMock()) + mock_task_logger.log_task_progress = AsyncMock() + mock_task_logger.log_task_success = AsyncMock() + mock_task_logger.log_task_failure = AsyncMock() + monkeypatch.setattr( + _mod, "TaskLoggingService", MagicMock(return_value=mock_task_logger), + ) + + batch_mock = AsyncMock(return_value=([], 1, 0)) + pipeline_mock = MagicMock() + pipeline_mock.index_batch_parallel = batch_mock + pipeline_mock.migrate_legacy_docs = AsyncMock() + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock), + ) + + return { + "session": mock_session, + "connector": mock_connector, + "confluence_client": confluence_client, + "task_logger": mock_task_logger, + "pipeline_mock": pipeline_mock, + "batch_mock": batch_mock, + } + + +async def _run_index(mocks, **overrides): + return await index_confluence_pages( + session=mocks["session"], + connector_id=overrides.get("connector_id", _CONNECTOR_ID), + search_space_id=overrides.get("search_space_id", _SEARCH_SPACE_ID), + user_id=overrides.get("user_id", _USER_ID), + start_date=overrides.get("start_date", "2025-01-01"), + end_date=overrides.get("end_date", "2025-12-31"), + update_last_indexed=overrides.get("update_last_indexed", True), + on_heartbeat_callback=overrides.get("on_heartbeat_callback"), + ) + + +# --------------------------------------------------------------------------- +# Slice 2: Full pipeline wiring +# --------------------------------------------------------------------------- + + +async def test_one_page_calls_pipeline_and_returns_indexed_count(confluence_mocks): + indexed, skipped, warning = await _run_index(confluence_mocks) + assert indexed == 1 + assert skipped == 0 + assert warning is None + + confluence_mocks["batch_mock"].assert_called_once() + connector_docs = confluence_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert connector_docs[0].document_type == DocumentType.CONFLUENCE_CONNECTOR + + +async def test_pipeline_called_with_max_concurrency_3(confluence_mocks): + await _run_index(confluence_mocks) + call_kwargs = confluence_mocks["batch_mock"].call_args[1] + assert call_kwargs.get("max_concurrency") == 3 + + +async def test_migrate_legacy_docs_called_before_indexing(confluence_mocks): + await _run_index(confluence_mocks) + confluence_mocks["pipeline_mock"].migrate_legacy_docs.assert_called_once() + + +# --------------------------------------------------------------------------- +# Slice 3: Page skipping (missing id/title/content) +# --------------------------------------------------------------------------- + + +async def test_pages_with_missing_id_are_skipped(confluence_mocks): + pages = [ + _make_page(page_id="p1", title="Valid"), + _make_page(page_id="", title="Missing id"), + ] + confluence_mocks["confluence_client"].get_pages_by_date_range.return_value = ( + pages, + None, + ) + _, skipped, _ = await _run_index(confluence_mocks) + connector_docs = confluence_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) 
== 1 + assert skipped == 1 + + +async def test_pages_with_missing_title_are_skipped(confluence_mocks): + pages = [ + _make_page(page_id="p1", title="Valid"), + _make_page(page_id="p2", title=""), + ] + confluence_mocks["confluence_client"].get_pages_by_date_range.return_value = ( + pages, + None, + ) + _, skipped, _ = await _run_index(confluence_mocks) + connector_docs = confluence_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +async def test_pages_with_no_content_are_skipped(confluence_mocks): + pages = [ + _make_page(page_id="p1", title="Valid", body="ok
"), + _make_page(page_id="p2", title="Empty", body=""), + ] + confluence_mocks["confluence_client"].get_pages_by_date_range.return_value = ( + pages, + None, + ) + _, skipped, _ = await _run_index(confluence_mocks) + connector_docs = confluence_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +# --------------------------------------------------------------------------- +# Slice 4: Duplicate content skipping +# --------------------------------------------------------------------------- + + +async def test_duplicate_content_pages_are_skipped(confluence_mocks, monkeypatch): + pages = [ + _make_page(page_id="p1", title="One"), + _make_page(page_id="p2", title="Two"), + ] + confluence_mocks["confluence_client"].get_pages_by_date_range.return_value = ( + pages, + None, + ) + + call_count = 0 + + async def _check_dup(session, content_hash): + nonlocal call_count + call_count += 1 + if call_count == 2: + dup = MagicMock() + dup.id = 99 + dup.document_type = "OTHER" + return dup + return None + + monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup) + + _, skipped, _ = await _run_index(confluence_mocks) + connector_docs = confluence_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +# --------------------------------------------------------------------------- +# Slice 5: Heartbeat callback forwarding +# --------------------------------------------------------------------------- + + +async def test_heartbeat_callback_forwarded_to_pipeline(confluence_mocks): + heartbeat_cb = AsyncMock() + await _run_index(confluence_mocks, on_heartbeat_callback=heartbeat_cb) + call_kwargs = confluence_mocks["batch_mock"].call_args[1] + assert call_kwargs.get("on_heartbeat") is heartbeat_cb + + +# --------------------------------------------------------------------------- +# Slice 6: Empty pages and no-data success tuple +# --------------------------------------------------------------------------- + + +async def test_empty_pages_returns_zero_tuple(confluence_mocks): + confluence_mocks["confluence_client"].get_pages_by_date_range.return_value = ( + [], + None, + ) + indexed, skipped, warning = await _run_index(confluence_mocks) + assert indexed == 0 + assert skipped == 0 + assert warning is None + confluence_mocks["batch_mock"].assert_not_called() + + +async def test_no_pages_error_message_returns_success_tuple(confluence_mocks): + confluence_mocks["confluence_client"].get_pages_by_date_range.return_value = ( + [], + "No pages found in date range", + ) + indexed, skipped, warning = await _run_index(confluence_mocks) + assert indexed == 0 + assert skipped == 0 + assert warning is None + + +async def test_api_error_still_returns_3_tuple(confluence_mocks): + confluence_mocks["confluence_client"].get_pages_by_date_range.return_value = ( + [], + "API exploded", + ) + result = await _run_index(confluence_mocks) + assert len(result) == 3 + assert result[0] == 0 + assert result[1] == 0 + assert "Failed to get Confluence pages" in result[2] + + +# --------------------------------------------------------------------------- +# Slice 7: Failed docs warning +# --------------------------------------------------------------------------- + + +async def test_failed_docs_warning_in_result(confluence_mocks): + confluence_mocks["batch_mock"].return_value = ([], 0, 2) + _, _, warning = await _run_index(confluence_mocks) + assert warning is not None + assert "2 failed" in warning diff --git 
a/surfsense_backend/tests/unit/connector_indexers/test_jira_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_jira_parallel.py new file mode 100644 index 000000000..e8699c158 --- /dev/null +++ b/surfsense_backend/tests/unit/connector_indexers/test_jira_parallel.py @@ -0,0 +1,372 @@ +"""Tests for Jira indexer migrated to the unified parallel pipeline.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +import app.tasks.connector_indexers.jira_indexer as _mod +from app.db import DocumentType +from app.tasks.connector_indexers.jira_indexer import ( + _build_connector_doc, + index_jira_issues, +) + +pytestmark = pytest.mark.unit + +_USER_ID = "00000000-0000-0000-0000-000000000001" +_CONNECTOR_ID = 42 +_SEARCH_SPACE_ID = 1 + + +def _make_issue( + issue_key: str = "ENG-1", + issue_id: str = "10001", + title: str = "Fix login", +): + return {"key": issue_key, "id": issue_id, "title": title} + + +def _make_formatted_issue( + issue_key: str = "ENG-1", + issue_id: str = "10001", + title: str = "Fix login", + status: str = "In Progress", + priority: str = "High", + comments=None, +): + return { + "key": issue_key, + "id": issue_id, + "title": title, + "status": status, + "priority": priority, + "comments": comments or [], + } + + +# --------------------------------------------------------------------------- +# Slice 1: _build_connector_doc tracer bullet +# --------------------------------------------------------------------------- + + +async def test_build_connector_doc_produces_correct_fields(): + issue = _make_issue(issue_key="ENG-42", issue_id="4242", title="Fix auth bug") + formatted = _make_formatted_issue( + issue_key="ENG-42", + issue_id="4242", + title="Fix auth bug", + status="Done", + priority="Urgent", + comments=[{"id": "c1"}], + ) + markdown = "# ENG-42: Fix auth bug\n\nBody" + + doc = _build_connector_doc( + issue, + formatted, + markdown, + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=True, + ) + + assert doc.title == "ENG-42: 4242" + assert doc.unique_id == "ENG-42" + assert doc.document_type == DocumentType.JIRA_CONNECTOR + assert doc.source_markdown == markdown + assert doc.search_space_id == _SEARCH_SPACE_ID + assert doc.connector_id == _CONNECTOR_ID + assert doc.created_by_id == _USER_ID + assert doc.should_summarize is True + assert doc.metadata["issue_id"] == "ENG-42" + assert doc.metadata["issue_identifier"] == "ENG-42" + assert doc.metadata["issue_title"] == "4242" + assert doc.metadata["state"] == "Done" + assert doc.metadata["priority"] == "Urgent" + assert doc.metadata["comment_count"] == 1 + assert doc.metadata["connector_id"] == _CONNECTOR_ID + assert doc.metadata["document_type"] == "Jira Issue" + assert doc.metadata["connector_type"] == "Jira" + assert doc.fallback_summary is not None + assert "ENG-42" in doc.fallback_summary + assert markdown in doc.fallback_summary + + +async def test_build_connector_doc_summary_disabled(): + doc = _build_connector_doc( + _make_issue(), + _make_formatted_issue(), + "# content", + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=False, + ) + assert doc.should_summarize is False + + +# --------------------------------------------------------------------------- +# Shared fixtures for Slices 2-7 +# --------------------------------------------------------------------------- + + +def _mock_connector(enable_summary: bool = True): + c = MagicMock() + c.config = {"access_token": "tok"} + 
c.enable_summary = enable_summary + c.last_indexed_at = None + return c + + +def _mock_jira_client(issues=None, error=None): + client = MagicMock() + client.get_issues_by_date_range = AsyncMock( + return_value=(issues if issues is not None else [], error), + ) + client.format_issue = MagicMock( + side_effect=lambda i: _make_formatted_issue( + issue_key=i.get("key", ""), + issue_id=i.get("id", ""), + title=i.get("title", ""), + ) + ) + client.format_issue_to_markdown = MagicMock( + side_effect=lambda fi: f"# {fi.get('key', '')}: {fi.get('id', '')}\n\nContent" + ) + client.close = AsyncMock() + return client + + +@pytest.fixture +def jira_mocks(monkeypatch): + mock_session = AsyncMock() + mock_session.no_autoflush = MagicMock() + + mock_connector = _mock_connector() + monkeypatch.setattr( + _mod, "get_connector_by_id", AsyncMock(return_value=mock_connector), + ) + + jira_client = _mock_jira_client(issues=[_make_issue()]) + monkeypatch.setattr( + _mod, "JiraHistoryConnector", MagicMock(return_value=jira_client), + ) + + monkeypatch.setattr( + _mod, "check_duplicate_document_by_hash", AsyncMock(return_value=None), + ) + monkeypatch.setattr( + _mod, "update_connector_last_indexed", AsyncMock(), + ) + monkeypatch.setattr( + _mod, "calculate_date_range", MagicMock(return_value=("2025-01-01", "2025-12-31")), + ) + + mock_task_logger = MagicMock() + mock_task_logger.log_task_start = AsyncMock(return_value=MagicMock()) + mock_task_logger.log_task_progress = AsyncMock() + mock_task_logger.log_task_success = AsyncMock() + mock_task_logger.log_task_failure = AsyncMock() + monkeypatch.setattr( + _mod, "TaskLoggingService", MagicMock(return_value=mock_task_logger), + ) + + batch_mock = AsyncMock(return_value=([], 1, 0)) + pipeline_mock = MagicMock() + pipeline_mock.index_batch_parallel = batch_mock + pipeline_mock.migrate_legacy_docs = AsyncMock() + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock), + ) + + return { + "session": mock_session, + "connector": mock_connector, + "jira_client": jira_client, + "task_logger": mock_task_logger, + "pipeline_mock": pipeline_mock, + "batch_mock": batch_mock, + } + + +async def _run_index(mocks, **overrides): + return await index_jira_issues( + session=mocks["session"], + connector_id=overrides.get("connector_id", _CONNECTOR_ID), + search_space_id=overrides.get("search_space_id", _SEARCH_SPACE_ID), + user_id=overrides.get("user_id", _USER_ID), + start_date=overrides.get("start_date", "2025-01-01"), + end_date=overrides.get("end_date", "2025-12-31"), + update_last_indexed=overrides.get("update_last_indexed", True), + on_heartbeat_callback=overrides.get("on_heartbeat_callback"), + ) + + +# --------------------------------------------------------------------------- +# Slice 2: Full pipeline wiring +# --------------------------------------------------------------------------- + + +async def test_one_issue_calls_pipeline_and_returns_indexed_count(jira_mocks): + indexed, skipped, warning = await _run_index(jira_mocks) + assert indexed == 1 + assert skipped == 0 + assert warning is None + + jira_mocks["batch_mock"].assert_called_once() + connector_docs = jira_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert connector_docs[0].document_type == DocumentType.JIRA_CONNECTOR + + +async def test_pipeline_called_with_max_concurrency_3(jira_mocks): + await _run_index(jira_mocks) + call_kwargs = jira_mocks["batch_mock"].call_args[1] + assert call_kwargs.get("max_concurrency") == 3 + + +async def 
test_migrate_legacy_docs_called_before_indexing(jira_mocks): + await _run_index(jira_mocks) + jira_mocks["pipeline_mock"].migrate_legacy_docs.assert_called_once() + + +# --------------------------------------------------------------------------- +# Slice 3: Issue skipping (missing key/title/content) +# --------------------------------------------------------------------------- + + +async def test_issues_with_missing_key_are_skipped(jira_mocks): + issues = [ + _make_issue(issue_key="ENG-1", issue_id="10001"), + {"key": "", "id": "10002", "title": "No key"}, + ] + jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None) + + _, skipped, _ = await _run_index(jira_mocks) + connector_docs = jira_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +async def test_issues_with_missing_title_are_skipped(jira_mocks): + issues = [ + _make_issue(issue_key="ENG-1", issue_id="10001"), + {"key": "ENG-2", "id": "", "title": "Missing id used as title"}, + ] + jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None) + + _, skipped, _ = await _run_index(jira_mocks) + connector_docs = jira_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +async def test_issues_with_no_content_are_skipped(jira_mocks): + issues = [ + _make_issue(issue_key="ENG-1", issue_id="10001"), + _make_issue(issue_key="ENG-2", issue_id="10002"), + ] + jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None) + + jira_mocks["jira_client"].format_issue_to_markdown.side_effect = [ + "# ENG-1: 10001\n\nContent", + "", + ] + _, skipped, _ = await _run_index(jira_mocks) + connector_docs = jira_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +# --------------------------------------------------------------------------- +# Slice 4: Duplicate content skipping +# --------------------------------------------------------------------------- + + +async def test_duplicate_content_issues_are_skipped(jira_mocks, monkeypatch): + issues = [ + _make_issue(issue_key="ENG-1", issue_id="10001"), + _make_issue(issue_key="ENG-2", issue_id="10002"), + ] + jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None) + + call_count = 0 + + async def _check_dup(session, content_hash): + nonlocal call_count + call_count += 1 + if call_count == 2: + dup = MagicMock() + dup.id = 99 + dup.document_type = "OTHER" + return dup + return None + + monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup) + + _, skipped, _ = await _run_index(jira_mocks) + connector_docs = jira_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +# --------------------------------------------------------------------------- +# Slice 5: Heartbeat callback forwarding +# --------------------------------------------------------------------------- + + +async def test_heartbeat_callback_forwarded_to_pipeline(jira_mocks): + heartbeat_cb = AsyncMock() + await _run_index(jira_mocks, on_heartbeat_callback=heartbeat_cb) + call_kwargs = jira_mocks["batch_mock"].call_args[1] + assert call_kwargs.get("on_heartbeat") is heartbeat_cb + + +# --------------------------------------------------------------------------- +# Slice 6: Empty issues and no-data success tuple +# --------------------------------------------------------------------------- + + +async def test_empty_issues_returns_zero_tuple(jira_mocks): + 
jira_mocks["jira_client"].get_issues_by_date_range.return_value = ([], None) + indexed, skipped, warning = await _run_index(jira_mocks) + assert indexed == 0 + assert skipped == 0 + assert warning is None + jira_mocks["batch_mock"].assert_not_called() + + +async def test_no_issues_error_message_returns_success_tuple(jira_mocks): + jira_mocks["jira_client"].get_issues_by_date_range.return_value = ( + [], + "No issues found in date range", + ) + indexed, skipped, warning = await _run_index(jira_mocks) + assert indexed == 0 + assert skipped == 0 + assert warning is None + + +async def test_api_error_still_returns_3_tuple(jira_mocks): + jira_mocks["jira_client"].get_issues_by_date_range.return_value = ( + [], + "API exploded", + ) + result = await _run_index(jira_mocks) + assert len(result) == 3 + assert result[0] == 0 + assert result[1] == 0 + assert "Failed to get Jira issues" in result[2] + + +# --------------------------------------------------------------------------- +# Slice 7: Failed docs warning +# --------------------------------------------------------------------------- + + +async def test_failed_docs_warning_in_result(jira_mocks): + jira_mocks["batch_mock"].return_value = ([], 0, 2) + _, _, warning = await _run_index(jira_mocks) + assert warning is not None + assert "2 failed" in warning