feat: migrate Confluence and Jira indexers to unified parallel pipeline

- Refactored Confluence and Jira indexers to utilize the shared IndexingPipelineService for improved document processing.
- Updated the `_build_connector_doc` function in both indexers to create ConnectorDocument instances with enhanced metadata and fallback summaries.
- Modified the `index_confluence_pages` and `index_jira_issues` functions to return a tuple of (indexed_count, skipped_count, warning_or_error_message) for better error handling and reporting.
- Added unit tests for both indexers to validate the new parallel processing logic and ensure correct document creation and indexing behavior.
This commit is contained in:
Anish Sarkar 2026-03-27 16:02:09 +05:30
parent 22e36d00fc
commit 0bc1c766ff
4 changed files with 942 additions and 534 deletions

View file

@ -1,49 +1,74 @@
"""
Confluence connector indexer.
Provides real-time document status updates during indexing using a two-phase approach:
- Phase 1: Create all documents with PENDING status (visible in UI immediately)
- Phase 2: Process each document one by one (PENDING → PROCESSING → READY/FAILED)
"""
"""Confluence connector indexer using the unified parallel indexing pipeline."""
import contextlib
import time
from collections.abc import Awaitable, Callable
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.confluence_history import ConfluenceHistoryConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
def _build_connector_doc(
    page: dict,
    full_content: str,
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
) -> ConnectorDocument:
    """Build the ConnectorDocument describing one Confluence page.

    Args:
        page: Raw page dict. The ``id``, ``title``, ``spaceId`` and
            ``comments`` keys are read; each falls back to an empty value
            when the key is absent.
        full_content: Markdown text stored as the document source and
            appended to the fallback summary.
        connector_id: Stored on the document and in its metadata.
        search_space_id: Stored on the document.
        user_id: Stored on the document as ``created_by_id``.
        enable_summary: Passed through as ``should_summarize``.

    Returns:
        A ConnectorDocument of type ``DocumentType.CONFLUENCE_CONNECTOR``
        whose unique id is the page id.
    """
    title = page.get("title", "")
    space = page.get("spaceId", "")
    identifier = page.get("id", "")
    comments = page.get("comments", [])

    return ConnectorDocument(
        title=title,
        source_markdown=full_content,
        unique_id=identifier,
        document_type=DocumentType.CONFLUENCE_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=enable_summary,
        # Plain-text stand-in passed as ``fallback_summary``.
        fallback_summary=(
            f"Confluence Page: {title}\n\nSpace ID: {space}\n\n{full_content}"
        ),
        metadata={
            "page_id": identifier,
            "page_title": title,
            "space_id": space,
            "comment_count": len(comments),
            "connector_id": connector_id,
            "document_type": "Confluence Page",
            "connector_type": "Confluence",
        },
    )
async def index_confluence_pages(
session: AsyncSession,
connector_id: int,
@ -53,26 +78,9 @@ async def index_confluence_pages(
end_date: str | None = None,
update_last_indexed: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]:
"""
Index Confluence pages and comments.
Args:
session: Database session
connector_id: ID of the Confluence connector
search_space_id: ID of the search space to store documents in
user_id: User ID
start_date: Start date for indexing (YYYY-MM-DD format)
end_date: End date for indexing (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
) -> tuple[int, int, str | None]:
"""Index Confluence pages and comments."""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="confluence_pages_indexing",
source="connector_indexing_task",
@ -86,7 +94,6 @@ async def index_confluence_pages(
)
try:
# Get the connector from the database
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.CONFLUENCE_CONNECTOR
)
@ -98,9 +105,8 @@ async def index_confluence_pages(
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return 0, f"Connector with ID {connector_id} not found"
return 0, 0, f"Connector with ID {connector_id} not found"
# Initialize Confluence OAuth client
await task_logger.log_task_progress(
log_entry,
f"Initializing Confluence OAuth client for connector {connector_id}",
@ -114,7 +120,6 @@ async def index_confluence_pages(
)
)
# Calculate date range
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
@ -129,19 +134,14 @@ async def index_confluence_pages(
},
)
# Get pages within date range
try:
pages, error = await confluence_client.get_pages_by_date_range(
start_date=start_date_str, end_date=end_date_str, include_comments=True
)
if error:
# Don't treat "No pages found" as an error that should stop indexing
if "No pages found" in error:
logger.info(f"No Confluence pages found: {error}")
logger.info(
"No pages found is not a critical error, continuing with update"
)
if update_last_indexed:
await update_connector_last_indexed(
session, connector, update_last_indexed
@ -156,11 +156,10 @@ async def index_confluence_pages(
f"No Confluence pages found in date range {start_date_str} to {end_date_str}",
{"pages_found": 0},
)
# Close client before returning
if confluence_client:
with contextlib.suppress(Exception):
await confluence_client.close()
return 0, None
return 0, 0, None
else:
logger.error(f"Failed to get Confluence pages: {error}")
await task_logger.log_task_failure(
@ -169,36 +168,35 @@ async def index_confluence_pages(
"API Error",
{"error_type": "APIError"},
)
# Close client on error
if confluence_client:
with contextlib.suppress(Exception):
await confluence_client.close()
return 0, f"Failed to get Confluence pages: {error}"
return 0, 0, f"Failed to get Confluence pages: {error}"
logger.info(f"Retrieved {len(pages)} pages from Confluence API")
except Exception as e:
logger.error(f"Error fetching Confluence pages: {e!s}", exc_info=True)
# Close client on error
if confluence_client:
with contextlib.suppress(Exception):
await confluence_client.close()
return 0, f"Error fetching Confluence pages: {e!s}"
return 0, 0, f"Error fetching Confluence pages: {e!s}"
if not pages:
logger.info("No Confluence pages found for the specified date range")
if update_last_indexed:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await session.commit()
if confluence_client:
with contextlib.suppress(Exception):
await confluence_client.close()
return 0, 0, None
# =======================================================================
# PHASE 1: Analyze all pages, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
documents_indexed = 0
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
pages_to_process = [] # List of dicts with document and page data
new_documents_created = False
connector_docs: list[ConnectorDocument] = []
for page in pages:
try:
@ -213,12 +211,10 @@ async def index_confluence_pages(
documents_skipped += 1
continue
# Extract page content
page_content = ""
if page.get("body") and page["body"].get("storage"):
page_content = page["body"]["storage"].get("value", "")
# Add comments to content
comments = page.get("comments", [])
comments_content = ""
if comments:
@ -235,61 +231,25 @@ async def index_confluence_pages(
comments_content += f"**Comment by {comment_author}** ({comment_date}):\n{comment_body}\n\n"
# Combine page content with comments
full_content = f"# {page_title}\n\n{page_content}{comments_content}"
if not full_content.strip():
if not page_content.strip() and not comments:
logger.warning(f"Skipping page with no content: {page_title}")
documents_skipped += 1
continue
# Generate unique identifier hash for this Confluence page
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CONFLUENCE_CONNECTOR, page_id, search_space_id
doc = _build_connector_doc(
page,
full_content,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
# Generate content hash
content_hash = generate_content_hash(full_content, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
comment_count = len(comments)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
pages_to_process.append(
{
"document": existing_document,
"is_new": False,
"full_content": full_content,
"page_content": page_content,
"content_hash": content_hash,
"page_id": page_id,
"page_title": page_title,
"space_id": space_id,
"comment_count": comment_count,
}
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
session, compute_content_hash(doc)
)
if duplicate_by_content:
@ -302,151 +262,29 @@ async def index_confluence_pages(
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=page_title,
document_type=DocumentType.CONFLUENCE_CONNECTOR,
document_metadata={
"page_id": page_id,
"page_title": page_title,
"space_id": space_id,
"comment_count": comment_count,
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
pages_to_process.append(
{
"document": document,
"is_new": True,
"full_content": full_content,
"page_content": page_content,
"content_hash": content_hash,
"page_id": page_id,
"page_title": page_title,
"space_id": space_id,
"comment_count": comment_count,
}
)
connector_docs.append(doc)
except Exception as e:
logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True)
documents_failed += 1
logger.error(f"Error building ConnectorDocument for page: {e!s}", exc_info=True)
documents_skipped += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents"
)
await session.commit()
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(pages_to_process)} documents")
async def _get_llm(s: AsyncSession):
return await get_user_long_context_llm(s, user_id, search_space_id)
for item in pages_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm and connector.enable_summary:
document_metadata = {
"page_title": item["page_title"],
"page_id": item["page_id"],
"space_id": item["space_id"],
"comment_count": item["comment_count"],
"document_type": "Confluence Page",
"connector_type": "Confluence",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["full_content"], user_llm, document_metadata
)
else:
summary_content = f"Confluence Page: {item['page_title']}\n\nSpace ID: {item['space_id']}\n\n{item['full_content']}"
summary_embedding = embed_text(summary_content)
# Process chunks - using the full page content with comments
chunks = await create_document_chunks(item["full_content"])
# Update document to READY with actual content
document.title = item["page_title"]
document.content = summary_content
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"page_id": item["page_id"],
"page_title": item["page_title"],
"space_id": item["space_id"],
"comment_count": item["comment_count"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Confluence pages processed so far"
)
await session.commit()
except Exception as e:
logger.error(
f"Error processing page {item.get('page_title', 'Unknown')}: {e!s}",
exc_info=True,
)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue # Skip this page and continue with others
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit to ensure all documents are persisted (safety net)
logger.info(
f"Final commit: Total {documents_indexed} Confluence pages processed"
)
@ -456,7 +294,6 @@ async def index_confluence_pages(
"Successfully committed all Confluence document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
@ -467,11 +304,9 @@ async def index_confluence_pages(
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
@ -479,7 +314,6 @@ async def index_confluence_pages(
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Confluence indexing for connector {connector_id}",
@ -490,22 +324,19 @@ async def index_confluence_pages(
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"Confluence indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
# Close the client connection
if confluence_client:
await confluence_client.close()
return documents_indexed, warning_message
return documents_indexed, documents_skipped, warning_message
except SQLAlchemyError as db_error:
await session.rollback()
# Close client if it exists
if confluence_client:
with contextlib.suppress(Exception):
await confluence_client.close()
@ -516,10 +347,9 @@ async def index_confluence_pages(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
# Close client if it exists
if confluence_client:
with contextlib.suppress(Exception):
await confluence_client.close()
@ -530,4 +360,4 @@ async def index_confluence_pages(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Confluence pages: {e!s}", exc_info=True)
return 0, f"Failed to index Confluence pages: {e!s}"
return 0, 0, f"Failed to index Confluence pages: {e!s}"

View file

@ -1,49 +1,80 @@
"""
Jira connector indexer.
Provides real-time document status updates during indexing using a two-phase approach:
- Phase 1: Create all documents with PENDING status (visible in UI immediately)
- Phase 2: Process each document one by one (PENDING → PROCESSING → READY/FAILED)
"""
"""Jira connector indexer using the unified parallel indexing pipeline."""
import contextlib
import time
from collections.abc import Awaitable, Callable
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.jira_history import JiraHistoryConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
def _build_connector_doc(
    issue: dict,
    formatted_issue: dict,
    issue_content: str,
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
) -> ConnectorDocument:
    """Build the ConnectorDocument describing one Jira issue.

    Args:
        issue: Raw issue dict. Only the ``key`` and ``id`` entries are read.
        formatted_issue: Formatted issue dict. The ``status``, ``priority``
            and ``comments`` entries are read, defaulting to ``"Unknown"``,
            ``"Unknown"`` and an empty list respectively.
        issue_content: Markdown text stored as the document source and
            appended to the fallback summary.
        connector_id: Stored on the document and in its metadata.
        search_space_id: Stored on the document.
        user_id: Stored on the document as ``created_by_id``.
        enable_summary: Passed through as ``should_summarize``.

    Returns:
        A ConnectorDocument of type ``DocumentType.JIRA_CONNECTOR`` whose
        unique id is the issue key.
    """
    # The issue key serves as both the unique id and the display identifier.
    key = issue.get("key", "")
    # NOTE(review): the title text is read from the raw "id" field rather
    # than a summary/title field -- confirm this is intended.
    title_text = issue.get("id", "")
    status = formatted_issue.get("status", "Unknown")
    comments = formatted_issue.get("comments", [])

    return ConnectorDocument(
        title=f"{key}: {title_text}",
        source_markdown=issue_content,
        unique_id=key,
        document_type=DocumentType.JIRA_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=enable_summary,
        # Plain-text stand-in passed as ``fallback_summary``.
        fallback_summary=(
            f"Jira Issue {key}: {title_text}\n\n"
            f"Status: {status}\n\n{issue_content}"
        ),
        metadata={
            "issue_id": key,
            "issue_identifier": key,
            "issue_title": title_text,
            "state": status,
            "priority": formatted_issue.get("priority", "Unknown"),
            "comment_count": len(comments),
            "connector_id": connector_id,
            "document_type": "Jira Issue",
            "connector_type": "Jira",
        },
    )
async def index_jira_issues(
session: AsyncSession,
connector_id: int,
@ -53,26 +84,9 @@ async def index_jira_issues(
end_date: str | None = None,
update_last_indexed: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]:
"""
Index Jira issues and comments.
Args:
session: Database session
connector_id: ID of the Jira connector
search_space_id: ID of the search space to store documents in
user_id: User ID
start_date: Start date for indexing (YYYY-MM-DD format)
end_date: End date for indexing (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
) -> tuple[int, int, str | None]:
"""Index Jira issues and comments."""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="jira_issues_indexing",
source="connector_indexing_task",
@ -86,7 +100,6 @@ async def index_jira_issues(
)
try:
# Get the connector from the database
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.JIRA_CONNECTOR
)
@ -98,24 +111,15 @@ async def index_jira_issues(
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return 0, f"Connector with ID {connector_id} not found"
return 0, 0, f"Connector with ID {connector_id} not found"
# Initialize Jira client with internal refresh capability
# Token refresh will happen automatically when needed
await task_logger.log_task_progress(
log_entry,
f"Initializing Jira client for connector {connector_id}",
{"stage": "client_initialization"},
)
logger.info(f"Initializing Jira client for connector {connector_id}")
# Create connector with session and connector_id for internal refresh
# Token refresh will happen automatically when needed
jira_client = JiraHistoryConnector(session=session, connector_id=connector_id)
# Calculate date range
# Handle "undefined" strings from frontend
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
@ -135,19 +139,14 @@ async def index_jira_issues(
},
)
# Get issues within date range
try:
issues, error = await jira_client.get_issues_by_date_range(
start_date=start_date_str, end_date=end_date_str, include_comments=True
)
if error:
# Don't treat "No issues found" as an error that should stop indexing
if "No issues found" in error:
logger.info(f"No Jira issues found: {error}")
logger.info(
"No issues found is not a critical error, continuing with update"
)
if update_last_indexed:
await update_connector_last_indexed(
session, connector, update_last_indexed
@ -162,7 +161,8 @@ async def index_jira_issues(
f"No Jira issues found in date range {start_date_str} to {end_date_str}",
{"issues_found": 0},
)
return 0, None
await jira_client.close()
return 0, 0, None
else:
logger.error(f"Failed to get Jira issues: {error}")
await task_logger.log_task_failure(
@ -171,29 +171,30 @@ async def index_jira_issues(
"API Error",
{"error_type": "APIError"},
)
return 0, f"Failed to get Jira issues: {error}"
await jira_client.close()
return 0, 0, f"Failed to get Jira issues: {error}"
logger.info(f"Retrieved {len(issues)} issues from Jira API")
except Exception as e:
logger.error(f"Error fetching Jira issues: {e!s}", exc_info=True)
return 0, f"Error fetching Jira issues: {e!s}"
await jira_client.close()
return 0, 0, f"Error fetching Jira issues: {e!s}"
# =======================================================================
# PHASE 1: Analyze all issues, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
documents_indexed = 0
if not issues:
logger.info("No Jira issues found for the specified date range")
if update_last_indexed:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await session.commit()
await jira_client.close()
return 0, 0, None
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
issues_to_process = [] # List of dicts with document and issue data
new_documents_created = False
for issue in issues:
try:
issue_id = issue.get("key")
@ -207,10 +208,7 @@ async def index_jira_issues(
documents_skipped += 1
continue
# Format the issue for better readability
formatted_issue = jira_client.format_issue(issue)
# Convert to markdown
issue_content = jira_client.format_issue_to_markdown(formatted_issue)
if not issue_content:
@ -220,53 +218,19 @@ async def index_jira_issues(
documents_skipped += 1
continue
# Generate unique identifier hash for this Jira issue
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.JIRA_CONNECTOR, issue_id, search_space_id
doc = _build_connector_doc(
issue,
formatted_issue,
issue_content,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
# Generate content hash
content_hash = generate_content_hash(issue_content, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
comment_count = len(formatted_issue.get("comments", []))
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
issues_to_process.append(
{
"document": existing_document,
"is_new": False,
"issue_content": issue_content,
"content_hash": content_hash,
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"formatted_issue": formatted_issue,
"comment_count": comment_count,
}
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
session, compute_content_hash(doc)
)
if duplicate_by_content:
@ -279,160 +243,37 @@ async def index_jira_issues(
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=f"{issue_identifier}: {issue_title}",
document_type=DocumentType.JIRA_CONNECTOR,
document_metadata={
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": formatted_issue.get("status", "Unknown"),
"comment_count": comment_count,
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
issues_to_process.append(
{
"document": document,
"is_new": True,
"issue_content": issue_content,
"content_hash": content_hash,
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"formatted_issue": formatted_issue,
"comment_count": comment_count,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(issues_to_process)} documents")
for item in issues_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm and connector.enable_summary:
document_metadata = {
"issue_key": item["issue_identifier"],
"issue_title": item["issue_title"],
"status": item["formatted_issue"].get("status", "Unknown"),
"priority": item["formatted_issue"].get("priority", "Unknown"),
"comment_count": item["comment_count"],
"document_type": "Jira Issue",
"connector_type": "Jira",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["issue_content"], user_llm, document_metadata
)
else:
summary_content = f"Jira Issue {item['issue_identifier']}: {item['issue_title']}\n\n{item['issue_content']}"
summary_embedding = embed_text(summary_content)
# Process chunks - using the full issue content with comments
chunks = await create_document_chunks(item["issue_content"])
# Update document to READY with actual content
document.title = f"{item['issue_identifier']}: {item['issue_title']}"
document.content = summary_content
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"issue_id": item["issue_id"],
"issue_identifier": item["issue_identifier"],
"issue_title": item["issue_title"],
"state": item["formatted_issue"].get("status", "Unknown"),
"comment_count": item["comment_count"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Jira issues processed so far"
)
await session.commit()
connector_docs.append(doc)
except Exception as e:
logger.error(
f"Error processing issue {item.get('issue_identifier', 'Unknown')}: {e!s}",
f"Error building ConnectorDocument for issue {issue_identifier}: {e!s}",
exc_info=True,
)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue # Skip this issue and continue with others
documents_skipped += 1
continue
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s: AsyncSession):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit to ensure all documents are persisted (safety net)
logger.info(f"Final commit: Total {documents_indexed} Jira issues processed")
try:
await session.commit()
logger.info("Successfully committed all JIRA document changes to database")
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
@ -447,7 +288,6 @@ async def index_jira_issues(
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
@ -455,7 +295,6 @@ async def index_jira_issues(
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed JIRA indexing for connector {connector_id}",
@ -466,17 +305,13 @@ async def index_jira_issues(
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"JIRA indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
# Clean up the connector
await jira_client.close()
return documents_indexed, warning_message
return documents_indexed, documents_skipped, warning_message
except SQLAlchemyError as db_error:
await session.rollback()
@ -487,11 +322,10 @@ async def index_jira_issues(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
# Clean up the connector in case of error
if "jira_client" in locals():
with contextlib.suppress(Exception):
await jira_client.close()
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -501,8 +335,7 @@ async def index_jira_issues(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True)
# Clean up the connector in case of error
if "jira_client" in locals():
with contextlib.suppress(Exception):
await jira_client.close()
return 0, f"Failed to index JIRA issues: {e!s}"
return 0, 0, f"Failed to index JIRA issues: {e!s}"

View file

@ -0,0 +1,373 @@
"""Tests for Confluence indexer migrated to the unified parallel pipeline."""
from unittest.mock import AsyncMock, MagicMock
import pytest
import app.tasks.connector_indexers.confluence_indexer as _mod
from app.db import DocumentType
from app.tasks.connector_indexers.confluence_indexer import (
_build_connector_doc,
index_confluence_pages,
)
pytestmark = pytest.mark.unit
_USER_ID = "00000000-0000-0000-0000-000000000001"
_CONNECTOR_ID = 42
_SEARCH_SPACE_ID = 1
def _make_page(
page_id: str = "p1",
title: str = "Home",
space_id: str = "S1",
body: str = "<p>Hello</p>",
comments=None,
):
return {
"id": page_id,
"title": title,
"spaceId": space_id,
"body": {"storage": {"value": body}},
"comments": comments or [],
}
def _to_markdown(page: dict) -> str:
page_title = page.get("title", "")
page_content = page.get("body", {}).get("storage", {}).get("value", "")
comments = page.get("comments", [])
comments_content = ""
if comments:
comments_content = "\n\n## Comments\n\n"
for comment in comments:
comment_body = (
comment.get("body", {}).get("storage", {}).get("value", "")
)
comment_author = comment.get("version", {}).get("authorId", "Unknown")
comment_date = comment.get("version", {}).get("createdAt", "")
comments_content += (
f"**Comment by {comment_author}** ({comment_date}):\n"
f"{comment_body}\n\n"
)
return f"# {page_title}\n\n{page_content}{comments_content}"
# ---------------------------------------------------------------------------
# Slice 1: _build_connector_doc tracer bullet
# ---------------------------------------------------------------------------
async def test_build_connector_doc_produces_correct_fields():
    """A populated page is mapped onto every ConnectorDocument field."""
    page = _make_page(
        page_id="abc-123",
        title="Engineering Handbook",
        space_id="ENG",
        comments=[{"id": "c1"}],
    )
    markdown = _to_markdown(page)

    doc = _build_connector_doc(
        page,
        markdown,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    # Top-level document attributes.
    assert doc.title == "Engineering Handbook"
    assert doc.unique_id == "abc-123"
    assert doc.document_type == DocumentType.CONFLUENCE_CONNECTOR
    assert doc.source_markdown == markdown
    assert doc.search_space_id == _SEARCH_SPACE_ID
    assert doc.connector_id == _CONNECTOR_ID
    assert doc.created_by_id == _USER_ID
    assert doc.should_summarize is True

    # Metadata entries, checked one key at a time.
    expected_metadata = {
        "page_id": "abc-123",
        "page_title": "Engineering Handbook",
        "space_id": "ENG",
        "comment_count": 1,
        "connector_id": _CONNECTOR_ID,
        "document_type": "Confluence Page",
        "connector_type": "Confluence",
    }
    for key, expected in expected_metadata.items():
        assert doc.metadata[key] == expected, key

    # The fallback summary must embed both the title and the full markdown.
    summary = doc.fallback_summary
    assert summary is not None
    assert "Engineering Handbook" in summary
    assert markdown in summary
async def test_build_connector_doc_summary_disabled():
    """``enable_summary=False`` yields a document that is not summarised."""
    page = _make_page()
    build_kwargs = {
        "connector_id": _CONNECTOR_ID,
        "search_space_id": _SEARCH_SPACE_ID,
        "user_id": _USER_ID,
        "enable_summary": False,
    }

    doc = _build_connector_doc(page, _to_markdown(page), **build_kwargs)

    assert doc.should_summarize is False
# ---------------------------------------------------------------------------
# Shared fixtures for Slices 2-7
# ---------------------------------------------------------------------------
def _mock_connector(enable_summary: bool = True):
c = MagicMock()
c.config = {"access_token": "tok"}
c.enable_summary = enable_summary
c.last_indexed_at = None
return c
def _mock_confluence_client(pages=None, error=None):
client = MagicMock()
client.get_pages_by_date_range = AsyncMock(
return_value=(pages if pages is not None else [], error),
)
client.close = AsyncMock()
return client
@pytest.fixture
def confluence_mocks(monkeypatch):
    """Replace every collaborator of the Confluence indexer module with mocks.

    The connector lookup, the Confluence client class, the duplicate-hash
    check, the last-indexed updater, the date-range helper, the task logger
    and the indexing pipeline are all patched on the indexer module. By
    default the client yields one page and the pipeline's batch call returns
    ``([], 1, 0)``. Returns a dict of the mocks that tests tweak or inspect.
    """
    session = AsyncMock()
    # Attributes of an AsyncMock default to AsyncMock; use a sync mock here.
    session.no_autoflush = MagicMock()

    connector = _mock_connector()
    client = _mock_confluence_client(pages=[_make_page()])

    task_logger = MagicMock()
    task_logger.log_task_start = AsyncMock(return_value=MagicMock())
    task_logger.log_task_progress = AsyncMock()
    task_logger.log_task_success = AsyncMock()
    task_logger.log_task_failure = AsyncMock()

    batch = AsyncMock(return_value=([], 1, 0))
    pipeline = MagicMock()
    pipeline.index_batch_parallel = batch
    pipeline.migrate_legacy_docs = AsyncMock()

    # Module attribute name -> stub installed for the duration of the test.
    stubs = {
        "get_connector_by_id": AsyncMock(return_value=connector),
        "ConfluenceHistoryConnector": MagicMock(return_value=client),
        "check_duplicate_document_by_hash": AsyncMock(return_value=None),
        "update_connector_last_indexed": AsyncMock(),
        "calculate_date_range": MagicMock(
            return_value=("2025-01-01", "2025-12-31")
        ),
        "TaskLoggingService": MagicMock(return_value=task_logger),
        "IndexingPipelineService": MagicMock(return_value=pipeline),
    }
    for attr_name, stub in stubs.items():
        monkeypatch.setattr(_mod, attr_name, stub)

    return {
        "session": session,
        "connector": connector,
        "confluence_client": client,
        "task_logger": task_logger,
        "pipeline_mock": pipeline,
        "batch_mock": batch,
    }
async def _run_index(mocks, **overrides):
    """Invoke ``index_confluence_pages`` with test defaults.

    Any of the indexer's keyword arguments except ``session`` may be
    replaced through ``overrides``; unrecognised override names are ignored.
    """
    call_kwargs = {
        "connector_id": _CONNECTOR_ID,
        "search_space_id": _SEARCH_SPACE_ID,
        "user_id": _USER_ID,
        "start_date": "2025-01-01",
        "end_date": "2025-12-31",
        "update_last_indexed": True,
        "on_heartbeat_callback": None,
    }
    for name in call_kwargs:
        if name in overrides:
            call_kwargs[name] = overrides[name]
    return await index_confluence_pages(session=mocks["session"], **call_kwargs)
# ---------------------------------------------------------------------------
# Slice 2: Full pipeline wiring
# ---------------------------------------------------------------------------
async def test_one_page_calls_pipeline_and_returns_indexed_count(confluence_mocks):
    """One fetched page is sent to the pipeline and reported as indexed."""
    batch = confluence_mocks["batch_mock"]

    indexed, skipped, warning = await _run_index(confluence_mocks)

    assert indexed == 1
    assert skipped == 0
    assert warning is None

    batch.assert_called_once()
    # First positional argument of the batch call is the document list.
    forwarded_docs = batch.call_args.args[0]
    assert len(forwarded_docs) == 1
    assert forwarded_docs[0].document_type == DocumentType.CONFLUENCE_CONNECTOR
async def test_pipeline_called_with_max_concurrency_3(confluence_mocks):
    """The batch call is made with ``max_concurrency=3``."""
    await _run_index(confluence_mocks)

    batch_kwargs = confluence_mocks["batch_mock"].call_args.kwargs
    assert batch_kwargs.get("max_concurrency") == 3
async def test_migrate_legacy_docs_called_before_indexing(confluence_mocks):
    """Legacy docs are migrated exactly once, and before the batch is indexed.

    Both pipeline methods were assigned as attributes of ``pipeline_mock``,
    so the parent's ``mock_calls`` records their calls in invocation order.
    """
    await _run_index(confluence_mocks)

    pipeline = confluence_mocks["pipeline_mock"]
    pipeline.migrate_legacy_docs.assert_called_once()
    confluence_mocks["batch_mock"].assert_called_once()

    # Each mock_calls entry is a (name, args, kwargs) triple.
    call_names = [entry[0] for entry in pipeline.mock_calls]
    migrate_at = call_names.index("migrate_legacy_docs")
    index_at = call_names.index("index_batch_parallel")
    assert migrate_at < index_at
# ---------------------------------------------------------------------------
# Slice 3: Page skipping (missing id/title/content)
# ---------------------------------------------------------------------------
async def test_pages_with_missing_id_are_skipped(confluence_mocks):
    """A page with an empty id is skipped and never reaches the pipeline."""
    valid_page = _make_page(page_id="p1", title="Valid")
    page_without_id = _make_page(page_id="", title="Missing id")
    fetch = confluence_mocks["confluence_client"].get_pages_by_date_range
    fetch.return_value = ([valid_page, page_without_id], None)

    _indexed, skipped, _warning = await _run_index(confluence_mocks)

    forwarded_docs = confluence_mocks["batch_mock"].call_args.args[0]
    assert len(forwarded_docs) == 1
    assert skipped == 1
async def test_pages_with_missing_title_are_skipped(confluence_mocks):
    """A page with an empty title is skipped and never reaches the pipeline."""
    valid_page = _make_page(page_id="p1", title="Valid")
    untitled_page = _make_page(page_id="p2", title="")
    fetch = confluence_mocks["confluence_client"].get_pages_by_date_range
    fetch.return_value = ([valid_page, untitled_page], None)

    _indexed, skipped, _warning = await _run_index(confluence_mocks)

    forwarded_docs = confluence_mocks["batch_mock"].call_args.args[0]
    assert len(forwarded_docs) == 1
    assert skipped == 1
async def test_pages_with_no_content_are_skipped(confluence_mocks):
    """A page with an empty body is skipped and never reaches the pipeline."""
    valid_page = _make_page(page_id="p1", title="Valid", body="<p>ok</p>")
    empty_page = _make_page(page_id="p2", title="Empty", body="")
    fetch = confluence_mocks["confluence_client"].get_pages_by_date_range
    fetch.return_value = ([valid_page, empty_page], None)

    _indexed, skipped, _warning = await _run_index(confluence_mocks)

    forwarded_docs = confluence_mocks["batch_mock"].call_args.args[0]
    assert len(forwarded_docs) == 1
    assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 4: Duplicate content skipping
# ---------------------------------------------------------------------------
async def test_duplicate_content_pages_are_skipped(confluence_mocks, monkeypatch):
    """A page whose content hash matches an existing document is skipped."""
    fetch = confluence_mocks["confluence_client"].get_pages_by_date_range
    fetch.return_value = (
        [
            _make_page(page_id="p1", title="One"),
            _make_page(page_id="p2", title="Two"),
        ],
        None,
    )

    lookups = []

    async def _check_dup(session, content_hash):
        # Only the second lookup reports an already-stored document.
        lookups.append(content_hash)
        if len(lookups) != 2:
            return None
        existing = MagicMock()
        existing.id = 99
        existing.document_type = "OTHER"
        return existing

    monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup)

    _indexed, skipped, _warning = await _run_index(confluence_mocks)

    forwarded_docs = confluence_mocks["batch_mock"].call_args.args[0]
    assert len(forwarded_docs) == 1
    assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 5: Heartbeat callback forwarding
# ---------------------------------------------------------------------------
async def test_heartbeat_callback_forwarded_to_pipeline(confluence_mocks):
    """The caller's heartbeat callback is passed on as ``on_heartbeat``."""
    callback = AsyncMock()

    await _run_index(confluence_mocks, on_heartbeat_callback=callback)

    batch_kwargs = confluence_mocks["batch_mock"].call_args.kwargs
    assert batch_kwargs.get("on_heartbeat") is callback
# ---------------------------------------------------------------------------
# Slice 6: Empty pages and no-data success tuple
# ---------------------------------------------------------------------------
async def test_empty_pages_returns_zero_tuple(confluence_mocks):
    """No pages and no error gives ``(0, 0, None)`` without a batch call."""
    fetch = confluence_mocks["confluence_client"].get_pages_by_date_range
    fetch.return_value = ([], None)

    indexed, skipped, warning = await _run_index(confluence_mocks)

    assert indexed == 0
    assert skipped == 0
    assert warning is None
    confluence_mocks["batch_mock"].assert_not_called()
async def test_no_pages_error_message_returns_success_tuple(confluence_mocks):
    """A "No pages found" client message is reported as an empty success."""
    fetch = confluence_mocks["confluence_client"].get_pages_by_date_range
    fetch.return_value = ([], "No pages found in date range")

    indexed, skipped, warning = await _run_index(confluence_mocks)

    assert indexed == 0
    assert skipped == 0
    assert warning is None
async def test_api_error_still_returns_3_tuple(confluence_mocks):
    """A client error still produces a three-item result with zero counts."""
    fetch = confluence_mocks["confluence_client"].get_pages_by_date_range
    fetch.return_value = ([], "API exploded")

    outcome = await _run_index(confluence_mocks)

    assert len(outcome) == 3
    assert outcome[0] == 0
    assert outcome[1] == 0
    assert "Failed to get Confluence pages" in outcome[2]
# ---------------------------------------------------------------------------
# Slice 7: Failed docs warning
# ---------------------------------------------------------------------------
async def test_failed_docs_warning_in_result(confluence_mocks):
    """Pipeline failures appear in the returned warning message."""
    # The pipeline reports zero indexed and two failed documents.
    confluence_mocks["batch_mock"].return_value = ([], 0, 2)

    _indexed, _skipped, warning = await _run_index(confluence_mocks)

    assert warning is not None
    assert "2 failed" in warning

View file

@ -0,0 +1,372 @@
"""Tests for Jira indexer migrated to the unified parallel pipeline."""
from unittest.mock import AsyncMock, MagicMock
import pytest
import app.tasks.connector_indexers.jira_indexer as _mod
from app.db import DocumentType
from app.tasks.connector_indexers.jira_indexer import (
_build_connector_doc,
index_jira_issues,
)
pytestmark = pytest.mark.unit
_USER_ID = "00000000-0000-0000-0000-000000000001"
_CONNECTOR_ID = 42
_SEARCH_SPACE_ID = 1
def _make_issue(
issue_key: str = "ENG-1",
issue_id: str = "10001",
title: str = "Fix login",
):
return {"key": issue_key, "id": issue_id, "title": title}
def _make_formatted_issue(
issue_key: str = "ENG-1",
issue_id: str = "10001",
title: str = "Fix login",
status: str = "In Progress",
priority: str = "High",
comments=None,
):
return {
"key": issue_key,
"id": issue_id,
"title": title,
"status": status,
"priority": priority,
"comments": comments or [],
}
# ---------------------------------------------------------------------------
# Slice 1: _build_connector_doc tracer bullet
# ---------------------------------------------------------------------------
async def test_build_connector_doc_produces_correct_fields():
    """A populated issue is mapped onto every ConnectorDocument field."""
    identity = dict(issue_key="ENG-42", issue_id="4242", title="Fix auth bug")
    issue = _make_issue(**identity)
    formatted = _make_formatted_issue(
        **identity,
        status="Done",
        priority="Urgent",
        comments=[{"id": "c1"}],
    )
    markdown = "# ENG-42: Fix auth bug\n\nBody"

    doc = _build_connector_doc(
        issue,
        formatted,
        markdown,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    # NOTE(review): the expected title and ``issue_title`` carry the numeric
    # id ("4242"), not "Fix auth bug", and ``issue_id`` carries the key. This
    # pins what the indexer currently produces - confirm it is intended.
    assert doc.title == "ENG-42: 4242"
    assert doc.unique_id == "ENG-42"
    assert doc.document_type == DocumentType.JIRA_CONNECTOR
    assert doc.source_markdown == markdown
    assert doc.search_space_id == _SEARCH_SPACE_ID
    assert doc.connector_id == _CONNECTOR_ID
    assert doc.created_by_id == _USER_ID
    assert doc.should_summarize is True

    # Metadata entries, checked one key at a time.
    expected_metadata = {
        "issue_id": "ENG-42",
        "issue_identifier": "ENG-42",
        "issue_title": "4242",
        "state": "Done",
        "priority": "Urgent",
        "comment_count": 1,
        "connector_id": _CONNECTOR_ID,
        "document_type": "Jira Issue",
        "connector_type": "Jira",
    }
    for key, expected in expected_metadata.items():
        assert doc.metadata[key] == expected, key

    # The fallback summary must embed both the issue key and the markdown.
    summary = doc.fallback_summary
    assert summary is not None
    assert "ENG-42" in summary
    assert markdown in summary
async def test_build_connector_doc_summary_disabled():
    """``enable_summary=False`` yields a document that is not summarised."""
    build_kwargs = {
        "connector_id": _CONNECTOR_ID,
        "search_space_id": _SEARCH_SPACE_ID,
        "user_id": _USER_ID,
        "enable_summary": False,
    }

    doc = _build_connector_doc(
        _make_issue(),
        _make_formatted_issue(),
        "# content",
        **build_kwargs,
    )

    assert doc.should_summarize is False
# ---------------------------------------------------------------------------
# Shared fixtures for Slices 2-7
# ---------------------------------------------------------------------------
def _mock_connector(enable_summary: bool = True):
c = MagicMock()
c.config = {"access_token": "tok"}
c.enable_summary = enable_summary
c.last_indexed_at = None
return c
def _mock_jira_client(issues=None, error=None):
client = MagicMock()
client.get_issues_by_date_range = AsyncMock(
return_value=(issues if issues is not None else [], error),
)
client.format_issue = MagicMock(
side_effect=lambda i: _make_formatted_issue(
issue_key=i.get("key", ""),
issue_id=i.get("id", ""),
title=i.get("title", ""),
)
)
client.format_issue_to_markdown = MagicMock(
side_effect=lambda fi: f"# {fi.get('key', '')}: {fi.get('id', '')}\n\nContent"
)
client.close = AsyncMock()
return client
@pytest.fixture
def jira_mocks(monkeypatch):
    """Replace every collaborator of the Jira indexer module with mocks.

    The connector lookup, the Jira client class, the duplicate-hash check,
    the last-indexed updater, the date-range helper, the task logger and the
    indexing pipeline are all patched on the indexer module. By default the
    client yields one issue and the pipeline's batch call returns
    ``([], 1, 0)``. Returns a dict of the mocks that tests tweak or inspect.
    """
    session = AsyncMock()
    # Attributes of an AsyncMock default to AsyncMock; use a sync mock here.
    session.no_autoflush = MagicMock()

    connector = _mock_connector()
    client = _mock_jira_client(issues=[_make_issue()])

    task_logger = MagicMock()
    task_logger.log_task_start = AsyncMock(return_value=MagicMock())
    task_logger.log_task_progress = AsyncMock()
    task_logger.log_task_success = AsyncMock()
    task_logger.log_task_failure = AsyncMock()

    batch = AsyncMock(return_value=([], 1, 0))
    pipeline = MagicMock()
    pipeline.index_batch_parallel = batch
    pipeline.migrate_legacy_docs = AsyncMock()

    # Module attribute name -> stub installed for the duration of the test.
    stubs = {
        "get_connector_by_id": AsyncMock(return_value=connector),
        "JiraHistoryConnector": MagicMock(return_value=client),
        "check_duplicate_document_by_hash": AsyncMock(return_value=None),
        "update_connector_last_indexed": AsyncMock(),
        "calculate_date_range": MagicMock(
            return_value=("2025-01-01", "2025-12-31")
        ),
        "TaskLoggingService": MagicMock(return_value=task_logger),
        "IndexingPipelineService": MagicMock(return_value=pipeline),
    }
    for attr_name, stub in stubs.items():
        monkeypatch.setattr(_mod, attr_name, stub)

    return {
        "session": session,
        "connector": connector,
        "jira_client": client,
        "task_logger": task_logger,
        "pipeline_mock": pipeline,
        "batch_mock": batch,
    }
async def _run_index(mocks, **overrides):
    """Invoke ``index_jira_issues`` with test defaults.

    Any of the indexer's keyword arguments except ``session`` may be
    replaced through ``overrides``; unrecognised override names are ignored.
    """
    call_kwargs = {
        "connector_id": _CONNECTOR_ID,
        "search_space_id": _SEARCH_SPACE_ID,
        "user_id": _USER_ID,
        "start_date": "2025-01-01",
        "end_date": "2025-12-31",
        "update_last_indexed": True,
        "on_heartbeat_callback": None,
    }
    for name in call_kwargs:
        if name in overrides:
            call_kwargs[name] = overrides[name]
    return await index_jira_issues(session=mocks["session"], **call_kwargs)
# ---------------------------------------------------------------------------
# Slice 2: Full pipeline wiring
# ---------------------------------------------------------------------------
async def test_one_issue_calls_pipeline_and_returns_indexed_count(jira_mocks):
    """One fetched issue is sent to the pipeline and reported as indexed."""
    batch = jira_mocks["batch_mock"]

    indexed, skipped, warning = await _run_index(jira_mocks)

    assert indexed == 1
    assert skipped == 0
    assert warning is None

    batch.assert_called_once()
    # First positional argument of the batch call is the document list.
    forwarded_docs = batch.call_args.args[0]
    assert len(forwarded_docs) == 1
    assert forwarded_docs[0].document_type == DocumentType.JIRA_CONNECTOR
async def test_pipeline_called_with_max_concurrency_3(jira_mocks):
    """The batch call is made with ``max_concurrency=3``."""
    await _run_index(jira_mocks)

    batch_kwargs = jira_mocks["batch_mock"].call_args.kwargs
    assert batch_kwargs.get("max_concurrency") == 3
async def test_migrate_legacy_docs_called_before_indexing(jira_mocks):
    """Legacy docs are migrated exactly once, and before the batch is indexed.

    Both pipeline methods were assigned as attributes of ``pipeline_mock``,
    so the parent's ``mock_calls`` records their calls in invocation order.
    """
    await _run_index(jira_mocks)

    pipeline = jira_mocks["pipeline_mock"]
    pipeline.migrate_legacy_docs.assert_called_once()
    jira_mocks["batch_mock"].assert_called_once()

    # Each mock_calls entry is a (name, args, kwargs) triple.
    call_names = [entry[0] for entry in pipeline.mock_calls]
    migrate_at = call_names.index("migrate_legacy_docs")
    index_at = call_names.index("index_batch_parallel")
    assert migrate_at < index_at
# ---------------------------------------------------------------------------
# Slice 3: Issue skipping (missing key/title/content)
# ---------------------------------------------------------------------------
async def test_issues_with_missing_key_are_skipped(jira_mocks):
    """An issue with an empty key is skipped and never reaches the pipeline."""
    valid_issue = _make_issue(issue_key="ENG-1", issue_id="10001")
    keyless_issue = {"key": "", "id": "10002", "title": "No key"}
    fetch = jira_mocks["jira_client"].get_issues_by_date_range
    fetch.return_value = ([valid_issue, keyless_issue], None)

    _indexed, skipped, _warning = await _run_index(jira_mocks)

    forwarded_docs = jira_mocks["batch_mock"].call_args.args[0]
    assert len(forwarded_docs) == 1
    assert skipped == 1
async def test_issues_with_missing_title_are_skipped(jira_mocks):
    """An issue with an empty ``id`` is skipped as having no title.

    NOTE(review): the fixture blanks ``id`` rather than ``title``, so the
    indexer presumably takes its title from the ``id`` field - confirm.
    """
    valid_issue = _make_issue(issue_key="ENG-1", issue_id="10001")
    idless_issue = {"key": "ENG-2", "id": "", "title": "Missing id used as title"}
    fetch = jira_mocks["jira_client"].get_issues_by_date_range
    fetch.return_value = ([valid_issue, idless_issue], None)

    _indexed, skipped, _warning = await _run_index(jira_mocks)

    forwarded_docs = jira_mocks["batch_mock"].call_args.args[0]
    assert len(forwarded_docs) == 1
    assert skipped == 1
async def test_issues_with_no_content_are_skipped(jira_mocks):
    """An issue whose markdown is empty is skipped, not sent to the pipeline."""
    client = jira_mocks["jira_client"]
    client.get_issues_by_date_range.return_value = (
        [
            _make_issue(issue_key="ENG-1", issue_id="10001"),
            _make_issue(issue_key="ENG-2", issue_id="10002"),
        ],
        None,
    )
    # Consecutive markdown results: real content, then an empty string.
    rendered_markdown = ["# ENG-1: 10001\n\nContent", ""]
    client.format_issue_to_markdown.side_effect = rendered_markdown

    _indexed, skipped, _warning = await _run_index(jira_mocks)

    forwarded_docs = jira_mocks["batch_mock"].call_args.args[0]
    assert len(forwarded_docs) == 1
    assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 4: Duplicate content skipping
# ---------------------------------------------------------------------------
async def test_duplicate_content_issues_are_skipped(jira_mocks, monkeypatch):
    """An issue whose content hash matches an existing document is skipped."""
    fetch = jira_mocks["jira_client"].get_issues_by_date_range
    fetch.return_value = (
        [
            _make_issue(issue_key="ENG-1", issue_id="10001"),
            _make_issue(issue_key="ENG-2", issue_id="10002"),
        ],
        None,
    )

    lookups = []

    async def _check_dup(session, content_hash):
        # Only the second lookup reports an already-stored document.
        lookups.append(content_hash)
        if len(lookups) != 2:
            return None
        existing = MagicMock()
        existing.id = 99
        existing.document_type = "OTHER"
        return existing

    monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup)

    _indexed, skipped, _warning = await _run_index(jira_mocks)

    forwarded_docs = jira_mocks["batch_mock"].call_args.args[0]
    assert len(forwarded_docs) == 1
    assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 5: Heartbeat callback forwarding
# ---------------------------------------------------------------------------
async def test_heartbeat_callback_forwarded_to_pipeline(jira_mocks):
    """The caller's heartbeat callback is passed on as ``on_heartbeat``."""
    callback = AsyncMock()

    await _run_index(jira_mocks, on_heartbeat_callback=callback)

    batch_kwargs = jira_mocks["batch_mock"].call_args.kwargs
    assert batch_kwargs.get("on_heartbeat") is callback
# ---------------------------------------------------------------------------
# Slice 6: Empty issues and no-data success tuple
# ---------------------------------------------------------------------------
async def test_empty_issues_returns_zero_tuple(jira_mocks):
    """No issues and no error gives ``(0, 0, None)`` without a batch call."""
    fetch = jira_mocks["jira_client"].get_issues_by_date_range
    fetch.return_value = ([], None)

    indexed, skipped, warning = await _run_index(jira_mocks)

    assert indexed == 0
    assert skipped == 0
    assert warning is None
    jira_mocks["batch_mock"].assert_not_called()
async def test_no_issues_error_message_returns_success_tuple(jira_mocks):
    """A "No issues found" client message is reported as an empty success."""
    fetch = jira_mocks["jira_client"].get_issues_by_date_range
    fetch.return_value = ([], "No issues found in date range")

    indexed, skipped, warning = await _run_index(jira_mocks)

    assert indexed == 0
    assert skipped == 0
    assert warning is None
async def test_api_error_still_returns_3_tuple(jira_mocks):
    """A client error still produces a three-item result with zero counts."""
    fetch = jira_mocks["jira_client"].get_issues_by_date_range
    fetch.return_value = ([], "API exploded")

    outcome = await _run_index(jira_mocks)

    assert len(outcome) == 3
    assert outcome[0] == 0
    assert outcome[1] == 0
    assert "Failed to get Jira issues" in outcome[2]
# ---------------------------------------------------------------------------
# Slice 7: Failed docs warning
# ---------------------------------------------------------------------------
async def test_failed_docs_warning_in_result(jira_mocks):
    """Pipeline failures appear in the returned warning message."""
    # The pipeline reports zero indexed and two failed documents.
    jira_mocks["batch_mock"].return_value = ([], 0, 2)

    _indexed, _skipped, warning = await _run_index(jira_mocks)

    assert warning is not None
    assert "2 failed" in warning