feat: migrate Linear and Notion indexers to unified parallel pipeline

- Refactored the Linear and Notion indexers to use the shared IndexingPipelineService for document deduplication, summarization, chunking, and embedding with bounded parallel indexing.
- Added a `_build_connector_doc` helper to both indexers that maps raw API payloads to ConnectorDocument instances with connector metadata and fallback summaries.
- Changed `index_linear_issues` and `index_notion_pages` to return a tuple of (indexed_count, skipped_count, warning_or_error_message) for clearer error handling and reporting (see the caller sketch below).
- Added unit tests for both indexers to validate the new parallel processing logic and ensure correct document creation and indexing behavior.
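
A minimal sketch of how a caller could consume the new three-tuple return shape; `run_linear_sync` and `notify_user` are hypothetical names used only for illustration and are not part of this commit:

```python
# Illustrative only: consuming the new (indexed, skipped, warning) contract.
from sqlalchemy.ext.asyncio import AsyncSession

from app.tasks.connector_indexers.linear_indexer import index_linear_issues


async def run_linear_sync(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    notify_user,  # hypothetical async notifier, not part of this commit
) -> None:
    indexed, skipped, warning = await index_linear_issues(
        session=session,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
    )
    # warning is None on a clean run; otherwise it carries a short
    # "N duplicate, M failed" style summary or a hard error message.
    message = f"Indexed {indexed} Linear issues, skipped {skipped}."
    if warning:
        message += f" ({warning})"
    await notify_user(message)
```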
Anish Sarkar 2026-03-27 11:19:32 +05:30
parent da6bbcfe39
commit db6dd058dd
4 changed files with 944 additions and 615 deletions

View file

@ -1,48 +1,84 @@
"""
Linear connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending → processing → ready/failed
Uses the shared IndexingPipelineService for document deduplication,
summarization, chunking, and embedding with bounded parallel indexing.
"""
import time
from collections.abc import Awaitable, Callable
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.linear_connector import LinearConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
def _build_connector_doc(
issue: dict,
formatted_issue: dict,
issue_content: str,
*,
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Map a raw Linear issue dict to a ConnectorDocument."""
issue_id = issue.get("id", "")
issue_identifier = issue.get("identifier", "")
issue_title = issue.get("title", "")
state = formatted_issue.get("state", "Unknown")
priority = formatted_issue.get("priority", "Unknown")
comment_count = len(formatted_issue.get("comments", []))
metadata = {
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"priority": priority,
"comment_count": comment_count,
"connector_id": connector_id,
"document_type": "Linear Issue",
"connector_type": "Linear",
}
fallback_summary = (
f"Linear Issue {issue_identifier}: {issue_title}\n\n"
f"Status: {state}\n\n{issue_content}"
)
return ConnectorDocument(
title=f"{issue_identifier}: {issue_title}",
source_markdown=issue_content,
unique_id=issue_id,
document_type=DocumentType.LINEAR_CONNECTOR,
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
async def index_linear_issues(
session: AsyncSession,
connector_id: int,
@ -52,26 +88,15 @@ async def index_linear_issues(
end_date: str | None = None,
update_last_indexed: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]:
) -> tuple[int, int, str | None]:
"""
Index Linear issues and comments.
Args:
session: Database session
connector_id: ID of the Linear connector
search_space_id: ID of the search space to store documents in
user_id: ID of the user
start_date: Start date for indexing (YYYY-MM-DD format)
end_date: End date for indexing (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns:
Tuple containing (number of documents indexed, error message or None)
Tuple of (indexed_count, skipped_count, warning_or_error_message)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="linear_issues_indexing",
source="connector_indexing_task",
@ -85,7 +110,7 @@ async def index_linear_issues(
)
try:
# Get the connector
# ── Connector lookup ──────────────────────────────────────────
await task_logger.log_task_progress(
log_entry,
f"Retrieving Linear connector {connector_id} from database",
@ -104,11 +129,11 @@ async def index_linear_issues(
{"error_type": "ConnectorNotFound"},
)
return (
0,
0,
f"Connector with ID {connector_id} not found or is not a Linear connector",
)
# Check if access_token exists (support both new OAuth format and old API key format)
if not connector.config.get("access_token") and not connector.config.get(
"LINEAR_API_KEY"
):
@ -118,26 +143,22 @@ async def index_linear_issues(
"Missing Linear access token",
{"error_type": "MissingToken"},
)
return 0, "Linear access token not found in connector config"
return 0, 0, "Linear access token not found in connector config"
# Initialize Linear client with internal refresh capability
# ── Client init ───────────────────────────────────────────────
await task_logger.log_task_progress(
log_entry,
f"Initializing Linear client for connector {connector_id}",
{"stage": "client_initialization"},
)
# Create connector with session and connector_id for internal refresh
# Token refresh will happen automatically when needed
linear_client = LinearConnector(session=session, connector_id=connector_id)
# Handle 'undefined' string from frontend (treat as None)
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
@ -154,37 +175,34 @@ async def index_linear_issues(
},
)
# Get issues within date range
# ── Fetch issues ──────────────────────────────────────────────
try:
issues, error = await linear_client.get_issues_by_date_range(
start_date=start_date_str, end_date=end_date_str, include_comments=True
start_date=start_date_str,
end_date=end_date_str,
include_comments=True,
)
if error:
# Don't treat "No issues found" as an error that should stop indexing
if "No issues found" in error:
logger.info(f"No Linear issues found: {error}")
logger.info(
"No issues found is not a critical error, continuing with update"
)
if update_last_indexed:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await session.commit()
logger.info(
f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found"
)
return 0, None
return 0, 0, None
else:
logger.error(f"Failed to get Linear issues: {error}")
return 0, f"Failed to get Linear issues: {error}"
return 0, 0, f"Failed to get Linear issues: {error}"
logger.info(f"Retrieved {len(issues)} issues from Linear API")
except Exception as e:
logger.error(f"Exception when calling Linear API: {e!s}", exc_info=True)
return 0, f"Failed to get Linear issues: {e!s}"
logger.error(
f"Exception when calling Linear API: {e!s}", exc_info=True
)
return 0, 0, f"Failed to get Linear issues: {e!s}"
if not issues:
logger.info("No Linear issues found for the specified date range")
@ -193,19 +211,12 @@ async def index_linear_issues(
session, connector, update_last_indexed
)
await session.commit()
logger.info(
f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found"
)
return 0, None # Return None instead of error message when no issues found
return 0, 0, None
# Track the number of documents indexed
documents_indexed = 0
# ── Build ConnectorDocuments ──────────────────────────────────
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
documents_failed = 0 # Track issues that failed processing
skipped_issues = []
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
duplicate_content_count = 0
await task_logger.log_task_progress(
log_entry,
@ -213,13 +224,6 @@ async def index_linear_issues(
{"stage": "process_issues", "total_issues": len(issues)},
)
# =======================================================================
# PHASE 1: Analyze all issues, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
issues_to_process = [] # List of dicts with document and issue data
new_documents_created = False
for issue in issues:
try:
issue_id = issue.get("id", "")
@ -230,271 +234,102 @@ async def index_linear_issues(
logger.warning(
f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}"
)
skipped_issues.append(
f"{issue_identifier or 'Unknown'} (missing data)"
)
documents_skipped += 1
continue
# Format the issue first to get well-structured data
formatted_issue = linear_client.format_issue(issue)
# Convert issue to markdown format
issue_content = linear_client.format_issue_to_markdown(formatted_issue)
issue_content = linear_client.format_issue_to_markdown(
formatted_issue
)
if not issue_content:
logger.warning(
f"Skipping issue with no content: {issue_identifier} - {issue_title}"
)
skipped_issues.append(f"{issue_identifier} (no content)")
documents_skipped += 1
continue
# Generate unique identifier hash for this Linear issue
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.LINEAR_CONNECTOR, issue_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(issue_content, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
state = formatted_issue.get("state", "Unknown")
description = formatted_issue.get("description", "")
comment_count = len(formatted_issue.get("comments", []))
priority = formatted_issue.get("priority", "Unknown")
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for Linear issue {issue_identifier} unchanged. Skipping."
)
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
issues_to_process.append(
{
"document": existing_document,
"is_new": False,
"issue_content": issue_content,
"content_hash": content_hash,
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"description": description,
"comment_count": comment_count,
"priority": priority,
}
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Linear issue {issue_identifier} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=f"{issue_identifier}: {issue_title}",
document_type=DocumentType.LINEAR_CONNECTOR,
document_metadata={
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"comment_count": comment_count,
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
doc = _build_connector_doc(
issue,
formatted_issue,
issue_content,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
issues_to_process.append(
{
"document": document,
"is_new": True,
"issue_content": issue_content,
"content_hash": content_hash,
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"description": description,
"comment_count": comment_count,
"priority": priority,
}
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
except Exception as e:
logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(issues_to_process)} documents")
for item in issues_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"issue_id": item["issue_identifier"],
"issue_title": item["issue_title"],
"state": item["state"],
"priority": item["priority"],
"comment_count": item["comment_count"],
"document_type": "Linear Issue",
"connector_type": "Linear",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["issue_content"], user_llm, document_metadata_for_summary
with session.no_autoflush:
duplicate = await check_duplicate_document_by_hash(
session, compute_content_hash(doc)
)
else:
summary_content = f"Linear Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['state']}\n\n{item['issue_content']}"
summary_embedding = embed_text(summary_content)
chunks = await create_document_chunks(item["issue_content"])
# Update document to READY with actual content
document.title = f"{item['issue_identifier']}: {item['issue_title']}"
document.content = summary_content
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"issue_id": item["issue_id"],
"issue_identifier": item["issue_identifier"],
"issue_title": item["issue_title"],
"state": item["state"],
"comment_count": item["comment_count"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
if duplicate:
logger.info(
f"Committing batch: {documents_indexed} Linear issues processed so far"
f"Linear issue {doc.title} already indexed by another connector "
f"(existing document ID: {duplicate.id}, "
f"type: {duplicate.document_type}). Skipping."
)
await session.commit()
duplicate_content_count += 1
documents_skipped += 1
continue
connector_docs.append(doc)
except Exception as e:
logger.error(
f"Error processing issue {item.get('issue_identifier', 'Unknown')}: {e!s}",
f"Error building ConnectorDocument for issue: {e!s}",
exc_info=True,
)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
skipped_issues.append(
f"{item.get('issue_identifier', 'Unknown')} (processing error)"
)
documents_failed += 1
documents_skipped += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# ── Pipeline: migrate legacy docs + parallel index ────────────
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Linear issues processed")
logger.info(
f"Final commit: Total {documents_indexed} Linear issues processed"
)
try:
await session.commit()
logger.info(
"Successfully committed all Linear document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same issue was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
else:
raise
# Build warning message if there were issues
warning_parts = []
warning_parts: list[str] = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Linear indexing for connector {connector_id}",
@ -503,7 +338,7 @@ async def index_linear_issues(
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"skipped_issues_count": len(skipped_issues),
"duplicate_content_count": duplicate_content_count,
},
)
@ -511,7 +346,7 @@ async def index_linear_issues(
f"Linear indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed"
)
return documents_indexed, warning_message
return documents_indexed, documents_skipped, warning_message
except SQLAlchemyError as db_error:
await session.rollback()
@ -522,7 +357,7 @@ async def index_linear_issues(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -532,4 +367,4 @@ async def index_linear_issues(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Linear issues: {e!s}", exc_info=True)
return 0, f"Failed to index Linear issues: {e!s}"
return 0, 0, f"Failed to index Linear issues: {e!s}"

View file

@ -1,12 +1,10 @@
"""
Notion connector indexer.
Implements real-time document status updates using a two-phase approach:
- Phase 1: Create all documents with PENDING status (visible in UI immediately)
- Phase 2: Process each document one by one (pending → processing → ready/failed)
Uses the shared IndexingPipelineService for document deduplication,
summarization, chunking, and embedding with bounded parallel indexing.
"""
import time
from collections.abc import Awaitable, Callable
from datetime import datetime
@ -14,42 +12,64 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.notion_history import NotionHistoryConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.notion_utils import process_blocks
from .base import (
build_document_metadata_string,
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
# Type alias for retry callback
# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]]
# Type alias for heartbeat callback
# Signature: async callback(indexed_count) -> None
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
def _build_connector_doc(
page: dict,
markdown_content: str,
*,
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Map a raw Notion page dict to a ConnectorDocument."""
page_id = page.get("page_id", "")
page_title = page.get("title", f"Untitled page ({page_id})")
metadata = {
"page_title": page_title,
"page_id": page_id,
"connector_id": connector_id,
"document_type": "Notion Page",
"connector_type": "Notion",
}
fallback_summary = f"Notion Page: {page_title}\n\n{markdown_content}"
return ConnectorDocument(
title=page_title,
source_markdown=markdown_content,
unique_id=page_id,
document_type=DocumentType.NOTION_CONNECTOR,
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
async def index_notion_pages(
session: AsyncSession,
connector_id: int,
@ -60,30 +80,15 @@ async def index_notion_pages(
update_last_indexed: bool = True,
on_retry_callback: RetryCallbackType | None = None,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]:
) -> tuple[int, int, str | None]:
"""
Index Notion pages from all accessible pages.
Args:
session: Database session
connector_id: ID of the Notion connector
search_space_id: ID of the search space to store documents in
user_id: ID of the user
start_date: Start date for indexing (YYYY-MM-DD format)
end_date: End date for indexing (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
on_retry_callback: Optional callback for retry progress notifications.
Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds)
retry_reason is one of: 'rate_limit', 'server_error', 'timeout'
on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Called periodically with (indexed_count) to prevent task appearing stuck.
Returns:
Tuple containing (number of documents indexed, error message or None)
Tuple of (indexed_count, skipped_count, warning_or_error_message)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="notion_pages_indexing",
source="connector_indexing_task",
@ -97,7 +102,7 @@ async def index_notion_pages(
)
try:
# Get the connector
# ── Connector lookup ──────────────────────────────────────────
await task_logger.log_task_progress(
log_entry,
f"Retrieving Notion connector {connector_id} from database",
@ -116,11 +121,11 @@ async def index_notion_pages(
{"error_type": "ConnectorNotFound"},
)
return (
0,
0,
f"Connector with ID {connector_id} not found or is not a Notion connector",
)
# Check if access_token exists (support both new OAuth format and old integration token format)
if not connector.config.get("access_token") and not connector.config.get(
"NOTION_INTEGRATION_TOKEN"
):
@ -130,9 +135,9 @@ async def index_notion_pages(
"Missing Notion access token",
{"error_type": "MissingToken"},
)
return 0, "Notion access token not found in connector config"
return 0, 0, "Notion access token not found in connector config"
# Initialize Notion client with internal refresh capability
# ── Client init ───────────────────────────────────────────────
await task_logger.log_task_progress(
log_entry,
f"Initializing Notion client for connector {connector_id}",
@ -141,18 +146,15 @@ async def index_notion_pages(
logger.info(f"Initializing Notion client for connector {connector_id}")
# Handle 'undefined' string from frontend (treat as None)
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range using the shared utility function
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Convert YYYY-MM-DD to ISO format for Notion API
start_date_iso = datetime.strptime(start_date_str, "%Y-%m-%d").strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
@ -160,13 +162,10 @@ async def index_notion_pages(
"%Y-%m-%dT%H:%M:%SZ"
)
# Create connector with session and connector_id for internal refresh
# Token refresh will happen automatically when needed
notion_client = NotionHistoryConnector(
session=session, connector_id=connector_id
)
# Set retry callback if provided (for user notifications during rate limits)
if on_retry_callback:
notion_client.set_retry_callback(on_retry_callback)
@ -182,21 +181,19 @@ async def index_notion_pages(
},
)
# Get all pages
# ── Fetch pages ───────────────────────────────────────────────
try:
pages = await notion_client.get_all_pages(
start_date=start_date_iso, end_date=end_date_iso
)
logger.info(f"Found {len(pages)} Notion pages")
# Get count of pages that had unsupported content skipped
pages_with_skipped_content = notion_client.get_skipped_content_count()
if pages_with_skipped_content > 0:
logger.info(
f"{pages_with_skipped_content} pages had Notion AI content skipped (not available via API)"
)
# Check if using legacy integration token and log warning
if notion_client.is_using_legacy_token():
logger.warning(
f"Connector {connector_id} is using legacy integration token. "
@ -204,8 +201,6 @@ async def index_notion_pages(
)
except Exception as e:
error_str = str(e)
# Check if this is an unsupported block type error (transcription, ai_block, etc.)
# These are known Notion API limitations and should be logged as warnings, not errors
unsupported_block_errors = [
"transcription is not supported",
"ai_block is not supported",
@ -216,7 +211,6 @@ async def index_notion_pages(
)
if is_unsupported_block_error:
# Log as warning since this is a known Notion API limitation
logger.warning(
f"Notion API limitation for connector {connector_id}: {error_str}. "
"This is a known issue with Notion AI blocks (transcription, ai_block) "
@ -229,7 +223,6 @@ async def index_notion_pages(
{"error_type": "UnsupportedBlockType", "is_known_limitation": True},
)
else:
# Log as error for other failures
logger.error(
f"Error fetching Notion pages for connector {connector_id}: {error_str}",
exc_info=True,
@ -242,7 +235,7 @@ async def index_notion_pages(
)
await notion_client.close()
return 0, f"Failed to get Notion pages: {e!s}"
return 0, 0, f"Failed to get Notion pages: {e!s}"
if not pages:
await task_logger.log_task_success(
@ -252,21 +245,17 @@ async def index_notion_pages(
{"pages_found": 0},
)
logger.info("No Notion pages found to index")
# CRITICAL: Update timestamp even when no pages found so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await session.commit()
await notion_client.close()
return 0, None # Success with 0 pages, not an error
return 0, 0, None
# Track the number of documents indexed
documents_indexed = 0
# ── Build ConnectorDocuments ──────────────────────────────────
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
skipped_pages = []
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
await task_logger.log_task_progress(
log_entry,
@ -274,13 +263,6 @@ async def index_notion_pages(
{"stage": "process_pages", "total_pages": len(pages)},
)
# =======================================================================
# PHASE 1: Analyze all pages, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
pages_to_process = [] # List of dicts with document and page data
new_documents_created = False
for page in pages:
try:
page_id = page.get("page_id")
@ -293,225 +275,71 @@ async def index_notion_pages(
if not page_content:
logger.info(f"No content found in page {page_title}. Skipping.")
skipped_pages.append(f"{page_title} (no content)")
documents_skipped += 1
continue
# Convert page content to markdown format
markdown_content = f"# Notion Page: {page_title}\n\n"
markdown_content += process_blocks(page_content)
# Format document metadata
metadata_sections = [
("METADATA", [f"PAGE_TITLE: {page_title}", f"PAGE_ID: {page_id}"]),
(
"CONTENT",
[
"FORMAT: markdown",
"TEXT_START",
markdown_content,
"TEXT_END",
],
),
]
# Build the document string
combined_document_string = build_document_metadata_string(
metadata_sections
)
# Generate unique identifier hash for this Notion page
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.NOTION_CONNECTOR, page_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(
combined_document_string, search_space_id
)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
pages_to_process.append(
{
"document": existing_document,
"is_new": False,
"markdown_content": markdown_content,
"content_hash": content_hash,
"page_id": page_id,
"page_title": page_title,
}
if not markdown_content.strip():
logger.warning(
f"Skipping page with empty markdown: {page_title}"
)
documents_skipped += 1
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
doc = _build_connector_doc(
page,
markdown_content,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
if duplicate_by_content:
with session.no_autoflush:
duplicate = await check_duplicate_document_by_hash(
session, compute_content_hash(doc)
)
if duplicate:
logger.info(
f"Notion page {page_title} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
f"Notion page {doc.title} already indexed by another connector "
f"(existing document ID: {duplicate.id}, "
f"type: {duplicate.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=page_title,
document_type=DocumentType.NOTION_CONNECTOR,
document_metadata={
"page_title": page_title,
"page_id": page_id,
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
pages_to_process.append(
{
"document": document,
"is_new": True,
"markdown_content": markdown_content,
"content_hash": content_hash,
"page_id": page_id,
"page_title": page_title,
}
)
connector_docs.append(doc)
except Exception as e:
logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(pages_to_process)} documents")
for item in pages_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
logger.error(
f"Error building ConnectorDocument for page: {e!s}",
exc_info=True,
)
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"page_title": item["page_title"],
"page_id": item["page_id"],
"document_type": "Notion Page",
"connector_type": "Notion",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["markdown_content"],
user_llm,
document_metadata_for_summary,
)
else:
summary_content = f"Notion Page: {item['page_title']}\n\n{item['markdown_content']}"
summary_embedding = embed_text(summary_content)
chunks = await create_document_chunks(item["markdown_content"])
# Update document to READY with actual content
document.title = item["page_title"]
document.content = summary_content
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"page_title": item["page_title"],
"page_id": item["page_id"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Notion pages processed so far"
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Notion page: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
skipped_pages.append(f"{item['page_title']} (processing error)")
documents_failed += 1
documents_skipped += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# ── Pipeline: migrate legacy docs + parallel index ────────────
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)
total_processed = documents_indexed
# Final commit to ensure all documents are persisted (safety net)
logger.info(f"Final commit: Total {documents_indexed} documents processed")
try:
await session.commit()
@ -519,59 +347,53 @@ async def index_notion_pages(
"Successfully committed all Notion document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same page was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Get final count of pages with skipped Notion AI content
# ── Build warning / notification message ──────────────────────
pages_with_skipped_ai_content = notion_client.get_skipped_content_count()
# Build warning message if there were issues
warning_parts = []
warning_parts: list[str] = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Prepare result message with user-friendly notification about skipped content
result_message = None
if skipped_pages:
result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
else:
result_message = f"Processed {total_processed} pages."
# Add user-friendly message about skipped Notion AI content
notification_parts: list[str] = []
if pages_with_skipped_ai_content > 0:
result_message += (
" Audio transcriptions and AI summaries from Notion aren't accessible "
"via their API - all other content was saved."
notification_parts.append(
"Some Notion AI content couldn't be synced (API limitation)"
)
if notion_client.is_using_legacy_token():
notification_parts.append(
"Using legacy token. Reconnect with OAuth for better reliability."
)
if warning_parts:
notification_parts.append(", ".join(warning_parts))
user_notification_message = (
" ".join(notification_parts) if notification_parts else None
)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Notion indexing for connector {connector_id}",
{
"pages_processed": total_processed,
"pages_processed": documents_indexed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
"skipped_pages_count": len(skipped_pages),
"pages_with_skipped_ai_content": pages_with_skipped_ai_content,
"result_message": result_message,
},
)
@ -581,35 +403,9 @@ async def index_notion_pages(
f"({duplicate_content_count} duplicate content)"
)
# Clean up the async client
await notion_client.close()
# Build user-friendly notification messages
# This will be shown in the notification to inform users
notification_parts = []
if pages_with_skipped_ai_content > 0:
notification_parts.append(
"Some Notion AI content couldn't be synced (API limitation)"
)
if notion_client.is_using_legacy_token():
notification_parts.append(
"Using legacy token. Reconnect with OAuth for better reliability."
)
# Include warning message if there were issues
if warning_message:
notification_parts.append(warning_message)
user_notification_message = (
" ".join(notification_parts) if notification_parts else None
)
return (
total_processed,
user_notification_message,
)
return documents_indexed, documents_skipped, user_notification_message
except SQLAlchemyError as db_error:
await session.rollback()
@ -622,10 +418,9 @@ async def index_notion_pages(
logger.error(
f"Database error during Notion indexing: {db_error!s}", exc_info=True
)
# Clean up the async client in case of error
if "notion_client" in locals():
await notion_client.close()
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -635,7 +430,6 @@ async def index_notion_pages(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Notion pages: {e!s}", exc_info=True)
# Clean up the async client in case of error
if "notion_client" in locals():
await notion_client.close()
return 0, f"Failed to index Notion pages: {e!s}"
return 0, 0, f"Failed to index Notion pages: {e!s}"

View file

@ -0,0 +1,355 @@
"""Tests for Linear indexer migrated to the unified parallel pipeline."""
from unittest.mock import AsyncMock, MagicMock
import pytest
import app.tasks.connector_indexers.linear_indexer as _mod
from app.db import DocumentType
from app.tasks.connector_indexers.linear_indexer import (
_build_connector_doc,
index_linear_issues,
)
pytestmark = pytest.mark.unit
_USER_ID = "00000000-0000-0000-0000-000000000001"
_CONNECTOR_ID = 42
_SEARCH_SPACE_ID = 1
def _make_issue(
issue_id: str = "issue-1",
identifier: str = "ENG-1",
title: str = "Fix bug",
):
return {"id": issue_id, "identifier": identifier, "title": title}
def _make_formatted_issue(
issue_id: str = "issue-1",
identifier: str = "ENG-1",
title: str = "Fix bug",
state: str = "In Progress",
priority: str = "High",
comments=None,
):
return {
"id": issue_id,
"identifier": identifier,
"title": title,
"state": state,
"priority": priority,
"description": "Some description",
"comments": comments or [],
}
# ---------------------------------------------------------------------------
# Slice 1: _build_connector_doc tracer bullet
# ---------------------------------------------------------------------------
async def test_build_connector_doc_produces_correct_fields():
"""Tracer bullet: a Linear issue produces a ConnectorDocument with correct fields."""
issue = _make_issue(issue_id="abc-123", identifier="ENG-42", title="Fix login bug")
formatted = _make_formatted_issue(
issue_id="abc-123",
identifier="ENG-42",
title="Fix login bug",
state="Done",
priority="Urgent",
comments=[{"id": "c1"}],
)
markdown = "# ENG-42: Fix login bug\n\nDescription here"
doc = _build_connector_doc(
issue,
formatted,
markdown,
connector_id=_CONNECTOR_ID,
search_space_id=_SEARCH_SPACE_ID,
user_id=_USER_ID,
enable_summary=True,
)
assert doc.title == "ENG-42: Fix login bug"
assert doc.unique_id == "abc-123"
assert doc.document_type == DocumentType.LINEAR_CONNECTOR
assert doc.source_markdown == markdown
assert doc.search_space_id == _SEARCH_SPACE_ID
assert doc.connector_id == _CONNECTOR_ID
assert doc.created_by_id == _USER_ID
assert doc.should_summarize is True
assert doc.metadata["issue_id"] == "abc-123"
assert doc.metadata["issue_identifier"] == "ENG-42"
assert doc.metadata["issue_title"] == "Fix login bug"
assert doc.metadata["state"] == "Done"
assert doc.metadata["priority"] == "Urgent"
assert doc.metadata["comment_count"] == 1
assert doc.metadata["connector_id"] == _CONNECTOR_ID
assert doc.metadata["document_type"] == "Linear Issue"
assert doc.metadata["connector_type"] == "Linear"
assert doc.fallback_summary is not None
assert "ENG-42" in doc.fallback_summary
assert markdown in doc.fallback_summary
async def test_build_connector_doc_summary_disabled():
"""When enable_summary is False, should_summarize is False."""
doc = _build_connector_doc(
_make_issue(),
_make_formatted_issue(),
"# content",
connector_id=_CONNECTOR_ID,
search_space_id=_SEARCH_SPACE_ID,
user_id=_USER_ID,
enable_summary=False,
)
assert doc.should_summarize is False
# ---------------------------------------------------------------------------
# Shared fixtures for Slices 2-6
# ---------------------------------------------------------------------------
def _mock_connector(enable_summary: bool = True):
c = MagicMock()
c.config = {"access_token": "tok"}
c.enable_summary = enable_summary
c.last_indexed_at = None
return c
def _mock_linear_client(issues=None, error=None):
client = MagicMock()
client.get_issues_by_date_range = AsyncMock(
return_value=(issues if issues is not None else [], error),
)
client.format_issue = MagicMock(side_effect=lambda i: _make_formatted_issue(
issue_id=i.get("id", ""),
identifier=i.get("identifier", ""),
title=i.get("title", ""),
))
client.format_issue_to_markdown = MagicMock(
side_effect=lambda fi: f"# {fi.get('identifier', '')}: {fi.get('title', '')}\n\nContent"
)
return client
@pytest.fixture
def linear_mocks(monkeypatch):
"""Wire up all external boundary mocks for index_linear_issues."""
mock_session = AsyncMock()
mock_session.no_autoflush = MagicMock()
mock_connector = _mock_connector()
monkeypatch.setattr(
_mod, "get_connector_by_id", AsyncMock(return_value=mock_connector),
)
linear_client = _mock_linear_client(issues=[_make_issue()])
monkeypatch.setattr(
_mod, "LinearConnector", MagicMock(return_value=linear_client),
)
monkeypatch.setattr(
_mod, "check_duplicate_document_by_hash", AsyncMock(return_value=None),
)
monkeypatch.setattr(
_mod, "update_connector_last_indexed", AsyncMock(),
)
monkeypatch.setattr(
_mod, "calculate_date_range", MagicMock(return_value=("2025-01-01", "2025-12-31")),
)
mock_task_logger = MagicMock()
mock_task_logger.log_task_start = AsyncMock(return_value=MagicMock())
mock_task_logger.log_task_progress = AsyncMock()
mock_task_logger.log_task_success = AsyncMock()
mock_task_logger.log_task_failure = AsyncMock()
monkeypatch.setattr(
_mod, "TaskLoggingService", MagicMock(return_value=mock_task_logger),
)
batch_mock = AsyncMock(return_value=([], 1, 0))
pipeline_mock = MagicMock()
pipeline_mock.index_batch_parallel = batch_mock
pipeline_mock.migrate_legacy_docs = AsyncMock()
monkeypatch.setattr(
_mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock),
)
return {
"session": mock_session,
"connector": mock_connector,
"linear_client": linear_client,
"task_logger": mock_task_logger,
"pipeline_mock": pipeline_mock,
"batch_mock": batch_mock,
}
async def _run_index(mocks, **overrides):
return await index_linear_issues(
session=mocks["session"],
connector_id=overrides.get("connector_id", _CONNECTOR_ID),
search_space_id=overrides.get("search_space_id", _SEARCH_SPACE_ID),
user_id=overrides.get("user_id", _USER_ID),
start_date=overrides.get("start_date", "2025-01-01"),
end_date=overrides.get("end_date", "2025-12-31"),
update_last_indexed=overrides.get("update_last_indexed", True),
on_heartbeat_callback=overrides.get("on_heartbeat_callback"),
)
# ---------------------------------------------------------------------------
# Slice 2: Full pipeline wiring
# ---------------------------------------------------------------------------
async def test_one_issue_calls_pipeline_and_returns_indexed_count(linear_mocks):
"""One valid issue is passed to the pipeline and the indexed count is returned."""
indexed, skipped, warning = await _run_index(linear_mocks)
assert indexed == 1
assert skipped == 0
assert warning is None
linear_mocks["batch_mock"].assert_called_once()
call_args = linear_mocks["batch_mock"].call_args
connector_docs = call_args[0][0]
assert len(connector_docs) == 1
assert connector_docs[0].document_type == DocumentType.LINEAR_CONNECTOR
async def test_pipeline_called_with_max_concurrency_3(linear_mocks):
"""index_batch_parallel is called with max_concurrency=3."""
await _run_index(linear_mocks)
call_kwargs = linear_mocks["batch_mock"].call_args[1]
assert call_kwargs.get("max_concurrency") == 3
async def test_migrate_legacy_docs_called_before_indexing(linear_mocks):
"""migrate_legacy_docs is called on the pipeline before index_batch_parallel."""
await _run_index(linear_mocks)
linear_mocks["pipeline_mock"].migrate_legacy_docs.assert_called_once()
# ---------------------------------------------------------------------------
# Slice 3: Issue skipping (missing ID / title)
# ---------------------------------------------------------------------------
async def test_issues_with_missing_id_are_skipped(linear_mocks):
"""Issues without id are skipped and not passed to the pipeline."""
issues = [
_make_issue(issue_id="valid-1", identifier="ENG-1", title="Valid"),
{"id": "", "identifier": "ENG-2", "title": "No ID"},
]
linear_mocks["linear_client"].get_issues_by_date_range.return_value = (issues, None)
indexed, skipped, _ = await _run_index(linear_mocks)
connector_docs = linear_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert connector_docs[0].unique_id == "valid-1"
assert skipped == 1
async def test_issues_with_missing_title_are_skipped(linear_mocks):
"""Issues without title are skipped."""
issues = [
_make_issue(issue_id="valid-1", identifier="ENG-1", title="Valid"),
{"id": "id-2", "identifier": "ENG-2", "title": ""},
]
linear_mocks["linear_client"].get_issues_by_date_range.return_value = (issues, None)
indexed, skipped, _ = await _run_index(linear_mocks)
connector_docs = linear_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 4: Duplicate content skipping
# ---------------------------------------------------------------------------
async def test_duplicate_content_issues_are_skipped(linear_mocks, monkeypatch):
"""Issues whose content hash matches an existing document are skipped."""
issues = [
_make_issue(issue_id="new-1", identifier="ENG-1", title="New"),
_make_issue(issue_id="dup-1", identifier="ENG-2", title="Dup"),
]
linear_mocks["linear_client"].get_issues_by_date_range.return_value = (issues, None)
call_count = 0
async def _check_dup(session, content_hash):
nonlocal call_count
call_count += 1
if call_count == 2:
dup = MagicMock()
dup.id = 99
dup.document_type = "OTHER"
return dup
return None
monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup)
indexed, skipped, _ = await _run_index(linear_mocks)
connector_docs = linear_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 5: Heartbeat callback forwarding
# ---------------------------------------------------------------------------
async def test_heartbeat_callback_forwarded_to_pipeline(linear_mocks):
"""on_heartbeat_callback is passed through to index_batch_parallel."""
heartbeat_cb = AsyncMock()
await _run_index(linear_mocks, on_heartbeat_callback=heartbeat_cb)
call_kwargs = linear_mocks["batch_mock"].call_args[1]
assert call_kwargs.get("on_heartbeat") is heartbeat_cb
# ---------------------------------------------------------------------------
# Slice 6: Empty issues early return
# ---------------------------------------------------------------------------
async def test_empty_issues_returns_zero_tuple(linear_mocks):
"""When no issues are found, returns (0, 0, None) and pipeline is not called."""
linear_mocks["linear_client"].get_issues_by_date_range.return_value = ([], None)
indexed, skipped, warning = await _run_index(linear_mocks)
assert indexed == 0
assert skipped == 0
assert warning is None
linear_mocks["batch_mock"].assert_not_called()
async def test_failed_docs_warning_in_result(linear_mocks):
"""When documents fail indexing, the warning includes the count."""
linear_mocks["batch_mock"].return_value = ([], 0, 2)
_, _, warning = await _run_index(linear_mocks)
assert warning is not None
assert "2 failed" in warning

View file

@ -0,0 +1,345 @@
"""Tests for Notion indexer migrated to the unified parallel pipeline."""
from unittest.mock import AsyncMock, MagicMock
import pytest
import app.tasks.connector_indexers.notion_indexer as _mod
from app.db import DocumentType
from app.tasks.connector_indexers.notion_indexer import (
_build_connector_doc,
index_notion_pages,
)
pytestmark = pytest.mark.unit
_USER_ID = "00000000-0000-0000-0000-000000000001"
_CONNECTOR_ID = 42
_SEARCH_SPACE_ID = 1
def _make_page(page_id: str = "page-1", title: str = "Test Page", content=None):
if content is None:
content = [{"type": "paragraph", "content": "Hello world", "children": []}]
return {"page_id": page_id, "title": title, "content": content}
# ---------------------------------------------------------------------------
# Slice 1: _build_connector_doc tracer bullet
# ---------------------------------------------------------------------------
async def test_build_connector_doc_produces_correct_fields():
"""Tracer bullet: a single Notion page produces a ConnectorDocument with correct fields."""
page = _make_page(page_id="abc-123", title="My Notion Page")
markdown = "# My Notion Page\n\nHello world"
doc = _build_connector_doc(
page,
markdown,
connector_id=_CONNECTOR_ID,
search_space_id=_SEARCH_SPACE_ID,
user_id=_USER_ID,
enable_summary=True,
)
assert doc.title == "My Notion Page"
assert doc.unique_id == "abc-123"
assert doc.document_type == DocumentType.NOTION_CONNECTOR
assert doc.source_markdown == markdown
assert doc.search_space_id == _SEARCH_SPACE_ID
assert doc.connector_id == _CONNECTOR_ID
assert doc.created_by_id == _USER_ID
assert doc.should_summarize is True
assert doc.metadata["page_title"] == "My Notion Page"
assert doc.metadata["page_id"] == "abc-123"
assert doc.metadata["connector_id"] == _CONNECTOR_ID
assert doc.metadata["document_type"] == "Notion Page"
assert doc.metadata["connector_type"] == "Notion"
assert doc.fallback_summary is not None
assert "My Notion Page" in doc.fallback_summary
assert markdown in doc.fallback_summary
async def test_build_connector_doc_summary_disabled():
"""When enable_summary is False, should_summarize is False."""
doc = _build_connector_doc(
_make_page(),
"# content",
connector_id=_CONNECTOR_ID,
search_space_id=_SEARCH_SPACE_ID,
user_id=_USER_ID,
enable_summary=False,
)
assert doc.should_summarize is False
# ---------------------------------------------------------------------------
# Shared fixtures for Slices 2-7 (full index_notion_pages tests)
# ---------------------------------------------------------------------------
def _mock_connector(enable_summary: bool = True):
c = MagicMock()
c.config = {"access_token": "tok"}
c.enable_summary = enable_summary
c.last_indexed_at = None
return c
def _mock_notion_client(pages=None, skipped_count=0, legacy_token=False):
client = MagicMock()
client.get_all_pages = AsyncMock(return_value=pages if pages is not None else [])
client.get_skipped_content_count = MagicMock(return_value=skipped_count)
client.is_using_legacy_token = MagicMock(return_value=legacy_token)
client.close = AsyncMock()
client.set_retry_callback = MagicMock()
return client
@pytest.fixture
def notion_mocks(monkeypatch):
"""Wire up all external boundary mocks for index_notion_pages."""
mock_session = AsyncMock()
mock_session.no_autoflush = MagicMock()
mock_connector = _mock_connector()
monkeypatch.setattr(
_mod, "get_connector_by_id", AsyncMock(return_value=mock_connector),
)
notion_client = _mock_notion_client(pages=[_make_page()])
monkeypatch.setattr(
_mod, "NotionHistoryConnector", MagicMock(return_value=notion_client),
)
monkeypatch.setattr(
_mod, "check_duplicate_document_by_hash", AsyncMock(return_value=None),
)
monkeypatch.setattr(
_mod, "update_connector_last_indexed", AsyncMock(),
)
monkeypatch.setattr(
_mod, "calculate_date_range", MagicMock(return_value=("2025-01-01", "2025-12-31")),
)
monkeypatch.setattr(
_mod, "process_blocks", MagicMock(return_value="Converted markdown content"),
)
mock_task_logger = MagicMock()
mock_task_logger.log_task_start = AsyncMock(return_value=MagicMock())
mock_task_logger.log_task_progress = AsyncMock()
mock_task_logger.log_task_success = AsyncMock()
mock_task_logger.log_task_failure = AsyncMock()
monkeypatch.setattr(
_mod, "TaskLoggingService", MagicMock(return_value=mock_task_logger),
)
batch_mock = AsyncMock(return_value=([], 1, 0))
pipeline_mock = MagicMock()
pipeline_mock.index_batch_parallel = batch_mock
pipeline_mock.migrate_legacy_docs = AsyncMock()
monkeypatch.setattr(
_mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock),
)
return {
"session": mock_session,
"connector": mock_connector,
"notion_client": notion_client,
"task_logger": mock_task_logger,
"pipeline_mock": pipeline_mock,
"batch_mock": batch_mock,
}
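
# By default the pipeline stub reports one indexed document and zero failures;
# the first element of the tuple returned by index_batch_parallel is not
# inspected by these tests. Individual tests override the mocks they assert on.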
async def _run_index(mocks, **overrides):
return await index_notion_pages(
session=mocks["session"],
connector_id=overrides.get("connector_id", _CONNECTOR_ID),
search_space_id=overrides.get("search_space_id", _SEARCH_SPACE_ID),
user_id=overrides.get("user_id", _USER_ID),
start_date=overrides.get("start_date", "2025-01-01"),
end_date=overrides.get("end_date", "2025-12-31"),
update_last_indexed=overrides.get("update_last_indexed", True),
on_retry_callback=overrides.get("on_retry_callback"),
on_heartbeat_callback=overrides.get("on_heartbeat_callback"),
)
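
# _run_index forwards the full keyword signature of index_notion_pages with
# defaults matching the fixture, so each test only overrides the arguments it
# actually asserts on.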
# ---------------------------------------------------------------------------
# Slice 2: Full pipeline wiring
# ---------------------------------------------------------------------------
async def test_one_page_calls_pipeline_and_returns_indexed_count(notion_mocks):
"""One valid page is passed to the pipeline and the indexed count is returned."""
indexed, skipped, warning = await _run_index(notion_mocks)
assert indexed == 1
assert skipped == 0
assert warning is None
notion_mocks["batch_mock"].assert_called_once()
call_args = notion_mocks["batch_mock"].call_args
connector_docs = call_args[0][0]
assert len(connector_docs) == 1
assert connector_docs[0].document_type == DocumentType.NOTION_CONNECTOR
async def test_pipeline_called_with_max_concurrency_3(notion_mocks):
"""index_batch_parallel is called with max_concurrency=3."""
await _run_index(notion_mocks)
call_kwargs = notion_mocks["batch_mock"].call_args[1]
assert call_kwargs.get("max_concurrency") == 3
async def test_migrate_legacy_docs_called_before_indexing(notion_mocks):
    """migrate_legacy_docs is called on the pipeline before index_batch_parallel."""
    parent = MagicMock()
    parent.attach_mock(notion_mocks["pipeline_mock"].migrate_legacy_docs, "migrate")
    parent.attach_mock(notion_mocks["batch_mock"], "index")
    await _run_index(notion_mocks)
    call_names = [c[0] for c in parent.mock_calls]
    assert call_names.index("migrate") < call_names.index("index")
# ---------------------------------------------------------------------------
# Slice 3: Page skipping (no content / missing ID)
# ---------------------------------------------------------------------------
async def test_pages_with_missing_id_are_skipped(notion_mocks):
"""Pages without page_id are skipped and not passed to the pipeline."""
pages = [
_make_page(page_id="valid-1"),
{"title": "No ID page", "content": [{"type": "paragraph", "content": "text", "children": []}]},
]
notion_mocks["notion_client"].get_all_pages.return_value = pages
_, skipped, _ = await _run_index(notion_mocks)
connector_docs = notion_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert connector_docs[0].unique_id == "valid-1"
assert skipped == 1
async def test_pages_with_no_content_are_skipped(notion_mocks):
"""Pages with empty content are skipped."""
pages = [
_make_page(page_id="valid-1"),
_make_page(page_id="empty-1", content=[]),
]
notion_mocks["notion_client"].get_all_pages.return_value = pages
_, skipped, _ = await _run_index(notion_mocks)
connector_docs = notion_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 4: Duplicate content skipping
# ---------------------------------------------------------------------------
async def test_duplicate_content_pages_are_skipped(notion_mocks, monkeypatch):
"""Pages whose content hash matches an existing document are skipped."""
pages = [
_make_page(page_id="new-1"),
_make_page(page_id="dup-1"),
]
notion_mocks["notion_client"].get_all_pages.return_value = pages
call_count = 0
async def _check_dup(session, content_hash):
nonlocal call_count
call_count += 1
if call_count == 2:
dup = MagicMock()
dup.id = 99
dup.document_type = "OTHER"
return dup
return None
monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup)
_, skipped, _ = await _run_index(notion_mocks)
connector_docs = notion_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 5: Heartbeat callback forwarding
# ---------------------------------------------------------------------------
async def test_heartbeat_callback_forwarded_to_pipeline(notion_mocks):
"""on_heartbeat_callback is passed through to index_batch_parallel."""
heartbeat_cb = AsyncMock()
await _run_index(notion_mocks, on_heartbeat_callback=heartbeat_cb)
call_kwargs = notion_mocks["batch_mock"].call_args[1]
assert call_kwargs.get("on_heartbeat") is heartbeat_cb
# ---------------------------------------------------------------------------
# Slice 6: Notion-specific warning messages
# ---------------------------------------------------------------------------
async def test_skipped_ai_content_warning_in_result(notion_mocks):
"""When Notion AI content was skipped, the warning message includes it."""
notion_mocks["notion_client"].get_skipped_content_count.return_value = 3
_, _, warning = await _run_index(notion_mocks)
assert warning is not None
assert "API limitation" in warning
async def test_legacy_token_warning_in_result(notion_mocks):
"""When using legacy token, the warning message includes a notice."""
notion_mocks["notion_client"].is_using_legacy_token.return_value = True
_, _, warning = await _run_index(notion_mocks)
assert warning is not None
assert "legacy token" in warning.lower()
async def test_failed_docs_warning_in_result(notion_mocks):
"""When documents fail indexing, the warning includes the count."""
notion_mocks["batch_mock"].return_value = ([], 0, 2)
_, _, warning = await _run_index(notion_mocks)
assert warning is not None
assert "2 failed" in warning
# ---------------------------------------------------------------------------
# Slice 7: Empty pages early return
# ---------------------------------------------------------------------------
async def test_empty_pages_returns_zero_tuple(notion_mocks):
"""When no pages are found, returns (0, 0, None) and updates last_indexed."""
notion_mocks["notion_client"].get_all_pages.return_value = []
indexed, skipped, warning = await _run_index(notion_mocks)
assert indexed == 0
assert skipped == 0
assert warning is None
notion_mocks["batch_mock"].assert_not_called()