feat: implement two-phase document indexing for Confluence and Jira connectors with real-time status updates

This commit is contained in:
Anish Sarkar 2026-02-06 03:54:24 +05:30
parent 0249ea20a5
commit 1d870e45a4
2 changed files with 369 additions and 295 deletions

View file

@ -1,5 +1,9 @@
"""
Confluence connector indexer.
Provides real-time document status updates during indexing using a two-phase approach:
- Phase 1: Create all documents with PENDING status (visible in UI immediately)
- Phase 2: Process each document one by one (PENDING PROCESSING READY/FAILED)
"""
import contextlib
@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.confluence_history import ConfluenceHistoryConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -29,6 +33,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -180,22 +185,22 @@ async def index_confluence_pages(
await confluence_client.close()
return 0, f"Error fetching Confluence pages: {e!s}"
# Process and index each page
# =======================================================================
# PHASE 1: Analyze all pages, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
documents_indexed = 0
skipped_pages = []
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
pages_to_process = [] # List of dicts with document and page data
new_documents_created = False
for page in pages:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:
page_id = page.get("id")
page_title = page.get("title", "")
@ -205,7 +210,6 @@ async def index_confluence_pages(
logger.warning(
f"Skipping page with missing ID or title: {page_id or 'Unknown'}"
)
skipped_pages.append(f"{page_title or 'Unknown'} (missing data)")
documents_skipped += 1
continue
@ -236,7 +240,6 @@ async def index_confluence_pages(
if not full_content.strip():
logger.warning(f"Skipping page with no content: {page_title}")
skipped_pages.append(f"{page_title} (no content)")
documents_skipped += 1
continue
@ -258,74 +261,25 @@ async def index_confluence_pages(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Confluence page {page_title} unchanged. Skipping."
)
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Confluence page {page_title}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"page_title": page_title,
"page_id": page_id,
"space_id": space_id,
"comment_count": comment_count,
"document_type": "Confluence Page",
"connector_type": "Confluence",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
full_content, user_llm, document_metadata
)
else:
summary_content = f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n"
if page_content:
content_preview = page_content[:1000]
if len(page_content) > 1000:
content_preview += "..."
summary_content += (
f"Content Preview: {content_preview}\n\n"
)
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(full_content)
# Update existing document
existing_document.title = page_title
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"page_id": page_id,
"page_title": page_title,
"space_id": space_id,
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(
f"Successfully updated Confluence page {page_title}"
)
continue
# Queue existing document for update (will be set to processing in Phase 2)
pages_to_process.append({
'document': existing_document,
'is_new': False,
'full_content': full_content,
'page_content': page_content,
'content_hash': content_hash,
'page_id': page_id,
'page_title': page_title,
'space_id': space_id,
'comment_count': comment_count,
})
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
@ -340,51 +294,11 @@ async def index_confluence_pages(
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"page_title": page_title,
"page_id": page_id,
"space_id": space_id,
"comment_count": comment_count,
"document_type": "Confluence Page",
"connector_type": "Confluence",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
full_content, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = (
f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n"
)
if page_content:
# Take first 500 characters of content for summary
content_preview = page_content[:1000]
if len(page_content) > 1000:
content_preview += "..."
summary_content += f"Content Preview: {content_preview}\n\n"
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks - using the full page content with comments
chunks = await create_document_chunks(full_content)
# Create and store new document
logger.info(f"Creating new document for page {page_title}")
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=page_title,
@ -394,23 +308,122 @@ async def index_confluence_pages(
"page_title": page_title,
"space_id": space_id,
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new page {page_title}")
new_documents_created = True
# Batch commit every 10 documents
pages_to_process.append({
'document': document,
'is_new': True,
'full_content': full_content,
'page_content': page_content,
'content_hash': content_hash,
'page_id': page_id,
'page_title': page_title,
'space_id': space_id,
'comment_count': comment_count,
})
except Exception as e:
logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(pages_to_process)} documents")
for item in pages_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"page_title": item['page_title'],
"page_id": item['page_id'],
"space_id": item['space_id'],
"comment_count": item['comment_count'],
"document_type": "Confluence Page",
"connector_type": "Confluence",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item['full_content'], user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = (
f"Confluence Page: {item['page_title']}\n\nSpace ID: {item['space_id']}\n\n"
)
if item['page_content']:
# Take first 1000 characters of content for summary
content_preview = item['page_content'][:1000]
if len(item['page_content']) > 1000:
content_preview += "..."
summary_content += f"Content Preview: {content_preview}\n\n"
summary_content += f"Comments: {item['comment_count']}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks - using the full page content with comments
chunks = await create_document_chunks(item['full_content'])
# Update document to READY with actual content
document.title = item['page_title']
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"page_id": item['page_id'],
"page_title": item['page_title'],
"space_id": item['space_id'],
"comment_count": item['comment_count'],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Confluence pages processed so far"
@ -419,53 +432,78 @@ async def index_confluence_pages(
except Exception as e:
logger.error(
f"Error processing page {page.get('title', 'Unknown')}: {e!s}",
f"Error processing page {item.get('page_title', 'Unknown')}: {e!s}",
exc_info=True,
)
skipped_pages.append(
f"{page.get('title', 'Unknown')} (processing error)"
)
documents_skipped += 1
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue # Skip this page and continue with others
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if update_last_indexed:
await update_connector_last_indexed(session, connector, update_last_indexed)
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
# Final commit to ensure all documents are persisted (safety net)
logger.info(
f"Final commit: Total {documents_indexed} Confluence pages processed"
)
await session.commit()
logger.info(
"Successfully committed all Confluence document changes to database"
)
try:
await session.commit()
logger.info(
"Successfully committed all Confluence document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same page was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Confluence indexing for connector {connector_id}",
{
"pages_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_pages_count": len(skipped_pages),
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"Confluence indexing completed: {documents_indexed} new pages, {documents_skipped} skipped"
f"Confluence indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
# Close the client connection
if confluence_client:
await confluence_client.close()
return (
total_processed,
None,
) # Return None as the error message to indicate success
return documents_indexed, warning_message
except SQLAlchemyError as db_error:
await session.rollback()

View file

@ -1,5 +1,9 @@
"""
Jira connector indexer.
Provides real-time document status updates during indexing using a two-phase approach:
- Phase 1: Create all documents with PENDING status (visible in UI immediately)
- Phase 2: Process each document one by one (PENDING PROCESSING READY/FAILED)
"""
import contextlib
@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.jira_history import JiraHistoryConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -29,6 +33,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -174,22 +179,22 @@ async def index_jira_issues(
logger.error(f"Error fetching Jira issues: {e!s}", exc_info=True)
return 0, f"Error fetching Jira issues: {e!s}"
# Process and index each issue
# =======================================================================
# PHASE 1: Analyze all issues, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
documents_indexed = 0
skipped_issues = []
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
issues_to_process = [] # List of dicts with document and issue data
new_documents_created = False
for issue in issues:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:
issue_id = issue.get("key")
issue_identifier = issue.get("key", "")
@ -199,9 +204,6 @@ async def index_jira_issues(
logger.warning(
f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}"
)
skipped_issues.append(
f"{issue_identifier or 'Unknown'} (missing data)"
)
documents_skipped += 1
continue
@ -215,7 +217,6 @@ async def index_jira_issues(
logger.warning(
f"Skipping issue with no content: {issue_identifier} - {issue_title}"
)
skipped_issues.append(f"{issue_identifier} (no content)")
documents_skipped += 1
continue
@ -237,71 +238,25 @@ async def index_jira_issues(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Jira issue {issue_identifier} unchanged. Skipping."
)
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Jira issue {issue_identifier}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"issue_key": issue_identifier,
"issue_title": issue_title,
"status": formatted_issue.get("status", "Unknown"),
"priority": formatted_issue.get("priority", "Unknown"),
"comment_count": comment_count,
"document_type": "Jira Issue",
"connector_type": "Jira",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
issue_content, user_llm, document_metadata
)
else:
summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n"
if formatted_issue.get("description"):
summary_content += f"Description: {formatted_issue.get('description')}\n\n"
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(issue_content)
# Update existing document
existing_document.title = f"{issue_identifier}: {issue_title}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": formatted_issue.get("status", "Unknown"),
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(
f"Successfully updated Jira issue {issue_identifier}"
)
continue
# Queue existing document for update (will be set to processing in Phase 2)
issues_to_process.append({
'document': existing_document,
'is_new': False,
'issue_content': issue_content,
'content_hash': content_hash,
'issue_id': issue_id,
'issue_identifier': issue_identifier,
'issue_title': issue_title,
'formatted_issue': formatted_issue,
'comment_count': comment_count,
})
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
@ -316,50 +271,11 @@ async def index_jira_issues(
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"issue_key": issue_identifier,
"issue_title": issue_title,
"status": formatted_issue.get("status", "Unknown"),
"priority": formatted_issue.get("priority", "Unknown"),
"comment_count": comment_count,
"document_type": "Jira Issue",
"connector_type": "Jira",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
issue_content, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n"
if formatted_issue.get("description"):
summary_content += (
f"Description: {formatted_issue.get('description')}\n\n"
)
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks - using the full issue content with comments
chunks = await create_document_chunks(issue_content)
# Create and store new document
logger.info(
f"Creating new document for issue {issue_identifier} - {issue_title}"
)
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=f"{issue_identifier}: {issue_title}",
@ -370,25 +286,120 @@ async def index_jira_issues(
"issue_title": issue_title,
"state": formatted_issue.get("status", "Unknown"),
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1
logger.info(
f"Successfully indexed new issue {issue_identifier} - {issue_title}"
new_documents_created = True
issues_to_process.append({
'document': document,
'is_new': True,
'issue_content': issue_content,
'content_hash': content_hash,
'issue_id': issue_id,
'issue_identifier': issue_identifier,
'issue_title': issue_title,
'formatted_issue': formatted_issue,
'comment_count': comment_count,
})
except Exception as e:
logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(issues_to_process)} documents")
for item in issues_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
# Batch commit every 10 documents
if user_llm:
document_metadata = {
"issue_key": item['issue_identifier'],
"issue_title": item['issue_title'],
"status": item['formatted_issue'].get("status", "Unknown"),
"priority": item['formatted_issue'].get("priority", "Unknown"),
"comment_count": item['comment_count'],
"document_type": "Jira Issue",
"connector_type": "Jira",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item['issue_content'], user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Jira Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['formatted_issue'].get('status', 'Unknown')}\n\n"
if item['formatted_issue'].get("description"):
summary_content += (
f"Description: {item['formatted_issue'].get('description')}\n\n"
)
summary_content += f"Comments: {item['comment_count']}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks - using the full issue content with comments
chunks = await create_document_chunks(item['issue_content'])
# Update document to READY with actual content
document.title = f"{item['issue_identifier']}: {item['issue_title']}"
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"issue_id": item['issue_id'],
"issue_identifier": item['issue_identifier'],
"issue_title": item['issue_title'],
"state": item['formatted_issue'].get("status", "Unknown"),
"comment_count": item['comment_count'],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Jira issues processed so far"
@ -397,48 +408,73 @@ async def index_jira_issues(
except Exception as e:
logger.error(
f"Error processing issue {issue.get('identifier', 'Unknown')}: {e!s}",
f"Error processing issue {item.get('issue_identifier', 'Unknown')}: {e!s}",
exc_info=True,
)
skipped_issues.append(
f"{issue.get('identifier', 'Unknown')} (processing error)"
)
documents_skipped += 1
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue # Skip this issue and continue with others
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if update_last_indexed:
await update_connector_last_indexed(session, connector, update_last_indexed)
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
# Final commit to ensure all documents are persisted (safety net)
logger.info(f"Final commit: Total {documents_indexed} Jira issues processed")
await session.commit()
logger.info("Successfully committed all JIRA document changes to database")
try:
await session.commit()
logger.info("Successfully committed all JIRA document changes to database")
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same issue was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed JIRA indexing for connector {connector_id}",
{
"issues_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_issues_count": len(skipped_issues),
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"JIRA indexing completed: {documents_indexed} new issues, {documents_skipped} skipped"
f"JIRA indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
# Clean up the connector
await jira_client.close()
return (
total_processed,
None,
) # Return None as the error message to indicate success
return documents_indexed, warning_message
except SQLAlchemyError as db_error:
await session.rollback()