feat: implement two-phase document indexing for Linear and Slack connectors with real-time status updates

This commit is contained in:
Anish Sarkar 2026-02-06 02:59:21 +05:30
parent 781cdc3dbd
commit 2077344934
2 changed files with 337 additions and 230 deletions

View file

@ -1,5 +1,9 @@
"""
Linear connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
import time
@ -11,7 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.linear_connector import LinearConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -28,6 +32,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -196,6 +201,7 @@ async def index_linear_issues(
# Track the number of documents indexed
documents_indexed = 0
documents_skipped = 0
documents_failed = 0 # Track issues that failed processing
skipped_issues = []
# Heartbeat tracking - update notification periodically to prevent appearing stuck
@ -207,16 +213,14 @@ async def index_linear_issues(
{"stage": "process_issues", "total_issues": len(issues)},
)
# Process each issue
for issue in issues:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all issues, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
issues_to_process = [] # List of dicts with document and issue data
new_documents_created = False
for issue in issues:
try:
issue_id = issue.get("id", "")
issue_identifier = issue.get("identifier", "")
@ -262,78 +266,35 @@ async def index_linear_issues(
state = formatted_issue.get("state", "Unknown")
description = formatted_issue.get("description", "")
comment_count = len(formatted_issue.get("comments", []))
priority = formatted_issue.get("priority", "Unknown")
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for Linear issue {issue_identifier} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Linear issue {issue_identifier}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"issue_id": issue_identifier,
"issue_title": issue_title,
"state": state,
"priority": formatted_issue.get("priority", "Unknown"),
"comment_count": comment_count,
"document_type": "Linear Issue",
"connector_type": "Linear",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
issue_content, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
if description and len(description) > 1000:
description = description[:997] + "..."
summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
if description:
summary_content += f"Description: {description}\n\n"
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(issue_content)
# Update existing document
existing_document.title = f"{issue_identifier}: {issue_title}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(
f"Successfully updated Linear issue {issue_identifier}"
)
continue
# Queue existing document for update (will be set to processing in Phase 2)
issues_to_process.append({
'document': existing_document,
'is_new': False,
'issue_content': issue_content,
'content_hash': content_hash,
'issue_id': issue_id,
'issue_identifier': issue_identifier,
'issue_title': issue_title,
'state': state,
'description': description,
'comment_count': comment_count,
'priority': priority,
})
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
@ -351,48 +312,7 @@ async def index_linear_issues(
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"issue_id": issue_identifier,
"issue_title": issue_title,
"state": state,
"priority": formatted_issue.get("priority", "Unknown"),
"comment_count": comment_count,
"document_type": "Linear Issue",
"connector_type": "Linear",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
issue_content, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
# Truncate description if it's too long for the summary
if description and len(description) > 1000:
description = description[:997] + "..."
summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
if description:
summary_content += f"Description: {description}\n\n"
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks - using the full issue content with comments
chunks = await create_document_chunks(issue_content)
# Create and store new document
logger.info(
f"Creating new document for issue {issue_identifier} - {issue_title}"
)
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=f"{issue_identifier}: {issue_title}",
@ -403,25 +323,119 @@ async def index_linear_issues(
"issue_title": issue_title,
"state": state,
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1
logger.info(
f"Successfully indexed new issue {issue_identifier} - {issue_title}"
new_documents_created = True
issues_to_process.append({
'document': document,
'is_new': True,
'issue_content': issue_content,
'content_hash': content_hash,
'issue_id': issue_id,
'issue_identifier': issue_identifier,
'issue_title': issue_title,
'state': state,
'description': description,
'comment_count': comment_count,
'priority': priority,
})
except Exception as e:
logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(issues_to_process)} documents")
for item in issues_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
# Batch commit every 10 documents
if user_llm:
document_metadata_for_summary = {
"issue_id": item['issue_identifier'],
"issue_title": item['issue_title'],
"state": item['state'],
"priority": item['priority'],
"comment_count": item['comment_count'],
"document_type": "Linear Issue",
"connector_type": "Linear",
}
summary_content, summary_embedding = await generate_document_summary(
item['issue_content'], user_llm, document_metadata_for_summary
)
else:
# Fallback to simple summary if no LLM configured
description = item['description']
if description and len(description) > 1000:
description = description[:997] + "..."
summary_content = f"Linear Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['state']}\n\n"
if description:
summary_content += f"Description: {description}\n\n"
summary_content += f"Comments: {item['comment_count']}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(item['issue_content'])
# Update document to READY with actual content
document.title = f"{item['issue_identifier']}: {item['issue_title']}"
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"issue_id": item['issue_id'],
"issue_identifier": item['issue_identifier'],
"issue_title": item['issue_title'],
"state": item['state'],
"comment_count": item['comment_count'],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Linear issues processed so far"
@ -430,44 +444,68 @@ async def index_linear_issues(
except Exception as e:
logger.error(
f"Error processing issue {issue.get('identifier', 'Unknown')}: {e!s}",
f"Error processing issue {item.get('issue_identifier', 'Unknown')}: {e!s}",
exc_info=True,
)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
skipped_issues.append(
f"{issue.get('identifier', 'Unknown')} (processing error)"
f"{item.get('issue_identifier', 'Unknown')} (processing error)"
)
documents_skipped += 1
continue # Skip this issue and continue with others
documents_failed += 1
continue
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if update_last_indexed:
await update_connector_last_indexed(session, connector, update_last_indexed)
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Linear issues processed")
await session.commit()
logger.info("Successfully committed all Linear document changes to database")
try:
await session.commit()
logger.info("Successfully committed all Linear document changes to database")
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same issue was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
else:
raise
# Build warning message if there were issues
warning_parts = []
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Linear indexing for connector {connector_id}",
{
"issues_processed": total_processed,
"issues_processed": documents_indexed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"skipped_issues_count": len(skipped_issues),
},
)
logger.info(
f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped"
f"Linear indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
return documents_indexed, warning_message
except SQLAlchemyError as db_error:
await session.rollback()

View file

@ -1,5 +1,9 @@
"""
Slack connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
import time
@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.slack_history import SlackHistory
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
@ -28,6 +32,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -168,11 +173,15 @@ async def index_slack_messages(
f"No Slack channels found for connector {connector_id}",
{"channels_found": 0},
)
return 0, "No Slack channels found"
# CRITICAL: Update timestamp even when no channels found so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no channels found
# Track the number of documents indexed
documents_indexed = 0
documents_skipped = 0
documents_failed = 0 # Track messages that failed processing
skipped_channels = []
# Heartbeat tracking - update notification periodically to prevent appearing stuck
@ -184,15 +193,14 @@ async def index_slack_messages(
{"stage": "process_channels", "total_channels": len(channels)},
)
# Process each channel
# =======================================================================
# PHASE 1: Collect all messages from all channels, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
messages_to_process = [] # List of dicts with document and message data
new_documents_created = False
for channel_obj in channels:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
channel_id = channel_obj["id"]
channel_name = channel_obj["name"]
is_private = channel_obj["is_private"]
@ -305,47 +313,29 @@ async def index_slack_messages(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Slack message {msg_ts} in channel {channel_name}. Updating document."
)
# Update chunks and embedding
chunks = await create_document_chunks(
combined_document_string
)
doc_embedding = config.embedding_model_instance.embed(
combined_document_string
)
# Update existing document
existing_document.content = combined_document_string
existing_document.content_hash = content_hash
existing_document.embedding = doc_embedding
existing_document.document_metadata = {
"channel_name": channel_name,
"channel_id": channel_id,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
"indexed_at": datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
),
}
# Delete old chunks and add new ones
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(f"Successfully updated Slack message {msg_ts}")
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append({
'document': existing_document,
'is_new': False,
'combined_document_string': combined_document_string,
'content_hash': content_hash,
'channel_name': channel_name,
'channel_id': channel_id,
'msg_ts': msg_ts,
'start_date': start_date_str,
'end_date': end_date_str,
'message_count': len(formatted_messages),
})
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
@ -363,14 +353,7 @@ async def index_slack_messages(
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(combined_document_string)
doc_embedding = config.embedding_model_instance.embed(
combined_document_string
)
# Create and store new document
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=channel_name,
@ -378,33 +361,37 @@ async def index_slack_messages(
document_metadata={
"channel_name": channel_name,
"channel_id": channel_id,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"msg_ts": msg_ts,
"connector_id": connector_id,
},
content=combined_document_string,
embedding=doc_embedding,
chunks=chunks,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1
new_documents_created = True
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Slack channels processed so far"
)
await session.commit()
messages_to_process.append({
'document': document,
'is_new': True,
'combined_document_string': combined_document_string,
'content_hash': content_hash,
'channel_name': channel_name,
'channel_id': channel_id,
'msg_ts': msg_ts,
'start_date': start_date_str,
'end_date': end_date_str,
'message_count': len(formatted_messages),
})
logger.info(
f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages"
f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}"
)
except SlackApiError as slack_error:
@ -420,43 +407,125 @@ async def index_slack_messages(
documents_skipped += 1
continue # Skip this channel and continue with others
# Update the last_indexed_at timestamp for the connector only if requested
# and if we successfully indexed at least one channel
total_processed = documents_indexed
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
for item in messages_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (embeddings, chunks)
chunks = await create_document_chunks(item['combined_document_string'])
doc_embedding = config.embedding_model_instance.embed(
item['combined_document_string']
)
# Update document to READY with actual content
document.title = item['channel_name']
document.content = item['combined_document_string']
document.content_hash = item['content_hash']
document.embedding = doc_embedding
document.document_metadata = {
"channel_name": item['channel_name'],
"channel_id": item['channel_id'],
"start_date": item['start_date'],
"end_date": item['end_date'],
"message_count": item['message_count'],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Slack messages processed so far"
)
await session.commit()
except Exception as e:
logger.error(
f"Error processing Slack message {item.get('msg_ts', 'Unknown')}: {e!s}",
exc_info=True,
)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Slack channels processed")
await session.commit()
logger.info(f"Final commit: Total {documents_indexed} Slack messages processed")
try:
await session.commit()
logger.info("Successfully committed all Slack document changes to database")
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same message was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
else:
raise
# Prepare result message
result_message = None
if skipped_channels:
result_message = f"Processed {total_processed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
else:
result_message = f"Processed {total_processed} channels."
# Build warning message if there were issues
warning_parts = []
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Slack indexing for connector {connector_id}",
{
"channels_processed": total_processed,
"channels_processed": len(channels),
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"skipped_channels_count": len(skipped_channels),
"result_message": result_message,
},
)
logger.info(
f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped"
f"Slack indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed"
)
return (
total_processed,
None,
) # Return None on success (result_message is for logging only)
return documents_indexed, warning_message
except SQLAlchemyError as db_error:
await session.rollback()