diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 6e9ccaa01..38d931588 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -1,48 +1,84 @@ """ Linear connector indexer. -Implements 2-phase document status updates for real-time UI feedback: -- Phase 1: Create all documents with 'pending' status (visible in UI immediately) -- Phase 2: Process each document: pending → processing → ready/failed +Uses the shared IndexingPipelineService for document deduplication, +summarization, chunking, and embedding with bounded parallel indexing. """ -import time from collections.abc import Awaitable, Callable -from datetime import datetime from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.connectors.linear_connector import LinearConnector -from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType +from app.db import DocumentType, SearchSourceConnectorType +from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.document_hashing import compute_content_hash +from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) from .base import ( calculate_date_range, - check_document_by_unique_identifier, check_duplicate_document_by_hash, get_connector_by_id, - get_current_timestamp, logger, - safe_set_chunks, update_connector_last_indexed, ) -# Type hint for heartbeat callback HeartbeatCallbackType = Callable[[int], Awaitable[None]] - -# Heartbeat interval in seconds - update notification every 30 seconds HEARTBEAT_INTERVAL_SECONDS = 30 +def _build_connector_doc( + issue: dict, + formatted_issue: dict, + issue_content: str, + *, + connector_id: int, + search_space_id: int, + user_id: str, + enable_summary: bool, +) -> ConnectorDocument: + """Map a raw Linear issue dict to a ConnectorDocument.""" + issue_id = issue.get("id", "") + issue_identifier = issue.get("identifier", "") + issue_title = issue.get("title", "") + state = formatted_issue.get("state", "Unknown") + priority = formatted_issue.get("priority", "Unknown") + comment_count = len(formatted_issue.get("comments", [])) + + metadata = { + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "state": state, + "priority": priority, + "comment_count": comment_count, + "connector_id": connector_id, + "document_type": "Linear Issue", + "connector_type": "Linear", + } + + fallback_summary = ( + f"Linear Issue {issue_identifier}: {issue_title}\n\n" + f"Status: {state}\n\n{issue_content}" + ) + + return ConnectorDocument( + title=f"{issue_identifier}: {issue_title}", + source_markdown=issue_content, + unique_id=issue_id, + document_type=DocumentType.LINEAR_CONNECTOR, + search_space_id=search_space_id, + connector_id=connector_id, + created_by_id=user_id, + should_summarize=enable_summary, + fallback_summary=fallback_summary, + metadata=metadata, + ) + + async def index_linear_issues( session: AsyncSession, connector_id: int, @@ -52,26 +88,15 @@ async def index_linear_issues( end_date: str | None = None, update_last_indexed: bool = True, on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, str | None]: +) -> tuple[int, int, str | None]: """ Index Linear issues and comments. - Args: - session: Database session - connector_id: ID of the Linear connector - search_space_id: ID of the search space to store documents in - user_id: ID of the user - start_date: Start date for indexing (YYYY-MM-DD format) - end_date: End date for indexing (YYYY-MM-DD format) - update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - on_heartbeat_callback: Optional callback to update notification during long-running indexing. - Returns: - Tuple containing (number of documents indexed, error message or None) + Tuple of (indexed_count, skipped_count, warning_or_error_message) """ task_logger = TaskLoggingService(session, search_space_id) - # Log task start log_entry = await task_logger.log_task_start( task_name="linear_issues_indexing", source="connector_indexing_task", @@ -85,7 +110,7 @@ async def index_linear_issues( ) try: - # Get the connector + # ── Connector lookup ────────────────────────────────────────── await task_logger.log_task_progress( log_entry, f"Retrieving Linear connector {connector_id} from database", @@ -104,11 +129,11 @@ async def index_linear_issues( {"error_type": "ConnectorNotFound"}, ) return ( + 0, 0, f"Connector with ID {connector_id} not found or is not a Linear connector", ) - # Check if access_token exists (support both new OAuth format and old API key format) if not connector.config.get("access_token") and not connector.config.get( "LINEAR_API_KEY" ): @@ -118,26 +143,22 @@ async def index_linear_issues( "Missing Linear access token", {"error_type": "MissingToken"}, ) - return 0, "Linear access token not found in connector config" + return 0, 0, "Linear access token not found in connector config" - # Initialize Linear client with internal refresh capability + # ── Client init ─────────────────────────────────────────────── await task_logger.log_task_progress( log_entry, f"Initializing Linear client for connector {connector_id}", {"stage": "client_initialization"}, ) - # Create connector with session and connector_id for internal refresh - # Token refresh will happen automatically when needed linear_client = LinearConnector(session=session, connector_id=connector_id) - # Handle 'undefined' string from frontend (treat as None) if start_date == "undefined" or start_date == "": start_date = None if end_date == "undefined" or end_date == "": end_date = None - # Calculate date range start_date_str, end_date_str = calculate_date_range( connector, start_date, end_date, default_days_back=365 ) @@ -154,37 +175,34 @@ async def index_linear_issues( }, ) - # Get issues within date range + # ── Fetch issues ────────────────────────────────────────────── try: issues, error = await linear_client.get_issues_by_date_range( - start_date=start_date_str, end_date=end_date_str, include_comments=True + start_date=start_date_str, + end_date=end_date_str, + include_comments=True, ) if error: - # Don't treat "No issues found" as an error that should stop indexing if "No issues found" in error: logger.info(f"No Linear issues found: {error}") - logger.info( - "No issues found is not a critical error, continuing with update" - ) if update_last_indexed: await update_connector_last_indexed( session, connector, update_last_indexed ) await session.commit() - logger.info( - f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found" - ) - return 0, None + return 0, 0, None else: logger.error(f"Failed to get Linear issues: {error}") - return 0, f"Failed to get Linear issues: {error}" + return 0, 0, f"Failed to get Linear issues: {error}" logger.info(f"Retrieved {len(issues)} issues from Linear API") except Exception as e: - logger.error(f"Exception when calling Linear API: {e!s}", exc_info=True) - return 0, f"Failed to get Linear issues: {e!s}" + logger.error( + f"Exception when calling Linear API: {e!s}", exc_info=True + ) + return 0, 0, f"Failed to get Linear issues: {e!s}" if not issues: logger.info("No Linear issues found for the specified date range") @@ -193,19 +211,12 @@ async def index_linear_issues( session, connector, update_last_indexed ) await session.commit() - logger.info( - f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found" - ) - return 0, None # Return None instead of error message when no issues found + return 0, 0, None - # Track the number of documents indexed - documents_indexed = 0 + # ── Build ConnectorDocuments ────────────────────────────────── + connector_docs: list[ConnectorDocument] = [] documents_skipped = 0 - documents_failed = 0 # Track issues that failed processing - skipped_issues = [] - - # Heartbeat tracking - update notification periodically to prevent appearing stuck - last_heartbeat_time = time.time() + duplicate_content_count = 0 await task_logger.log_task_progress( log_entry, @@ -213,13 +224,6 @@ async def index_linear_issues( {"stage": "process_issues", "total_issues": len(issues)}, ) - # ======================================================================= - # PHASE 1: Analyze all issues, create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - issues_to_process = [] # List of dicts with document and issue data - new_documents_created = False - for issue in issues: try: issue_id = issue.get("id", "") @@ -230,271 +234,102 @@ async def index_linear_issues( logger.warning( f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}" ) - skipped_issues.append( - f"{issue_identifier or 'Unknown'} (missing data)" - ) documents_skipped += 1 continue - # Format the issue first to get well-structured data formatted_issue = linear_client.format_issue(issue) - - # Convert issue to markdown format - issue_content = linear_client.format_issue_to_markdown(formatted_issue) + issue_content = linear_client.format_issue_to_markdown( + formatted_issue + ) if not issue_content: logger.warning( f"Skipping issue with no content: {issue_identifier} - {issue_title}" ) - skipped_issues.append(f"{issue_identifier} (no content)") documents_skipped += 1 continue - # Generate unique identifier hash for this Linear issue - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.LINEAR_CONNECTOR, issue_id, search_space_id - ) - - # Generate content hash - content_hash = generate_content_hash(issue_content, search_space_id) - - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - state = formatted_issue.get("state", "Unknown") - description = formatted_issue.get("description", "") - comment_count = len(formatted_issue.get("comments", [])) - priority = formatted_issue.get("priority", "Unknown") - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - logger.info( - f"Document for Linear issue {issue_identifier} unchanged. Skipping." - ) - documents_skipped += 1 - continue - - # Queue existing document for update (will be set to processing in Phase 2) - issues_to_process.append( - { - "document": existing_document, - "is_new": False, - "issue_content": issue_content, - "content_hash": content_hash, - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": state, - "description": description, - "comment_count": comment_count, - "priority": priority, - } - ) - continue - - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) - - if duplicate_by_content: - logger.info( - f"Linear issue {issue_identifier} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." - ) - documents_skipped += 1 - continue - - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - title=f"{issue_identifier}: {issue_title}", - document_type=DocumentType.LINEAR_CONNECTOR, - document_metadata={ - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": state, - "comment_count": comment_count, - "connector_id": connector_id, - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, + doc = _build_connector_doc( + issue, + formatted_issue, + issue_content, connector_id=connector_id, - ) - session.add(document) - new_documents_created = True - - issues_to_process.append( - { - "document": document, - "is_new": True, - "issue_content": issue_content, - "content_hash": content_hash, - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": state, - "description": description, - "comment_count": comment_count, - "priority": priority, - } + search_space_id=search_space_id, + user_id=user_id, + enable_summary=connector.enable_summary, ) - except Exception as e: - logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True) - documents_failed += 1 - continue - - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents" - ) - await session.commit() - - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(issues_to_process)} documents") - - for item in issues_to_process: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() - - # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm and connector.enable_summary: - document_metadata_for_summary = { - "issue_id": item["issue_identifier"], - "issue_title": item["issue_title"], - "state": item["state"], - "priority": item["priority"], - "comment_count": item["comment_count"], - "document_type": "Linear Issue", - "connector_type": "Linear", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - item["issue_content"], user_llm, document_metadata_for_summary + with session.no_autoflush: + duplicate = await check_duplicate_document_by_hash( + session, compute_content_hash(doc) ) - else: - summary_content = f"Linear Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['state']}\n\n{item['issue_content']}" - summary_embedding = embed_text(summary_content) - - chunks = await create_document_chunks(item["issue_content"]) - - # Update document to READY with actual content - document.title = f"{item['issue_identifier']}: {item['issue_title']}" - document.content = summary_content - document.content_hash = item["content_hash"] - document.embedding = summary_embedding - document.document_metadata = { - "issue_id": item["issue_id"], - "issue_identifier": item["issue_identifier"], - "issue_title": item["issue_title"], - "state": item["state"], - "comment_count": item["comment_count"], - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "connector_id": connector_id, - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - - documents_indexed += 1 - - # Batch commit every 10 documents (for ready status updates) - if documents_indexed % 10 == 0: + if duplicate: logger.info( - f"Committing batch: {documents_indexed} Linear issues processed so far" + f"Linear issue {doc.title} already indexed by another connector " + f"(existing document ID: {duplicate.id}, " + f"type: {duplicate.document_type}). Skipping." ) - await session.commit() + duplicate_content_count += 1 + documents_skipped += 1 + continue + + connector_docs.append(doc) except Exception as e: logger.error( - f"Error processing issue {item.get('issue_identifier', 'Unknown')}: {e!s}", + f"Error building ConnectorDocument for issue: {e!s}", exc_info=True, ) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - skipped_issues.append( - f"{item.get('issue_identifier', 'Unknown')} (processing error)" - ) - documents_failed += 1 + documents_skipped += 1 continue - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs + # ── Pipeline: migrate legacy docs + parallel index ──────────── + pipeline = IndexingPipelineService(session) + + await pipeline.migrate_legacy_docs(connector_docs) + + async def _get_llm(s): + return await get_user_long_context_llm(s, user_id, search_space_id) + + _, documents_indexed, documents_failed = await pipeline.index_batch_parallel( + connector_docs, + _get_llm, + max_concurrency=3, + on_heartbeat=on_heartbeat_callback, + heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, + ) + + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit for any remaining documents not yet committed in batches - logger.info(f"Final commit: Total {documents_indexed} Linear issues processed") + logger.info( + f"Final commit: Total {documents_indexed} Linear issues processed" + ) try: await session.commit() logger.info( "Successfully committed all Linear document changes to database" ) except Exception as e: - # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower() ): logger.warning( f"Duplicate content_hash detected during final commit. " - f"This may occur if the same issue was indexed by multiple connectors. " f"Rolling back and continuing. Error: {e!s}" ) await session.rollback() else: raise - # Build warning message if there were issues - warning_parts = [] + warning_parts: list[str] = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") if documents_failed > 0: warning_parts.append(f"{documents_failed} failed") warning_message = ", ".join(warning_parts) if warning_parts else None - # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Linear indexing for connector {connector_id}", @@ -503,7 +338,7 @@ async def index_linear_issues( "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, "documents_failed": documents_failed, - "skipped_issues_count": len(skipped_issues), + "duplicate_content_count": duplicate_content_count, }, ) @@ -511,7 +346,7 @@ async def index_linear_issues( f"Linear indexing completed: {documents_indexed} ready, " f"{documents_skipped} skipped, {documents_failed} failed" ) - return documents_indexed, warning_message + return documents_indexed, documents_skipped, warning_message except SQLAlchemyError as db_error: await session.rollback() @@ -522,7 +357,7 @@ async def index_linear_issues( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -532,4 +367,4 @@ async def index_linear_issues( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Linear issues: {e!s}", exc_info=True) - return 0, f"Failed to index Linear issues: {e!s}" + return 0, 0, f"Failed to index Linear issues: {e!s}" diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 619b8dcd7..6614071a4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -1,12 +1,10 @@ """ Notion connector indexer. -Implements real-time document status updates using a two-phase approach: -- Phase 1: Create all documents with PENDING status (visible in UI immediately) -- Phase 2: Process each document one by one (pending → processing → ready/failed) +Uses the shared IndexingPipelineService for document deduplication, +summarization, chunking, and embedding with bounded parallel indexing. """ -import time from collections.abc import Awaitable, Callable from datetime import datetime @@ -14,42 +12,64 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.connectors.notion_history import NotionHistoryConnector -from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType +from app.db import DocumentType, SearchSourceConnectorType +from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.document_hashing import compute_content_hash +from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) from app.utils.notion_utils import process_blocks from .base import ( - build_document_metadata_string, calculate_date_range, - check_document_by_unique_identifier, check_duplicate_document_by_hash, get_connector_by_id, - get_current_timestamp, logger, - safe_set_chunks, update_connector_last_indexed, ) -# Type alias for retry callback -# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]] - -# Type alias for heartbeat callback -# Signature: async callback(indexed_count) -> None HeartbeatCallbackType = Callable[[int], Awaitable[None]] - -# Heartbeat interval in seconds - update notification every 30 seconds HEARTBEAT_INTERVAL_SECONDS = 30 +def _build_connector_doc( + page: dict, + markdown_content: str, + *, + connector_id: int, + search_space_id: int, + user_id: str, + enable_summary: bool, +) -> ConnectorDocument: + """Map a raw Notion page dict to a ConnectorDocument.""" + page_id = page.get("page_id", "") + page_title = page.get("title", f"Untitled page ({page_id})") + + metadata = { + "page_title": page_title, + "page_id": page_id, + "connector_id": connector_id, + "document_type": "Notion Page", + "connector_type": "Notion", + } + + fallback_summary = f"Notion Page: {page_title}\n\n{markdown_content}" + + return ConnectorDocument( + title=page_title, + source_markdown=markdown_content, + unique_id=page_id, + document_type=DocumentType.NOTION_CONNECTOR, + search_space_id=search_space_id, + connector_id=connector_id, + created_by_id=user_id, + should_summarize=enable_summary, + fallback_summary=fallback_summary, + metadata=metadata, + ) + + async def index_notion_pages( session: AsyncSession, connector_id: int, @@ -60,30 +80,15 @@ async def index_notion_pages( update_last_indexed: bool = True, on_retry_callback: RetryCallbackType | None = None, on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, str | None]: +) -> tuple[int, int, str | None]: """ Index Notion pages from all accessible pages. - Args: - session: Database session - connector_id: ID of the Notion connector - search_space_id: ID of the search space to store documents in - user_id: ID of the user - start_date: Start date for indexing (YYYY-MM-DD format) - end_date: End date for indexing (YYYY-MM-DD format) - update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - on_retry_callback: Optional callback for retry progress notifications. - Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) - retry_reason is one of: 'rate_limit', 'server_error', 'timeout' - on_heartbeat_callback: Optional callback to update notification during long-running indexing. - Called periodically with (indexed_count) to prevent task appearing stuck. - Returns: - Tuple containing (number of documents indexed, error message or None) + Tuple of (indexed_count, skipped_count, warning_or_error_message) """ task_logger = TaskLoggingService(session, search_space_id) - # Log task start log_entry = await task_logger.log_task_start( task_name="notion_pages_indexing", source="connector_indexing_task", @@ -97,7 +102,7 @@ async def index_notion_pages( ) try: - # Get the connector + # ── Connector lookup ────────────────────────────────────────── await task_logger.log_task_progress( log_entry, f"Retrieving Notion connector {connector_id} from database", @@ -116,11 +121,11 @@ async def index_notion_pages( {"error_type": "ConnectorNotFound"}, ) return ( + 0, 0, f"Connector with ID {connector_id} not found or is not a Notion connector", ) - # Check if access_token exists (support both new OAuth format and old integration token format) if not connector.config.get("access_token") and not connector.config.get( "NOTION_INTEGRATION_TOKEN" ): @@ -130,9 +135,9 @@ async def index_notion_pages( "Missing Notion access token", {"error_type": "MissingToken"}, ) - return 0, "Notion access token not found in connector config" + return 0, 0, "Notion access token not found in connector config" - # Initialize Notion client with internal refresh capability + # ── Client init ─────────────────────────────────────────────── await task_logger.log_task_progress( log_entry, f"Initializing Notion client for connector {connector_id}", @@ -141,18 +146,15 @@ async def index_notion_pages( logger.info(f"Initializing Notion client for connector {connector_id}") - # Handle 'undefined' string from frontend (treat as None) if start_date == "undefined" or start_date == "": start_date = None if end_date == "undefined" or end_date == "": end_date = None - # Calculate date range using the shared utility function start_date_str, end_date_str = calculate_date_range( connector, start_date, end_date, default_days_back=365 ) - # Convert YYYY-MM-DD to ISO format for Notion API start_date_iso = datetime.strptime(start_date_str, "%Y-%m-%d").strftime( "%Y-%m-%dT%H:%M:%SZ" ) @@ -160,13 +162,10 @@ async def index_notion_pages( "%Y-%m-%dT%H:%M:%SZ" ) - # Create connector with session and connector_id for internal refresh - # Token refresh will happen automatically when needed notion_client = NotionHistoryConnector( session=session, connector_id=connector_id ) - # Set retry callback if provided (for user notifications during rate limits) if on_retry_callback: notion_client.set_retry_callback(on_retry_callback) @@ -182,21 +181,19 @@ async def index_notion_pages( }, ) - # Get all pages + # ── Fetch pages ─────────────────────────────────────────────── try: pages = await notion_client.get_all_pages( start_date=start_date_iso, end_date=end_date_iso ) logger.info(f"Found {len(pages)} Notion pages") - # Get count of pages that had unsupported content skipped pages_with_skipped_content = notion_client.get_skipped_content_count() if pages_with_skipped_content > 0: logger.info( f"{pages_with_skipped_content} pages had Notion AI content skipped (not available via API)" ) - # Check if using legacy integration token and log warning if notion_client.is_using_legacy_token(): logger.warning( f"Connector {connector_id} is using legacy integration token. " @@ -204,8 +201,6 @@ async def index_notion_pages( ) except Exception as e: error_str = str(e) - # Check if this is an unsupported block type error (transcription, ai_block, etc.) - # These are known Notion API limitations and should be logged as warnings, not errors unsupported_block_errors = [ "transcription is not supported", "ai_block is not supported", @@ -216,7 +211,6 @@ async def index_notion_pages( ) if is_unsupported_block_error: - # Log as warning since this is a known Notion API limitation logger.warning( f"Notion API limitation for connector {connector_id}: {error_str}. " "This is a known issue with Notion AI blocks (transcription, ai_block) " @@ -229,7 +223,6 @@ async def index_notion_pages( {"error_type": "UnsupportedBlockType", "is_known_limitation": True}, ) else: - # Log as error for other failures logger.error( f"Error fetching Notion pages for connector {connector_id}: {error_str}", exc_info=True, @@ -242,7 +235,7 @@ async def index_notion_pages( ) await notion_client.close() - return 0, f"Failed to get Notion pages: {e!s}" + return 0, 0, f"Failed to get Notion pages: {e!s}" if not pages: await task_logger.log_task_success( @@ -252,21 +245,17 @@ async def index_notion_pages( {"pages_found": 0}, ) logger.info("No Notion pages found to index") - # CRITICAL: Update timestamp even when no pages found so Zero syncs - await update_connector_last_indexed(session, connector, update_last_indexed) + await update_connector_last_indexed( + session, connector, update_last_indexed + ) await session.commit() await notion_client.close() - return 0, None # Success with 0 pages, not an error + return 0, 0, None - # Track the number of documents indexed - documents_indexed = 0 + # ── Build ConnectorDocuments ────────────────────────────────── + connector_docs: list[ConnectorDocument] = [] documents_skipped = 0 - documents_failed = 0 duplicate_content_count = 0 - skipped_pages = [] - - # Heartbeat tracking - update notification periodically to prevent appearing stuck - last_heartbeat_time = time.time() await task_logger.log_task_progress( log_entry, @@ -274,13 +263,6 @@ async def index_notion_pages( {"stage": "process_pages", "total_pages": len(pages)}, ) - # ======================================================================= - # PHASE 1: Analyze all pages, create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - pages_to_process = [] # List of dicts with document and page data - new_documents_created = False - for page in pages: try: page_id = page.get("page_id") @@ -293,225 +275,71 @@ async def index_notion_pages( if not page_content: logger.info(f"No content found in page {page_title}. Skipping.") - skipped_pages.append(f"{page_title} (no content)") documents_skipped += 1 continue - # Convert page content to markdown format markdown_content = f"# Notion Page: {page_title}\n\n" markdown_content += process_blocks(page_content) - # Format document metadata - metadata_sections = [ - ("METADATA", [f"PAGE_TITLE: {page_title}", f"PAGE_ID: {page_id}"]), - ( - "CONTENT", - [ - "FORMAT: markdown", - "TEXT_START", - markdown_content, - "TEXT_END", - ], - ), - ] - - # Build the document string - combined_document_string = build_document_metadata_string( - metadata_sections - ) - - # Generate unique identifier hash for this Notion page - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.NOTION_CONNECTOR, page_id, search_space_id - ) - - # Generate content hash - content_hash = generate_content_hash( - combined_document_string, search_space_id - ) - - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Queue existing document for update (will be set to processing in Phase 2) - pages_to_process.append( - { - "document": existing_document, - "is_new": False, - "markdown_content": markdown_content, - "content_hash": content_hash, - "page_id": page_id, - "page_title": page_title, - } + if not markdown_content.strip(): + logger.warning( + f"Skipping page with empty markdown: {page_title}" ) + documents_skipped += 1 continue - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) + doc = _build_connector_doc( + page, + markdown_content, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + enable_summary=connector.enable_summary, + ) - if duplicate_by_content: + with session.no_autoflush: + duplicate = await check_duplicate_document_by_hash( + session, compute_content_hash(doc) + ) + if duplicate: logger.info( - f"Notion page {page_title} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." + f"Notion page {doc.title} already indexed by another connector " + f"(existing document ID: {duplicate.id}, " + f"type: {duplicate.document_type}). Skipping." ) duplicate_content_count += 1 documents_skipped += 1 continue - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - title=page_title, - document_type=DocumentType.NOTION_CONNECTOR, - document_metadata={ - "page_title": page_title, - "page_id": page_id, - "connector_id": connector_id, - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - new_documents_created = True - - pages_to_process.append( - { - "document": document, - "is_new": True, - "markdown_content": markdown_content, - "content_hash": content_hash, - "page_id": page_id, - "page_title": page_title, - } - ) + connector_docs.append(doc) except Exception as e: - logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True) - documents_failed += 1 - continue - - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents" - ) - await session.commit() - - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(pages_to_process)} documents") - - for item in pages_to_process: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() - - # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id + logger.error( + f"Error building ConnectorDocument for page: {e!s}", + exc_info=True, ) - - if user_llm and connector.enable_summary: - document_metadata_for_summary = { - "page_title": item["page_title"], - "page_id": item["page_id"], - "document_type": "Notion Page", - "connector_type": "Notion", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - item["markdown_content"], - user_llm, - document_metadata_for_summary, - ) - else: - summary_content = f"Notion Page: {item['page_title']}\n\n{item['markdown_content']}" - summary_embedding = embed_text(summary_content) - - chunks = await create_document_chunks(item["markdown_content"]) - - # Update document to READY with actual content - document.title = item["page_title"] - document.content = summary_content - document.content_hash = item["content_hash"] - document.embedding = summary_embedding - document.document_metadata = { - "page_title": item["page_title"], - "page_id": item["page_id"], - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "connector_id": connector_id, - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - - documents_indexed += 1 - - # Batch commit every 10 documents (for ready status updates) - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Notion pages processed so far" - ) - await session.commit() - - except Exception as e: - logger.error(f"Error processing Notion page: {e!s}", exc_info=True) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - skipped_pages.append(f"{item['page_title']} (processing error)") - documents_failed += 1 + documents_skipped += 1 continue - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs + # ── Pipeline: migrate legacy docs + parallel index ──────────── + pipeline = IndexingPipelineService(session) + + await pipeline.migrate_legacy_docs(connector_docs) + + async def _get_llm(s): + return await get_user_long_context_llm(s, user_id, search_space_id) + + _, documents_indexed, documents_failed = await pipeline.index_batch_parallel( + connector_docs, + _get_llm, + max_concurrency=3, + on_heartbeat=on_heartbeat_callback, + heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, + ) + + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) - total_processed = documents_indexed - - # Final commit to ensure all documents are persisted (safety net) logger.info(f"Final commit: Total {documents_indexed} documents processed") try: await session.commit() @@ -519,59 +347,53 @@ async def index_notion_pages( "Successfully committed all Notion document changes to database" ) except Exception as e: - # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower() ): logger.warning( f"Duplicate content_hash detected during final commit. " - f"This may occur if the same page was indexed by multiple connectors. " f"Rolling back and continuing. Error: {e!s}" ) await session.rollback() - # Don't fail the entire task - some documents may have been successfully indexed else: raise - # Get final count of pages with skipped Notion AI content + # ── Build warning / notification message ────────────────────── pages_with_skipped_ai_content = notion_client.get_skipped_content_count() - # Build warning message if there were issues - warning_parts = [] + warning_parts: list[str] = [] if duplicate_content_count > 0: warning_parts.append(f"{duplicate_content_count} duplicate") if documents_failed > 0: warning_parts.append(f"{documents_failed} failed") - warning_message = ", ".join(warning_parts) if warning_parts else None - # Prepare result message with user-friendly notification about skipped content - result_message = None - if skipped_pages: - result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}" - else: - result_message = f"Processed {total_processed} pages." - - # Add user-friendly message about skipped Notion AI content + notification_parts: list[str] = [] if pages_with_skipped_ai_content > 0: - result_message += ( - " Audio transcriptions and AI summaries from Notion aren't accessible " - "via their API - all other content was saved." + notification_parts.append( + "Some Notion AI content couldn't be synced (API limitation)" ) + if notion_client.is_using_legacy_token(): + notification_parts.append( + "Using legacy token. Reconnect with OAuth for better reliability." + ) + if warning_parts: + notification_parts.append(", ".join(warning_parts)) + + user_notification_message = ( + " ".join(notification_parts) if notification_parts else None + ) - # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Notion indexing for connector {connector_id}", { - "pages_processed": total_processed, + "pages_processed": documents_indexed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, "documents_failed": documents_failed, "duplicate_content_count": duplicate_content_count, - "skipped_pages_count": len(skipped_pages), "pages_with_skipped_ai_content": pages_with_skipped_ai_content, - "result_message": result_message, }, ) @@ -581,35 +403,9 @@ async def index_notion_pages( f"({duplicate_content_count} duplicate content)" ) - # Clean up the async client await notion_client.close() - # Build user-friendly notification messages - # This will be shown in the notification to inform users - notification_parts = [] - - if pages_with_skipped_ai_content > 0: - notification_parts.append( - "Some Notion AI content couldn't be synced (API limitation)" - ) - - if notion_client.is_using_legacy_token(): - notification_parts.append( - "Using legacy token. Reconnect with OAuth for better reliability." - ) - - # Include warning message if there were issues - if warning_message: - notification_parts.append(warning_message) - - user_notification_message = ( - " ".join(notification_parts) if notification_parts else None - ) - - return ( - total_processed, - user_notification_message, - ) + return documents_indexed, documents_skipped, user_notification_message except SQLAlchemyError as db_error: await session.rollback() @@ -622,10 +418,9 @@ async def index_notion_pages( logger.error( f"Database error during Notion indexing: {db_error!s}", exc_info=True ) - # Clean up the async client in case of error if "notion_client" in locals(): await notion_client.close() - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -635,7 +430,6 @@ async def index_notion_pages( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Notion pages: {e!s}", exc_info=True) - # Clean up the async client in case of error if "notion_client" in locals(): await notion_client.close() - return 0, f"Failed to index Notion pages: {e!s}" + return 0, 0, f"Failed to index Notion pages: {e!s}" diff --git a/surfsense_backend/tests/unit/connector_indexers/test_linear_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_linear_parallel.py new file mode 100644 index 000000000..b0ea48644 --- /dev/null +++ b/surfsense_backend/tests/unit/connector_indexers/test_linear_parallel.py @@ -0,0 +1,355 @@ +"""Tests for Linear indexer migrated to the unified parallel pipeline.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +import app.tasks.connector_indexers.linear_indexer as _mod +from app.db import DocumentType +from app.tasks.connector_indexers.linear_indexer import ( + _build_connector_doc, + index_linear_issues, +) + +pytestmark = pytest.mark.unit + +_USER_ID = "00000000-0000-0000-0000-000000000001" +_CONNECTOR_ID = 42 +_SEARCH_SPACE_ID = 1 + + +def _make_issue( + issue_id: str = "issue-1", + identifier: str = "ENG-1", + title: str = "Fix bug", +): + return {"id": issue_id, "identifier": identifier, "title": title} + + +def _make_formatted_issue( + issue_id: str = "issue-1", + identifier: str = "ENG-1", + title: str = "Fix bug", + state: str = "In Progress", + priority: str = "High", + comments=None, +): + return { + "id": issue_id, + "identifier": identifier, + "title": title, + "state": state, + "priority": priority, + "description": "Some description", + "comments": comments or [], + } + + +# --------------------------------------------------------------------------- +# Slice 1: _build_connector_doc tracer bullet +# --------------------------------------------------------------------------- + + +async def test_build_connector_doc_produces_correct_fields(): + """Tracer bullet: a Linear issue produces a ConnectorDocument with correct fields.""" + issue = _make_issue(issue_id="abc-123", identifier="ENG-42", title="Fix login bug") + formatted = _make_formatted_issue( + issue_id="abc-123", + identifier="ENG-42", + title="Fix login bug", + state="Done", + priority="Urgent", + comments=[{"id": "c1"}], + ) + markdown = "# ENG-42: Fix login bug\n\nDescription here" + + doc = _build_connector_doc( + issue, + formatted, + markdown, + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=True, + ) + + assert doc.title == "ENG-42: Fix login bug" + assert doc.unique_id == "abc-123" + assert doc.document_type == DocumentType.LINEAR_CONNECTOR + assert doc.source_markdown == markdown + assert doc.search_space_id == _SEARCH_SPACE_ID + assert doc.connector_id == _CONNECTOR_ID + assert doc.created_by_id == _USER_ID + assert doc.should_summarize is True + assert doc.metadata["issue_id"] == "abc-123" + assert doc.metadata["issue_identifier"] == "ENG-42" + assert doc.metadata["issue_title"] == "Fix login bug" + assert doc.metadata["state"] == "Done" + assert doc.metadata["priority"] == "Urgent" + assert doc.metadata["comment_count"] == 1 + assert doc.metadata["connector_id"] == _CONNECTOR_ID + assert doc.metadata["document_type"] == "Linear Issue" + assert doc.metadata["connector_type"] == "Linear" + assert doc.fallback_summary is not None + assert "ENG-42" in doc.fallback_summary + assert markdown in doc.fallback_summary + + +async def test_build_connector_doc_summary_disabled(): + """When enable_summary is False, should_summarize is False.""" + doc = _build_connector_doc( + _make_issue(), + _make_formatted_issue(), + "# content", + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=False, + ) + + assert doc.should_summarize is False + + +# --------------------------------------------------------------------------- +# Shared fixtures for Slices 2-6 +# --------------------------------------------------------------------------- + + +def _mock_connector(enable_summary: bool = True): + c = MagicMock() + c.config = {"access_token": "tok"} + c.enable_summary = enable_summary + c.last_indexed_at = None + return c + + +def _mock_linear_client(issues=None, error=None): + client = MagicMock() + client.get_issues_by_date_range = AsyncMock( + return_value=(issues if issues is not None else [], error), + ) + client.format_issue = MagicMock(side_effect=lambda i: _make_formatted_issue( + issue_id=i.get("id", ""), + identifier=i.get("identifier", ""), + title=i.get("title", ""), + )) + client.format_issue_to_markdown = MagicMock( + side_effect=lambda fi: f"# {fi.get('identifier', '')}: {fi.get('title', '')}\n\nContent" + ) + return client + + +@pytest.fixture +def linear_mocks(monkeypatch): + """Wire up all external boundary mocks for index_linear_issues.""" + mock_session = AsyncMock() + mock_session.no_autoflush = MagicMock() + + mock_connector = _mock_connector() + monkeypatch.setattr( + _mod, "get_connector_by_id", AsyncMock(return_value=mock_connector), + ) + + linear_client = _mock_linear_client(issues=[_make_issue()]) + monkeypatch.setattr( + _mod, "LinearConnector", MagicMock(return_value=linear_client), + ) + + monkeypatch.setattr( + _mod, "check_duplicate_document_by_hash", AsyncMock(return_value=None), + ) + + monkeypatch.setattr( + _mod, "update_connector_last_indexed", AsyncMock(), + ) + + monkeypatch.setattr( + _mod, "calculate_date_range", MagicMock(return_value=("2025-01-01", "2025-12-31")), + ) + + mock_task_logger = MagicMock() + mock_task_logger.log_task_start = AsyncMock(return_value=MagicMock()) + mock_task_logger.log_task_progress = AsyncMock() + mock_task_logger.log_task_success = AsyncMock() + mock_task_logger.log_task_failure = AsyncMock() + monkeypatch.setattr( + _mod, "TaskLoggingService", MagicMock(return_value=mock_task_logger), + ) + + batch_mock = AsyncMock(return_value=([], 1, 0)) + pipeline_mock = MagicMock() + pipeline_mock.index_batch_parallel = batch_mock + pipeline_mock.migrate_legacy_docs = AsyncMock() + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock), + ) + + return { + "session": mock_session, + "connector": mock_connector, + "linear_client": linear_client, + "task_logger": mock_task_logger, + "pipeline_mock": pipeline_mock, + "batch_mock": batch_mock, + } + + +async def _run_index(mocks, **overrides): + return await index_linear_issues( + session=mocks["session"], + connector_id=overrides.get("connector_id", _CONNECTOR_ID), + search_space_id=overrides.get("search_space_id", _SEARCH_SPACE_ID), + user_id=overrides.get("user_id", _USER_ID), + start_date=overrides.get("start_date", "2025-01-01"), + end_date=overrides.get("end_date", "2025-12-31"), + update_last_indexed=overrides.get("update_last_indexed", True), + on_heartbeat_callback=overrides.get("on_heartbeat_callback"), + ) + + +# --------------------------------------------------------------------------- +# Slice 2: Full pipeline wiring +# --------------------------------------------------------------------------- + + +async def test_one_issue_calls_pipeline_and_returns_indexed_count(linear_mocks): + """One valid issue is passed to the pipeline and the indexed count is returned.""" + indexed, skipped, warning = await _run_index(linear_mocks) + + assert indexed == 1 + assert skipped == 0 + assert warning is None + + linear_mocks["batch_mock"].assert_called_once() + call_args = linear_mocks["batch_mock"].call_args + connector_docs = call_args[0][0] + assert len(connector_docs) == 1 + assert connector_docs[0].document_type == DocumentType.LINEAR_CONNECTOR + + +async def test_pipeline_called_with_max_concurrency_3(linear_mocks): + """index_batch_parallel is called with max_concurrency=3.""" + await _run_index(linear_mocks) + + call_kwargs = linear_mocks["batch_mock"].call_args[1] + assert call_kwargs.get("max_concurrency") == 3 + + +async def test_migrate_legacy_docs_called_before_indexing(linear_mocks): + """migrate_legacy_docs is called on the pipeline before index_batch_parallel.""" + await _run_index(linear_mocks) + + linear_mocks["pipeline_mock"].migrate_legacy_docs.assert_called_once() + + +# --------------------------------------------------------------------------- +# Slice 3: Issue skipping (missing ID / title) +# --------------------------------------------------------------------------- + + +async def test_issues_with_missing_id_are_skipped(linear_mocks): + """Issues without id are skipped and not passed to the pipeline.""" + issues = [ + _make_issue(issue_id="valid-1", identifier="ENG-1", title="Valid"), + {"id": "", "identifier": "ENG-2", "title": "No ID"}, + ] + linear_mocks["linear_client"].get_issues_by_date_range.return_value = (issues, None) + + indexed, skipped, _ = await _run_index(linear_mocks) + + connector_docs = linear_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert connector_docs[0].unique_id == "valid-1" + assert skipped == 1 + + +async def test_issues_with_missing_title_are_skipped(linear_mocks): + """Issues without title are skipped.""" + issues = [ + _make_issue(issue_id="valid-1", identifier="ENG-1", title="Valid"), + {"id": "id-2", "identifier": "ENG-2", "title": ""}, + ] + linear_mocks["linear_client"].get_issues_by_date_range.return_value = (issues, None) + + indexed, skipped, _ = await _run_index(linear_mocks) + + connector_docs = linear_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +# --------------------------------------------------------------------------- +# Slice 4: Duplicate content skipping +# --------------------------------------------------------------------------- + + +async def test_duplicate_content_issues_are_skipped(linear_mocks, monkeypatch): + """Issues whose content hash matches an existing document are skipped.""" + issues = [ + _make_issue(issue_id="new-1", identifier="ENG-1", title="New"), + _make_issue(issue_id="dup-1", identifier="ENG-2", title="Dup"), + ] + linear_mocks["linear_client"].get_issues_by_date_range.return_value = (issues, None) + + call_count = 0 + + async def _check_dup(session, content_hash): + nonlocal call_count + call_count += 1 + if call_count == 2: + dup = MagicMock() + dup.id = 99 + dup.document_type = "OTHER" + return dup + return None + + monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup) + + indexed, skipped, _ = await _run_index(linear_mocks) + + connector_docs = linear_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +# --------------------------------------------------------------------------- +# Slice 5: Heartbeat callback forwarding +# --------------------------------------------------------------------------- + + +async def test_heartbeat_callback_forwarded_to_pipeline(linear_mocks): + """on_heartbeat_callback is passed through to index_batch_parallel.""" + heartbeat_cb = AsyncMock() + + await _run_index(linear_mocks, on_heartbeat_callback=heartbeat_cb) + + call_kwargs = linear_mocks["batch_mock"].call_args[1] + assert call_kwargs.get("on_heartbeat") is heartbeat_cb + + +# --------------------------------------------------------------------------- +# Slice 6: Empty issues early return +# --------------------------------------------------------------------------- + + +async def test_empty_issues_returns_zero_tuple(linear_mocks): + """When no issues are found, returns (0, 0, None) and pipeline is not called.""" + linear_mocks["linear_client"].get_issues_by_date_range.return_value = ([], None) + + indexed, skipped, warning = await _run_index(linear_mocks) + + assert indexed == 0 + assert skipped == 0 + assert warning is None + + linear_mocks["batch_mock"].assert_not_called() + + +async def test_failed_docs_warning_in_result(linear_mocks): + """When documents fail indexing, the warning includes the count.""" + linear_mocks["batch_mock"].return_value = ([], 0, 2) + + _, _, warning = await _run_index(linear_mocks) + + assert warning is not None + assert "2 failed" in warning diff --git a/surfsense_backend/tests/unit/connector_indexers/test_notion_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_notion_parallel.py new file mode 100644 index 000000000..99fb8bad7 --- /dev/null +++ b/surfsense_backend/tests/unit/connector_indexers/test_notion_parallel.py @@ -0,0 +1,345 @@ +"""Tests for Notion indexer migrated to the unified parallel pipeline.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +import app.tasks.connector_indexers.notion_indexer as _mod +from app.db import DocumentType +from app.tasks.connector_indexers.notion_indexer import ( + _build_connector_doc, + index_notion_pages, +) + +pytestmark = pytest.mark.unit + +_USER_ID = "00000000-0000-0000-0000-000000000001" +_CONNECTOR_ID = 42 +_SEARCH_SPACE_ID = 1 + + +def _make_page(page_id: str = "page-1", title: str = "Test Page", content=None): + if content is None: + content = [{"type": "paragraph", "content": "Hello world", "children": []}] + return {"page_id": page_id, "title": title, "content": content} + + +# --------------------------------------------------------------------------- +# Slice 1: _build_connector_doc tracer bullet +# --------------------------------------------------------------------------- + + +async def test_build_connector_doc_produces_correct_fields(): + """Tracer bullet: a single Notion page produces a ConnectorDocument with correct fields.""" + + page = _make_page(page_id="abc-123", title="My Notion Page") + markdown = "# My Notion Page\n\nHello world" + + doc = _build_connector_doc( + page, + markdown, + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=True, + ) + + assert doc.title == "My Notion Page" + assert doc.unique_id == "abc-123" + assert doc.document_type == DocumentType.NOTION_CONNECTOR + assert doc.source_markdown == markdown + assert doc.search_space_id == _SEARCH_SPACE_ID + assert doc.connector_id == _CONNECTOR_ID + assert doc.created_by_id == _USER_ID + assert doc.should_summarize is True + assert doc.metadata["page_title"] == "My Notion Page" + assert doc.metadata["page_id"] == "abc-123" + assert doc.metadata["connector_id"] == _CONNECTOR_ID + assert doc.metadata["document_type"] == "Notion Page" + assert doc.metadata["connector_type"] == "Notion" + assert doc.fallback_summary is not None + assert "My Notion Page" in doc.fallback_summary + assert markdown in doc.fallback_summary + + +async def test_build_connector_doc_summary_disabled(): + """When enable_summary is False, should_summarize is False.""" + doc = _build_connector_doc( + _make_page(), + "# content", + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=False, + ) + + assert doc.should_summarize is False + + +# --------------------------------------------------------------------------- +# Shared fixtures for Slices 2-7 (full index_notion_pages tests) +# --------------------------------------------------------------------------- + + +def _mock_connector(enable_summary: bool = True): + c = MagicMock() + c.config = {"access_token": "tok"} + c.enable_summary = enable_summary + c.last_indexed_at = None + return c + + +def _mock_notion_client(pages=None, skipped_count=0, legacy_token=False): + client = MagicMock() + client.get_all_pages = AsyncMock(return_value=pages if pages is not None else []) + client.get_skipped_content_count = MagicMock(return_value=skipped_count) + client.is_using_legacy_token = MagicMock(return_value=legacy_token) + client.close = AsyncMock() + client.set_retry_callback = MagicMock() + return client + + +@pytest.fixture +def notion_mocks(monkeypatch): + """Wire up all external boundary mocks for index_notion_pages.""" + mock_session = AsyncMock() + mock_session.no_autoflush = MagicMock() + + mock_connector = _mock_connector() + monkeypatch.setattr( + _mod, "get_connector_by_id", AsyncMock(return_value=mock_connector), + ) + + notion_client = _mock_notion_client(pages=[_make_page()]) + monkeypatch.setattr( + _mod, "NotionHistoryConnector", MagicMock(return_value=notion_client), + ) + + monkeypatch.setattr( + _mod, "check_duplicate_document_by_hash", AsyncMock(return_value=None), + ) + + monkeypatch.setattr( + _mod, "update_connector_last_indexed", AsyncMock(), + ) + + monkeypatch.setattr( + _mod, "calculate_date_range", MagicMock(return_value=("2025-01-01", "2025-12-31")), + ) + + monkeypatch.setattr( + _mod, "process_blocks", MagicMock(return_value="Converted markdown content"), + ) + + mock_task_logger = MagicMock() + mock_task_logger.log_task_start = AsyncMock(return_value=MagicMock()) + mock_task_logger.log_task_progress = AsyncMock() + mock_task_logger.log_task_success = AsyncMock() + mock_task_logger.log_task_failure = AsyncMock() + monkeypatch.setattr( + _mod, "TaskLoggingService", MagicMock(return_value=mock_task_logger), + ) + + batch_mock = AsyncMock(return_value=([], 1, 0)) + pipeline_mock = MagicMock() + pipeline_mock.index_batch_parallel = batch_mock + pipeline_mock.migrate_legacy_docs = AsyncMock() + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock), + ) + + return { + "session": mock_session, + "connector": mock_connector, + "notion_client": notion_client, + "task_logger": mock_task_logger, + "pipeline_mock": pipeline_mock, + "batch_mock": batch_mock, + } + + +async def _run_index(mocks, **overrides): + return await index_notion_pages( + session=mocks["session"], + connector_id=overrides.get("connector_id", _CONNECTOR_ID), + search_space_id=overrides.get("search_space_id", _SEARCH_SPACE_ID), + user_id=overrides.get("user_id", _USER_ID), + start_date=overrides.get("start_date", "2025-01-01"), + end_date=overrides.get("end_date", "2025-12-31"), + update_last_indexed=overrides.get("update_last_indexed", True), + on_retry_callback=overrides.get("on_retry_callback"), + on_heartbeat_callback=overrides.get("on_heartbeat_callback"), + ) + + +# --------------------------------------------------------------------------- +# Slice 2: Full pipeline wiring +# --------------------------------------------------------------------------- + + +async def test_one_page_calls_pipeline_and_returns_indexed_count(notion_mocks): + """One valid page is passed to the pipeline and the indexed count is returned.""" + indexed, skipped, warning = await _run_index(notion_mocks) + + assert indexed == 1 + assert skipped == 0 + assert warning is None + + notion_mocks["batch_mock"].assert_called_once() + call_args = notion_mocks["batch_mock"].call_args + connector_docs = call_args[0][0] + assert len(connector_docs) == 1 + assert connector_docs[0].document_type == DocumentType.NOTION_CONNECTOR + + +async def test_pipeline_called_with_max_concurrency_3(notion_mocks): + """index_batch_parallel is called with max_concurrency=3.""" + await _run_index(notion_mocks) + + call_kwargs = notion_mocks["batch_mock"].call_args[1] + assert call_kwargs.get("max_concurrency") == 3 + + +async def test_migrate_legacy_docs_called_before_indexing(notion_mocks): + """migrate_legacy_docs is called on the pipeline before index_batch_parallel.""" + await _run_index(notion_mocks) + + notion_mocks["pipeline_mock"].migrate_legacy_docs.assert_called_once() + + +# --------------------------------------------------------------------------- +# Slice 3: Page skipping (no content / missing ID) +# --------------------------------------------------------------------------- + + +async def test_pages_with_missing_id_are_skipped(notion_mocks, monkeypatch): + """Pages without page_id are skipped and not passed to the pipeline.""" + pages = [ + _make_page(page_id="valid-1"), + {"title": "No ID page", "content": [{"type": "paragraph", "content": "text", "children": []}]}, + ] + notion_mocks["notion_client"].get_all_pages.return_value = pages + + _, skipped, _ = await _run_index(notion_mocks) + + connector_docs = notion_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert connector_docs[0].unique_id == "valid-1" + assert skipped == 1 + + +async def test_pages_with_no_content_are_skipped(notion_mocks, monkeypatch): + """Pages with empty content are skipped.""" + pages = [ + _make_page(page_id="valid-1"), + _make_page(page_id="empty-1", content=[]), + ] + notion_mocks["notion_client"].get_all_pages.return_value = pages + + _, skipped, _ = await _run_index(notion_mocks) + + connector_docs = notion_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +# --------------------------------------------------------------------------- +# Slice 4: Duplicate content skipping +# --------------------------------------------------------------------------- + + +async def test_duplicate_content_pages_are_skipped(notion_mocks, monkeypatch): + """Pages whose content hash matches an existing document are skipped.""" + pages = [ + _make_page(page_id="new-1"), + _make_page(page_id="dup-1"), + ] + notion_mocks["notion_client"].get_all_pages.return_value = pages + + call_count = 0 + + async def _check_dup(session, content_hash): + nonlocal call_count + call_count += 1 + if call_count == 2: + dup = MagicMock() + dup.id = 99 + dup.document_type = "OTHER" + return dup + return None + + monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup) + + _, skipped, _ = await _run_index(notion_mocks) + + connector_docs = notion_mocks["batch_mock"].call_args[0][0] + assert len(connector_docs) == 1 + assert skipped == 1 + + +# --------------------------------------------------------------------------- +# Slice 5: Heartbeat callback forwarding +# --------------------------------------------------------------------------- + + +async def test_heartbeat_callback_forwarded_to_pipeline(notion_mocks): + """on_heartbeat_callback is passed through to index_batch_parallel.""" + heartbeat_cb = AsyncMock() + + await _run_index(notion_mocks, on_heartbeat_callback=heartbeat_cb) + + call_kwargs = notion_mocks["batch_mock"].call_args[1] + assert call_kwargs.get("on_heartbeat") is heartbeat_cb + + +# --------------------------------------------------------------------------- +# Slice 6: Notion-specific warning messages +# --------------------------------------------------------------------------- + + +async def test_skipped_ai_content_warning_in_result(notion_mocks): + """When Notion AI content was skipped, the warning message includes it.""" + notion_mocks["notion_client"].get_skipped_content_count.return_value = 3 + + _, _, warning = await _run_index(notion_mocks) + + assert warning is not None + assert "API limitation" in warning + + +async def test_legacy_token_warning_in_result(notion_mocks): + """When using legacy token, the warning message includes a notice.""" + notion_mocks["notion_client"].is_using_legacy_token.return_value = True + + _, _, warning = await _run_index(notion_mocks) + + assert warning is not None + assert "legacy token" in warning.lower() + + +async def test_failed_docs_warning_in_result(notion_mocks): + """When documents fail indexing, the warning includes the count.""" + notion_mocks["batch_mock"].return_value = ([], 0, 2) + + _, _, warning = await _run_index(notion_mocks) + + assert warning is not None + assert "2 failed" in warning + + +# --------------------------------------------------------------------------- +# Slice 7: Empty pages early return +# --------------------------------------------------------------------------- + + +async def test_empty_pages_returns_zero_tuple(notion_mocks): + """When no pages are found, returns (0, 0, None) and updates last_indexed.""" + notion_mocks["notion_client"].get_all_pages.return_value = [] + + indexed, skipped, warning = await _run_index(notion_mocks) + + assert indexed == 0 + assert skipped == 0 + assert warning is None + + notion_mocks["batch_mock"].assert_not_called()