mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-26 01:06:23 +02:00
feat: implement two-phase document indexing for BookStack, Elasticsearch, and Luma connectors with real-time status updates
This commit is contained in:
parent
bfa3be655e
commit
0f61a249c0
3 changed files with 580 additions and 364 deletions
|
|
@ -1,5 +1,9 @@
|
||||||
"""
|
"""
|
||||||
BookStack connector indexer.
|
BookStack connector indexer.
|
||||||
|
|
||||||
|
Implements 2-phase document status updates for real-time UI feedback:
|
||||||
|
- Phase 1: Collect all pages and create pending documents (visible in UI immediately)
|
||||||
|
- Phase 2: Process each page: pending → processing → ready/failed
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
@ -11,7 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.config import config
|
from app.config import config
|
||||||
from app.connectors.bookstack_connector import BookStackConnector
|
from app.connectors.bookstack_connector import BookStackConnector
|
||||||
from app.db import Document, DocumentType, SearchSourceConnectorType
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
from app.services.llm_service import get_user_long_context_llm
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
|
|
@ -28,6 +32,7 @@ from .base import (
|
||||||
get_connector_by_id,
|
get_connector_by_id,
|
||||||
get_current_timestamp,
|
get_current_timestamp,
|
||||||
logger,
|
logger,
|
||||||
|
safe_set_chunks,
|
||||||
update_connector_last_indexed,
|
update_connector_last_indexed,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -184,22 +189,22 @@ async def index_bookstack_pages(
|
||||||
logger.error(f"Error fetching BookStack pages: {e!s}", exc_info=True)
|
logger.error(f"Error fetching BookStack pages: {e!s}", exc_info=True)
|
||||||
return 0, f"Error fetching BookStack pages: {e!s}"
|
return 0, f"Error fetching BookStack pages: {e!s}"
|
||||||
|
|
||||||
# Process and index each page
|
# =======================================================================
|
||||||
|
# PHASE 1: Analyze all pages, create pending documents
|
||||||
|
# This makes ALL documents visible in the UI immediately with pending status
|
||||||
|
# =======================================================================
|
||||||
documents_indexed = 0
|
documents_indexed = 0
|
||||||
skipped_pages = []
|
skipped_pages = []
|
||||||
documents_skipped = 0
|
documents_skipped = 0
|
||||||
|
documents_failed = 0
|
||||||
|
|
||||||
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
||||||
last_heartbeat_time = time.time()
|
last_heartbeat_time = time.time()
|
||||||
|
|
||||||
|
pages_to_process = [] # List of dicts with document and page data
|
||||||
|
new_documents_created = False
|
||||||
|
|
||||||
for page in pages:
|
for page in pages:
|
||||||
# Check if it's time for a heartbeat update
|
|
||||||
if (
|
|
||||||
on_heartbeat_callback
|
|
||||||
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
|
|
||||||
):
|
|
||||||
await on_heartbeat_callback(documents_indexed)
|
|
||||||
last_heartbeat_time = time.time()
|
|
||||||
try:
|
try:
|
||||||
page_id = page.get("id")
|
page_id = page.get("id")
|
||||||
page_name = page.get("name", "")
|
page_name = page.get("name", "")
|
||||||
|
|
@ -218,7 +223,7 @@ async def index_bookstack_pages(
|
||||||
|
|
||||||
# Fetch full page content (Markdown preferred)
|
# Fetch full page content (Markdown preferred)
|
||||||
try:
|
try:
|
||||||
page_detail, page_content = bookstack_client.get_page_with_content(
|
_, page_content = bookstack_client.get_page_with_content(
|
||||||
page_id, use_markdown=True
|
page_id, use_markdown=True
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -252,82 +257,34 @@ async def index_bookstack_pages(
|
||||||
# Build page URL
|
# Build page URL
|
||||||
page_url = f"{bookstack_base_url}/books/{book_slug}/page/{page_slug}"
|
page_url = f"{bookstack_base_url}/books/{book_slug}/page/{page_slug}"
|
||||||
|
|
||||||
# Build document metadata
|
|
||||||
doc_metadata = {
|
|
||||||
"page_id": page_id,
|
|
||||||
"page_name": page_name,
|
|
||||||
"page_slug": page_slug,
|
|
||||||
"book_id": book_id,
|
|
||||||
"book_slug": book_slug,
|
|
||||||
"chapter_id": chapter_id,
|
|
||||||
"base_url": bookstack_base_url,
|
|
||||||
"page_url": page_url,
|
|
||||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
||||||
}
|
|
||||||
|
|
||||||
if existing_document:
|
if existing_document:
|
||||||
# Document exists - check if content has changed
|
# Document exists - check if content has changed
|
||||||
if existing_document.content_hash == content_hash:
|
if existing_document.content_hash == content_hash:
|
||||||
|
# Ensure status is ready (might have been stuck in processing/pending)
|
||||||
|
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
|
||||||
|
existing_document.status = DocumentStatus.ready()
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Document for BookStack page {page_name} unchanged. Skipping."
|
f"Document for BookStack page {page_name} unchanged. Skipping."
|
||||||
)
|
)
|
||||||
documents_skipped += 1
|
documents_skipped += 1
|
||||||
continue
|
continue
|
||||||
else:
|
|
||||||
# Content has changed - update the existing document
|
|
||||||
logger.info(
|
|
||||||
f"Content changed for BookStack page {page_name}. Updating document."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Generate summary with metadata
|
# Queue existing document for update (will be set to processing in Phase 2)
|
||||||
user_llm = await get_user_long_context_llm(
|
pages_to_process.append({
|
||||||
session, user_id, search_space_id
|
'document': existing_document,
|
||||||
)
|
'is_new': False,
|
||||||
|
'page_id': page_id,
|
||||||
if user_llm:
|
'page_name': page_name,
|
||||||
summary_metadata = {
|
'page_slug': page_slug,
|
||||||
"page_name": page_name,
|
'book_id': book_id,
|
||||||
"page_id": page_id,
|
'book_slug': book_slug,
|
||||||
"book_id": book_id,
|
'chapter_id': chapter_id,
|
||||||
"document_type": "BookStack Page",
|
'page_url': page_url,
|
||||||
"connector_type": "BookStack",
|
'page_content': page_content,
|
||||||
}
|
'full_content': full_content,
|
||||||
(
|
'content_hash': content_hash,
|
||||||
summary_content,
|
})
|
||||||
summary_embedding,
|
continue
|
||||||
) = await generate_document_summary(
|
|
||||||
full_content, user_llm, summary_metadata
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = (
|
|
||||||
f"BookStack Page: {page_name}\n\nBook ID: {book_id}\n\n"
|
|
||||||
)
|
|
||||||
if page_content:
|
|
||||||
content_preview = page_content[:1000]
|
|
||||||
if len(page_content) > 1000:
|
|
||||||
content_preview += "..."
|
|
||||||
summary_content += (
|
|
||||||
f"Content Preview: {content_preview}\n\n"
|
|
||||||
)
|
|
||||||
summary_embedding = config.embedding_model_instance.embed(
|
|
||||||
summary_content
|
|
||||||
)
|
|
||||||
|
|
||||||
# Process chunks
|
|
||||||
chunks = await create_document_chunks(full_content)
|
|
||||||
|
|
||||||
# Update existing document
|
|
||||||
existing_document.title = page_name
|
|
||||||
existing_document.content = summary_content
|
|
||||||
existing_document.content_hash = content_hash
|
|
||||||
existing_document.embedding = summary_embedding
|
|
||||||
existing_document.document_metadata = doc_metadata
|
|
||||||
existing_document.chunks = chunks
|
|
||||||
existing_document.updated_at = get_current_timestamp()
|
|
||||||
|
|
||||||
documents_indexed += 1
|
|
||||||
logger.info(f"Successfully updated BookStack page {page_name}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Document doesn't exist by unique_identifier_hash
|
# Document doesn't exist by unique_identifier_hash
|
||||||
# Check if a document with the same content_hash exists (from another connector)
|
# Check if a document with the same content_hash exists (from another connector)
|
||||||
|
|
@ -345,17 +302,104 @@ async def index_bookstack_pages(
|
||||||
documents_skipped += 1
|
documents_skipped += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Document doesn't exist - create new one
|
# Create new document with PENDING status (visible in UI immediately)
|
||||||
# Generate summary with metadata
|
document = Document(
|
||||||
|
search_space_id=search_space_id,
|
||||||
|
title=page_name,
|
||||||
|
document_type=DocumentType.BOOKSTACK_CONNECTOR,
|
||||||
|
document_metadata={
|
||||||
|
"page_id": page_id,
|
||||||
|
"page_name": page_name,
|
||||||
|
"page_slug": page_slug,
|
||||||
|
"book_id": book_id,
|
||||||
|
"book_slug": book_slug,
|
||||||
|
"chapter_id": chapter_id,
|
||||||
|
"base_url": bookstack_base_url,
|
||||||
|
"page_url": page_url,
|
||||||
|
"connector_id": connector_id,
|
||||||
|
},
|
||||||
|
content="Pending...", # Placeholder until processed
|
||||||
|
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
|
||||||
|
unique_identifier_hash=unique_identifier_hash,
|
||||||
|
embedding=None,
|
||||||
|
chunks=[], # Empty at creation - safe for async
|
||||||
|
status=DocumentStatus.pending(), # Pending until processing starts
|
||||||
|
updated_at=get_current_timestamp(),
|
||||||
|
created_by_id=user_id,
|
||||||
|
connector_id=connector_id,
|
||||||
|
)
|
||||||
|
session.add(document)
|
||||||
|
new_documents_created = True
|
||||||
|
|
||||||
|
pages_to_process.append({
|
||||||
|
'document': document,
|
||||||
|
'is_new': True,
|
||||||
|
'page_id': page_id,
|
||||||
|
'page_name': page_name,
|
||||||
|
'page_slug': page_slug,
|
||||||
|
'book_id': book_id,
|
||||||
|
'book_slug': book_slug,
|
||||||
|
'chapter_id': chapter_id,
|
||||||
|
'page_url': page_url,
|
||||||
|
'page_content': page_content,
|
||||||
|
'full_content': full_content,
|
||||||
|
'content_hash': content_hash,
|
||||||
|
})
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True)
|
||||||
|
documents_failed += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Commit all pending documents - they all appear in UI now
|
||||||
|
if new_documents_created:
|
||||||
|
logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents")
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
# =======================================================================
|
||||||
|
# PHASE 2: Process each document one by one
|
||||||
|
# Each document transitions: pending → processing → ready/failed
|
||||||
|
# =======================================================================
|
||||||
|
logger.info(f"Phase 2: Processing {len(pages_to_process)} documents")
|
||||||
|
|
||||||
|
for item in pages_to_process:
|
||||||
|
# Send heartbeat periodically
|
||||||
|
if on_heartbeat_callback:
|
||||||
|
current_time = time.time()
|
||||||
|
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
|
||||||
|
await on_heartbeat_callback(documents_indexed)
|
||||||
|
last_heartbeat_time = current_time
|
||||||
|
|
||||||
|
document = item['document']
|
||||||
|
try:
|
||||||
|
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
|
||||||
|
document.status = DocumentStatus.processing()
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
# Heavy processing (LLM, embeddings, chunks)
|
||||||
user_llm = await get_user_long_context_llm(
|
user_llm = await get_user_long_context_llm(
|
||||||
session, user_id, search_space_id
|
session, user_id, search_space_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Build document metadata
|
||||||
|
doc_metadata = {
|
||||||
|
"page_id": item['page_id'],
|
||||||
|
"page_name": item['page_name'],
|
||||||
|
"page_slug": item['page_slug'],
|
||||||
|
"book_id": item['book_id'],
|
||||||
|
"book_slug": item['book_slug'],
|
||||||
|
"chapter_id": item['chapter_id'],
|
||||||
|
"base_url": bookstack_base_url,
|
||||||
|
"page_url": item['page_url'],
|
||||||
|
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||||
|
"connector_id": connector_id,
|
||||||
|
}
|
||||||
|
|
||||||
if user_llm:
|
if user_llm:
|
||||||
summary_metadata = {
|
summary_metadata = {
|
||||||
"page_name": page_name,
|
"page_name": item['page_name'],
|
||||||
"page_id": page_id,
|
"page_id": item['page_id'],
|
||||||
"book_id": book_id,
|
"book_id": item['book_id'],
|
||||||
"document_type": "BookStack Page",
|
"document_type": "BookStack Page",
|
||||||
"connector_type": "BookStack",
|
"connector_type": "BookStack",
|
||||||
}
|
}
|
||||||
|
|
@ -363,17 +407,17 @@ async def index_bookstack_pages(
|
||||||
summary_content,
|
summary_content,
|
||||||
summary_embedding,
|
summary_embedding,
|
||||||
) = await generate_document_summary(
|
) = await generate_document_summary(
|
||||||
full_content, user_llm, summary_metadata
|
item['full_content'], user_llm, summary_metadata
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Fallback to simple summary if no LLM configured
|
# Fallback to simple summary if no LLM configured
|
||||||
summary_content = (
|
summary_content = (
|
||||||
f"BookStack Page: {page_name}\n\nBook ID: {book_id}\n\n"
|
f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n"
|
||||||
)
|
)
|
||||||
if page_content:
|
if item['page_content']:
|
||||||
# Take first 1000 characters of content for summary
|
# Take first 1000 characters of content for summary
|
||||||
content_preview = page_content[:1000]
|
content_preview = item['page_content'][:1000]
|
||||||
if len(page_content) > 1000:
|
if len(item['page_content']) > 1000:
|
||||||
content_preview += "..."
|
content_preview += "..."
|
||||||
summary_content += f"Content Preview: {content_preview}\n\n"
|
summary_content += f"Content Preview: {content_preview}\n\n"
|
||||||
summary_embedding = config.embedding_model_instance.embed(
|
summary_embedding = config.embedding_model_instance.embed(
|
||||||
|
|
@ -381,30 +425,21 @@ async def index_bookstack_pages(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Process chunks - using the full page content
|
# Process chunks - using the full page content
|
||||||
chunks = await create_document_chunks(full_content)
|
chunks = await create_document_chunks(item['full_content'])
|
||||||
|
|
||||||
# Create and store new document
|
# Update document to READY with actual content
|
||||||
logger.info(f"Creating new document for page {page_name}")
|
document.title = item['page_name']
|
||||||
document = Document(
|
document.content = summary_content
|
||||||
search_space_id=search_space_id,
|
document.content_hash = item['content_hash']
|
||||||
title=page_name,
|
document.embedding = summary_embedding
|
||||||
document_type=DocumentType.BOOKSTACK_CONNECTOR,
|
document.document_metadata = doc_metadata
|
||||||
document_metadata=doc_metadata,
|
safe_set_chunks(document, chunks)
|
||||||
content=summary_content,
|
document.updated_at = get_current_timestamp()
|
||||||
content_hash=content_hash,
|
document.status = DocumentStatus.ready()
|
||||||
unique_identifier_hash=unique_identifier_hash,
|
|
||||||
embedding=summary_embedding,
|
|
||||||
chunks=chunks,
|
|
||||||
updated_at=get_current_timestamp(),
|
|
||||||
created_by_id=user_id,
|
|
||||||
connector_id=connector_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
session.add(document)
|
|
||||||
documents_indexed += 1
|
documents_indexed += 1
|
||||||
logger.info(f"Successfully indexed new page {page_name}")
|
|
||||||
|
|
||||||
# Batch commit every 10 documents
|
# Batch commit every 10 documents (for ready status updates)
|
||||||
if documents_indexed % 10 == 0:
|
if documents_indexed % 10 == 0:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Committing batch: {documents_indexed} BookStack pages processed so far"
|
f"Committing batch: {documents_indexed} BookStack pages processed so far"
|
||||||
|
|
@ -413,46 +448,72 @@ async def index_bookstack_pages(
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Error processing page {page.get('name', 'Unknown')}: {e!s}",
|
f"Error processing page {item.get('page_name', 'Unknown')}: {e!s}",
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
|
# Mark document as failed with reason (visible in UI)
|
||||||
|
try:
|
||||||
|
document.status = DocumentStatus.failed(str(e))
|
||||||
|
document.updated_at = get_current_timestamp()
|
||||||
|
except Exception as status_error:
|
||||||
|
logger.error(f"Failed to update document status to failed: {status_error}")
|
||||||
skipped_pages.append(
|
skipped_pages.append(
|
||||||
f"{page.get('name', 'Unknown')} (processing error)"
|
f"{item.get('page_name', 'Unknown')} (processing error)"
|
||||||
)
|
)
|
||||||
documents_skipped += 1
|
documents_failed += 1
|
||||||
continue # Skip this page and continue with others
|
continue
|
||||||
|
|
||||||
# Update the last_indexed_at timestamp for the connector only if requested
|
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
|
||||||
total_processed = documents_indexed
|
# This ensures the UI shows "Last indexed" instead of "Never indexed"
|
||||||
if update_last_indexed:
|
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
|
||||||
|
|
||||||
# Final commit for any remaining documents not yet committed in batches
|
# Final commit for any remaining documents not yet committed in batches
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Final commit: Total {documents_indexed} BookStack pages processed"
|
f"Final commit: Total {documents_indexed} BookStack pages processed"
|
||||||
)
|
)
|
||||||
await session.commit()
|
try:
|
||||||
logger.info("Successfully committed all BookStack document changes to database")
|
await session.commit()
|
||||||
|
logger.info("Successfully committed all BookStack document changes to database")
|
||||||
|
except Exception as e:
|
||||||
|
# Handle any remaining integrity errors gracefully (race conditions, etc.)
|
||||||
|
if (
|
||||||
|
"duplicate key value violates unique constraint" in str(e).lower()
|
||||||
|
or "uniqueviolationerror" in str(e).lower()
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
f"Duplicate content_hash detected during final commit. "
|
||||||
|
f"This may occur if the same page was indexed by multiple connectors. "
|
||||||
|
f"Rolling back and continuing. Error: {e!s}"
|
||||||
|
)
|
||||||
|
await session.rollback()
|
||||||
|
# Don't fail the entire task - some documents may have been successfully indexed
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Build warning message if there were issues
|
||||||
|
warning_parts = []
|
||||||
|
if documents_failed > 0:
|
||||||
|
warning_parts.append(f"{documents_failed} failed")
|
||||||
|
warning_message = ", ".join(warning_parts) if warning_parts else None
|
||||||
|
|
||||||
# Log success
|
# Log success
|
||||||
await task_logger.log_task_success(
|
await task_logger.log_task_success(
|
||||||
log_entry,
|
log_entry,
|
||||||
f"Successfully completed BookStack indexing for connector {connector_id}",
|
f"Successfully completed BookStack indexing for connector {connector_id}",
|
||||||
{
|
{
|
||||||
"pages_processed": total_processed,
|
"pages_processed": documents_indexed,
|
||||||
"documents_indexed": documents_indexed,
|
"documents_indexed": documents_indexed,
|
||||||
"documents_skipped": documents_skipped,
|
"documents_skipped": documents_skipped,
|
||||||
|
"documents_failed": documents_failed,
|
||||||
"skipped_pages_count": len(skipped_pages),
|
"skipped_pages_count": len(skipped_pages),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"BookStack indexing completed: {documents_indexed} new pages, {documents_skipped} skipped"
|
f"BookStack indexing completed: {documents_indexed} ready, "
|
||||||
|
f"{documents_skipped} skipped, {documents_failed} failed"
|
||||||
)
|
)
|
||||||
return (
|
return documents_indexed, warning_message
|
||||||
total_processed,
|
|
||||||
None,
|
|
||||||
) # Return None as the error message to indicate success
|
|
||||||
|
|
||||||
except SQLAlchemyError as db_error:
|
except SQLAlchemyError as db_error:
|
||||||
await session.rollback()
|
await session.rollback()
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,9 @@
|
||||||
"""
|
"""
|
||||||
Elasticsearch indexer for SurfSense
|
Elasticsearch indexer for SurfSense
|
||||||
|
|
||||||
|
Implements 2-phase document status updates for real-time UI feedback:
|
||||||
|
- Phase 1: Collect all documents and create pending documents (visible in UI immediately)
|
||||||
|
- Phase 2: Process each document: pending → processing → ready/failed
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
|
@ -13,7 +17,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
from sqlalchemy.future import select
|
from sqlalchemy.future import select
|
||||||
|
|
||||||
from app.connectors.elasticsearch_connector import ElasticsearchConnector
|
from app.connectors.elasticsearch_connector import ElasticsearchConnector
|
||||||
from app.db import Document, DocumentType, SearchSourceConnector
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnector
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
|
|
@ -25,6 +29,7 @@ from .base import (
|
||||||
check_document_by_unique_identifier,
|
check_document_by_unique_identifier,
|
||||||
check_duplicate_document_by_hash,
|
check_duplicate_document_by_hash,
|
||||||
get_current_timestamp,
|
get_current_timestamp,
|
||||||
|
safe_set_chunks,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Type hint for heartbeat callback
|
# Type hint for heartbeat callback
|
||||||
|
|
@ -164,6 +169,8 @@ async def index_elasticsearch_documents(
|
||||||
)
|
)
|
||||||
|
|
||||||
documents_processed = 0
|
documents_processed = 0
|
||||||
|
documents_skipped = 0
|
||||||
|
documents_failed = 0
|
||||||
|
|
||||||
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
||||||
last_heartbeat_time = time.time()
|
last_heartbeat_time = time.time()
|
||||||
|
|
@ -178,23 +185,22 @@ async def index_elasticsearch_documents(
|
||||||
"max_documents": max_documents,
|
"max_documents": max_documents,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
# Use scroll search for large result sets
|
|
||||||
|
# =======================================================================
|
||||||
|
# PHASE 1: Collect all documents from Elasticsearch and create pending documents
|
||||||
|
# This makes ALL documents visible in the UI immediately with pending status
|
||||||
|
# =======================================================================
|
||||||
|
docs_to_process = [] # List of dicts with document and ES data
|
||||||
|
new_documents_created = False
|
||||||
|
hits_collected = 0
|
||||||
|
|
||||||
async for hit in es_connector.scroll_search(
|
async for hit in es_connector.scroll_search(
|
||||||
index=index_name,
|
index=index_name,
|
||||||
query=query,
|
query=query,
|
||||||
size=min(max_documents, 100), # Scroll in batches
|
size=min(max_documents, 100), # Scroll in batches
|
||||||
fields=config.get("ELASTICSEARCH_FIELDS"),
|
fields=config.get("ELASTICSEARCH_FIELDS"),
|
||||||
):
|
):
|
||||||
# Check if it's time for a heartbeat update
|
if hits_collected >= max_documents:
|
||||||
if (
|
|
||||||
on_heartbeat_callback
|
|
||||||
and (time.time() - last_heartbeat_time)
|
|
||||||
>= HEARTBEAT_INTERVAL_SECONDS
|
|
||||||
):
|
|
||||||
await on_heartbeat_callback(documents_processed)
|
|
||||||
last_heartbeat_time = time.time()
|
|
||||||
|
|
||||||
if documents_processed >= max_documents:
|
|
||||||
break
|
break
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -220,26 +226,12 @@ async def index_elasticsearch_documents(
|
||||||
|
|
||||||
if not content.strip():
|
if not content.strip():
|
||||||
logger.warning(f"Skipping document {doc_id} - no content found")
|
logger.warning(f"Skipping document {doc_id} - no content found")
|
||||||
|
documents_skipped += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Create content hash
|
# Create content hash
|
||||||
content_hash = generate_content_hash(content, search_space_id)
|
content_hash = generate_content_hash(content, search_space_id)
|
||||||
|
|
||||||
# Build metadata
|
|
||||||
metadata = {
|
|
||||||
"elasticsearch_id": doc_id,
|
|
||||||
"elasticsearch_index": hit.get("_index", index_name),
|
|
||||||
"elasticsearch_score": hit.get("_score"),
|
|
||||||
"indexed_at": datetime.now().isoformat(),
|
|
||||||
"source": "ELASTICSEARCH_CONNECTOR",
|
|
||||||
}
|
|
||||||
|
|
||||||
# Add any additional metadata fields specified in config
|
|
||||||
if "ELASTICSEARCH_METADATA_FIELDS" in config:
|
|
||||||
for field in config["ELASTICSEARCH_METADATA_FIELDS"]:
|
|
||||||
if field in source:
|
|
||||||
metadata[f"es_{field}"] = source[field]
|
|
||||||
|
|
||||||
# Build source-unique identifier and hash (prefer source id dedupe)
|
# Build source-unique identifier and hash (prefer source id dedupe)
|
||||||
source_identifier = f"{hit.get('_index', index_name)}:{doc_id}"
|
source_identifier = f"{hit.get('_index', index_name)}:{doc_id}"
|
||||||
unique_identifier_hash = generate_unique_identifier_hash(
|
unique_identifier_hash = generate_unique_identifier_hash(
|
||||||
|
|
@ -258,98 +250,209 @@ async def index_elasticsearch_documents(
|
||||||
)
|
)
|
||||||
|
|
||||||
if existing_doc:
|
if existing_doc:
|
||||||
# If content is unchanged, skip. Otherwise update the existing document.
|
# If content is unchanged, skip. Otherwise queue for update.
|
||||||
if existing_doc.content_hash == content_hash:
|
if existing_doc.content_hash == content_hash:
|
||||||
|
# Ensure status is ready (might have been stuck in processing/pending)
|
||||||
|
if not DocumentStatus.is_state(existing_doc.status, DocumentStatus.READY):
|
||||||
|
existing_doc.status = DocumentStatus.ready()
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Skipping ES doc {doc_id} — already indexed (doc id {existing_doc.id})"
|
f"Skipping ES doc {doc_id} — already indexed (doc id {existing_doc.id})"
|
||||||
)
|
)
|
||||||
continue
|
documents_skipped += 1
|
||||||
else:
|
|
||||||
logger.info(
|
|
||||||
f"Updating existing document {existing_doc.id} for ES doc {doc_id}"
|
|
||||||
)
|
|
||||||
existing_doc.title = title
|
|
||||||
existing_doc.content = content
|
|
||||||
existing_doc.content_hash = content_hash
|
|
||||||
existing_doc.document_metadata = metadata
|
|
||||||
existing_doc.unique_identifier_hash = unique_identifier_hash
|
|
||||||
chunks = await create_document_chunks(content)
|
|
||||||
existing_doc.chunks = chunks
|
|
||||||
existing_doc.updated_at = get_current_timestamp()
|
|
||||||
await session.flush()
|
|
||||||
documents_processed += 1
|
|
||||||
if documents_processed % 10 == 0:
|
|
||||||
await session.commit()
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Create document
|
# Queue existing document for update (will be set to processing in Phase 2)
|
||||||
|
docs_to_process.append({
|
||||||
|
'document': existing_doc,
|
||||||
|
'is_new': False,
|
||||||
|
'doc_id': doc_id,
|
||||||
|
'title': title,
|
||||||
|
'content': content,
|
||||||
|
'content_hash': content_hash,
|
||||||
|
'unique_identifier_hash': unique_identifier_hash,
|
||||||
|
'hit': hit,
|
||||||
|
'source': source,
|
||||||
|
})
|
||||||
|
hits_collected += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Build metadata for new document
|
||||||
|
metadata = {
|
||||||
|
"elasticsearch_id": doc_id,
|
||||||
|
"elasticsearch_index": hit.get("_index", index_name),
|
||||||
|
"elasticsearch_score": hit.get("_score"),
|
||||||
|
"source": "ELASTICSEARCH_CONNECTOR",
|
||||||
|
"connector_id": connector_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Add any additional metadata fields specified in config
|
||||||
|
if "ELASTICSEARCH_METADATA_FIELDS" in config:
|
||||||
|
for field in config["ELASTICSEARCH_METADATA_FIELDS"]:
|
||||||
|
if field in source:
|
||||||
|
metadata[f"es_{field}"] = source[field]
|
||||||
|
|
||||||
|
# Create new document with PENDING status (visible in UI immediately)
|
||||||
document = Document(
|
document = Document(
|
||||||
title=title,
|
title=title,
|
||||||
content=content,
|
content="Pending...", # Placeholder until processed
|
||||||
content_hash=content_hash,
|
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
|
||||||
unique_identifier_hash=unique_identifier_hash,
|
unique_identifier_hash=unique_identifier_hash,
|
||||||
document_type=DocumentType.ELASTICSEARCH_CONNECTOR,
|
document_type=DocumentType.ELASTICSEARCH_CONNECTOR,
|
||||||
document_metadata=metadata,
|
document_metadata=metadata,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
|
embedding=None,
|
||||||
|
chunks=[], # Empty at creation - safe for async
|
||||||
|
status=DocumentStatus.pending(), # Pending until processing starts
|
||||||
updated_at=get_current_timestamp(),
|
updated_at=get_current_timestamp(),
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create chunks and attach to document (persist via relationship)
|
|
||||||
chunks = await create_document_chunks(content)
|
|
||||||
document.chunks = chunks
|
|
||||||
session.add(document)
|
session.add(document)
|
||||||
await session.flush()
|
new_documents_created = True
|
||||||
|
|
||||||
|
docs_to_process.append({
|
||||||
|
'document': document,
|
||||||
|
'is_new': True,
|
||||||
|
'doc_id': doc_id,
|
||||||
|
'title': title,
|
||||||
|
'content': content,
|
||||||
|
'content_hash': content_hash,
|
||||||
|
'unique_identifier_hash': unique_identifier_hash,
|
||||||
|
'hit': hit,
|
||||||
|
'source': source,
|
||||||
|
})
|
||||||
|
hits_collected += 1
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in Phase 1 for ES doc: {e!s}", exc_info=True)
|
||||||
|
documents_failed += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Commit all pending documents - they all appear in UI now
|
||||||
|
if new_documents_created:
|
||||||
|
logger.info(f"Phase 1: Committing {len([d for d in docs_to_process if d['is_new']])} pending documents")
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
# =======================================================================
|
||||||
|
# PHASE 2: Process each document one by one
|
||||||
|
# Each document transitions: pending → processing → ready/failed
|
||||||
|
# =======================================================================
|
||||||
|
logger.info(f"Phase 2: Processing {len(docs_to_process)} documents")
|
||||||
|
|
||||||
|
for item in docs_to_process:
|
||||||
|
# Send heartbeat periodically
|
||||||
|
if on_heartbeat_callback:
|
||||||
|
current_time = time.time()
|
||||||
|
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
|
||||||
|
await on_heartbeat_callback(documents_processed)
|
||||||
|
last_heartbeat_time = current_time
|
||||||
|
|
||||||
|
document = item['document']
|
||||||
|
try:
|
||||||
|
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
|
||||||
|
document.status = DocumentStatus.processing()
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
# Build metadata
|
||||||
|
metadata = {
|
||||||
|
"elasticsearch_id": item['doc_id'],
|
||||||
|
"elasticsearch_index": item['hit'].get("_index", index_name),
|
||||||
|
"elasticsearch_score": item['hit'].get("_score"),
|
||||||
|
"indexed_at": datetime.now().isoformat(),
|
||||||
|
"source": "ELASTICSEARCH_CONNECTOR",
|
||||||
|
"connector_id": connector_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Add any additional metadata fields specified in config
|
||||||
|
if "ELASTICSEARCH_METADATA_FIELDS" in config:
|
||||||
|
for field in config["ELASTICSEARCH_METADATA_FIELDS"]:
|
||||||
|
if field in item['source']:
|
||||||
|
metadata[f"es_{field}"] = item['source'][field]
|
||||||
|
|
||||||
|
# Create chunks
|
||||||
|
chunks = await create_document_chunks(item['content'])
|
||||||
|
|
||||||
|
# Update document to READY with actual content
|
||||||
|
document.title = item['title']
|
||||||
|
document.content = item['content']
|
||||||
|
document.content_hash = item['content_hash']
|
||||||
|
document.unique_identifier_hash = item['unique_identifier_hash']
|
||||||
|
document.document_metadata = metadata
|
||||||
|
safe_set_chunks(document, chunks)
|
||||||
|
document.updated_at = get_current_timestamp()
|
||||||
|
document.status = DocumentStatus.ready()
|
||||||
|
|
||||||
documents_processed += 1
|
documents_processed += 1
|
||||||
|
|
||||||
|
# Batch commit every 10 documents (for ready status updates)
|
||||||
if documents_processed % 10 == 0:
|
if documents_processed % 10 == 0:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Processed {documents_processed} Elasticsearch documents"
|
f"Committing batch: {documents_processed} Elasticsearch documents processed so far"
|
||||||
)
|
)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
msg = f"Error processing Elasticsearch document {hit.get('_id', 'unknown')}: {e}"
|
msg = f"Error processing Elasticsearch document {item.get('doc_id', 'unknown')}: {e}"
|
||||||
logger.error(msg)
|
logger.error(msg)
|
||||||
await task_logger.log_task_failure(
|
# Mark document as failed with reason (visible in UI)
|
||||||
log_entry,
|
try:
|
||||||
"Document processing error",
|
document.status = DocumentStatus.failed(str(e))
|
||||||
msg,
|
document.updated_at = get_current_timestamp()
|
||||||
{
|
except Exception as status_error:
|
||||||
"document_id": hit.get("_id", "unknown"),
|
logger.error(f"Failed to update document status to failed: {status_error}")
|
||||||
"error_type": type(e).__name__,
|
documents_failed += 1
|
||||||
},
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Final commit
|
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
|
||||||
await session.commit()
|
# This ensures the UI shows "Last indexed" instead of "Never indexed"
|
||||||
|
if update_last_indexed:
|
||||||
|
connector.last_indexed_at = (
|
||||||
|
datetime.now(UTC).isoformat().replace("+00:00", "Z")
|
||||||
|
)
|
||||||
|
|
||||||
|
# Final commit for any remaining documents not yet committed in batches
|
||||||
|
logger.info(f"Final commit: Total {documents_processed} Elasticsearch documents processed")
|
||||||
|
try:
|
||||||
|
await session.commit()
|
||||||
|
logger.info("Successfully committed all Elasticsearch document changes to database")
|
||||||
|
except Exception as e:
|
||||||
|
# Handle any remaining integrity errors gracefully (race conditions, etc.)
|
||||||
|
if (
|
||||||
|
"duplicate key value violates unique constraint" in str(e).lower()
|
||||||
|
or "uniqueviolationerror" in str(e).lower()
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
f"Duplicate content_hash detected during final commit. "
|
||||||
|
f"This may occur if the same document was indexed by multiple connectors. "
|
||||||
|
f"Rolling back and continuing. Error: {e!s}"
|
||||||
|
)
|
||||||
|
await session.rollback()
|
||||||
|
# Don't fail the entire task - some documents may have been successfully indexed
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Build warning message if there were issues
|
||||||
|
warning_parts = []
|
||||||
|
if documents_failed > 0:
|
||||||
|
warning_parts.append(f"{documents_failed} failed")
|
||||||
|
warning_message = ", ".join(warning_parts) if warning_parts else None
|
||||||
|
|
||||||
await task_logger.log_task_success(
|
await task_logger.log_task_success(
|
||||||
log_entry,
|
log_entry,
|
||||||
f"Successfully indexed {documents_processed} documents from Elasticsearch",
|
f"Successfully indexed {documents_processed} documents from Elasticsearch",
|
||||||
{"documents_indexed": documents_processed, "index": index_name},
|
{
|
||||||
|
"documents_indexed": documents_processed,
|
||||||
|
"documents_skipped": documents_skipped,
|
||||||
|
"documents_failed": documents_failed,
|
||||||
|
"index": index_name,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Successfully indexed {documents_processed} documents from Elasticsearch"
|
f"Elasticsearch indexing completed: {documents_processed} ready, "
|
||||||
|
f"{documents_skipped} skipped, {documents_failed} failed"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update last indexed timestamp if requested
|
return documents_processed, warning_message
|
||||||
if update_last_indexed and documents_processed > 0:
|
|
||||||
# connector.last_indexed_at = datetime.now()
|
|
||||||
connector.last_indexed_at = (
|
|
||||||
datetime.now(UTC).isoformat().replace("+00:00", "Z")
|
|
||||||
)
|
|
||||||
await session.commit()
|
|
||||||
await task_logger.log_task_progress(
|
|
||||||
log_entry,
|
|
||||||
"Updated connector.last_indexed_at",
|
|
||||||
{"last_indexed_at": connector.last_indexed_at},
|
|
||||||
)
|
|
||||||
|
|
||||||
return documents_processed, None
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# Clean up Elasticsearch connection
|
# Clean up Elasticsearch connection
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,9 @@
|
||||||
"""
|
"""
|
||||||
Luma connector indexer.
|
Luma connector indexer.
|
||||||
|
|
||||||
|
Implements 2-phase document status updates for real-time UI feedback:
|
||||||
|
- Phase 1: Collect all events and create pending documents (visible in UI immediately)
|
||||||
|
- Phase 2: Process each event: pending → processing → ready/failed
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
@ -11,7 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.config import config
|
from app.config import config
|
||||||
from app.connectors.luma_connector import LumaConnector
|
from app.connectors.luma_connector import LumaConnector
|
||||||
from app.db import Document, DocumentType, SearchSourceConnectorType
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
from app.services.llm_service import get_user_long_context_llm
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
|
|
@ -27,6 +31,7 @@ from .base import (
|
||||||
get_connector_by_id,
|
get_connector_by_id,
|
||||||
get_current_timestamp,
|
get_current_timestamp,
|
||||||
logger,
|
logger,
|
||||||
|
safe_set_chunks,
|
||||||
update_connector_last_indexed,
|
update_connector_last_indexed,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -227,21 +232,22 @@ async def index_luma_events(
|
||||||
logger.error(f"Error fetching Luma events: {e!s}", exc_info=True)
|
logger.error(f"Error fetching Luma events: {e!s}", exc_info=True)
|
||||||
return 0, f"Error fetching Luma events: {e!s}"
|
return 0, f"Error fetching Luma events: {e!s}"
|
||||||
|
|
||||||
|
# =======================================================================
|
||||||
|
# PHASE 1: Analyze all events, create pending documents
|
||||||
|
# This makes ALL documents visible in the UI immediately with pending status
|
||||||
|
# =======================================================================
|
||||||
documents_indexed = 0
|
documents_indexed = 0
|
||||||
documents_skipped = 0
|
documents_skipped = 0
|
||||||
|
documents_failed = 0
|
||||||
skipped_events = []
|
skipped_events = []
|
||||||
|
|
||||||
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
||||||
last_heartbeat_time = time.time()
|
last_heartbeat_time = time.time()
|
||||||
|
|
||||||
|
events_to_process = [] # List of dicts with document and event data
|
||||||
|
new_documents_created = False
|
||||||
|
|
||||||
for event in events:
|
for event in events:
|
||||||
# Check if it's time for a heartbeat update
|
|
||||||
if (
|
|
||||||
on_heartbeat_callback
|
|
||||||
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
|
|
||||||
):
|
|
||||||
await on_heartbeat_callback(documents_indexed)
|
|
||||||
last_heartbeat_time = time.time()
|
|
||||||
try:
|
try:
|
||||||
# Luma event structure fields - events have nested 'event' field
|
# Luma event structure fields - events have nested 'event' field
|
||||||
event_data = event.get("event", {})
|
event_data = event.get("event", {})
|
||||||
|
|
@ -298,91 +304,34 @@ async def index_luma_events(
|
||||||
if existing_document:
|
if existing_document:
|
||||||
# Document exists - check if content has changed
|
# Document exists - check if content has changed
|
||||||
if existing_document.content_hash == content_hash:
|
if existing_document.content_hash == content_hash:
|
||||||
|
# Ensure status is ready (might have been stuck in processing/pending)
|
||||||
|
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
|
||||||
|
existing_document.status = DocumentStatus.ready()
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Document for Luma event {event_name} unchanged. Skipping."
|
f"Document for Luma event {event_name} unchanged. Skipping."
|
||||||
)
|
)
|
||||||
documents_skipped += 1
|
documents_skipped += 1
|
||||||
continue
|
continue
|
||||||
else:
|
|
||||||
# Content has changed - update the existing document
|
|
||||||
logger.info(
|
|
||||||
f"Content changed for Luma event {event_name}. Updating document."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Generate summary with metadata
|
# Queue existing document for update (will be set to processing in Phase 2)
|
||||||
user_llm = await get_user_long_context_llm(
|
events_to_process.append({
|
||||||
session, user_id, search_space_id
|
'document': existing_document,
|
||||||
)
|
'is_new': False,
|
||||||
|
'event_id': event_id,
|
||||||
if user_llm:
|
'event_name': event_name,
|
||||||
document_metadata = {
|
'event_url': event_url,
|
||||||
"event_id": event_id,
|
'event_markdown': event_markdown,
|
||||||
"event_name": event_name,
|
'content_hash': content_hash,
|
||||||
"event_url": event_url,
|
'start_at': start_at,
|
||||||
"start_at": start_at,
|
'end_at': end_at,
|
||||||
"end_at": end_at,
|
'timezone': timezone,
|
||||||
"timezone": timezone,
|
'location': location,
|
||||||
"location": location or "No location",
|
'city': city,
|
||||||
"city": city,
|
'host_names': host_names,
|
||||||
"hosts": host_names,
|
'description': description,
|
||||||
"document_type": "Luma Event",
|
'cover_url': cover_url,
|
||||||
"connector_type": "Luma",
|
})
|
||||||
}
|
continue
|
||||||
(
|
|
||||||
summary_content,
|
|
||||||
summary_embedding,
|
|
||||||
) = await generate_document_summary(
|
|
||||||
event_markdown, user_llm, document_metadata
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = f"Luma Event: {event_name}\n\n"
|
|
||||||
if event_url:
|
|
||||||
summary_content += f"URL: {event_url}\n"
|
|
||||||
summary_content += f"Start: {start_at}\n"
|
|
||||||
summary_content += f"End: {end_at}\n"
|
|
||||||
if timezone:
|
|
||||||
summary_content += f"Timezone: {timezone}\n"
|
|
||||||
if location:
|
|
||||||
summary_content += f"Location: {location}\n"
|
|
||||||
if city:
|
|
||||||
summary_content += f"City: {city}\n"
|
|
||||||
if host_names:
|
|
||||||
summary_content += f"Hosts: {host_names}\n"
|
|
||||||
if description:
|
|
||||||
desc_preview = description[:1000]
|
|
||||||
if len(description) > 1000:
|
|
||||||
desc_preview += "..."
|
|
||||||
summary_content += f"Description: {desc_preview}\n"
|
|
||||||
summary_embedding = config.embedding_model_instance.embed(
|
|
||||||
summary_content
|
|
||||||
)
|
|
||||||
|
|
||||||
# Process chunks
|
|
||||||
chunks = await create_document_chunks(event_markdown)
|
|
||||||
|
|
||||||
# Update existing document
|
|
||||||
existing_document.title = event_name
|
|
||||||
existing_document.content = summary_content
|
|
||||||
existing_document.content_hash = content_hash
|
|
||||||
existing_document.embedding = summary_embedding
|
|
||||||
existing_document.document_metadata = {
|
|
||||||
"event_id": event_id,
|
|
||||||
"event_name": event_name,
|
|
||||||
"event_url": event_url,
|
|
||||||
"start_at": start_at,
|
|
||||||
"end_at": end_at,
|
|
||||||
"timezone": timezone,
|
|
||||||
"location": location,
|
|
||||||
"city": city,
|
|
||||||
"hosts": host_names,
|
|
||||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
||||||
}
|
|
||||||
existing_document.chunks = chunks
|
|
||||||
existing_document.updated_at = get_current_timestamp()
|
|
||||||
|
|
||||||
documents_indexed += 1
|
|
||||||
logger.info(f"Successfully updated Luma event {event_name}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Document doesn't exist by unique_identifier_hash
|
# Document doesn't exist by unique_identifier_hash
|
||||||
# Check if a document with the same content_hash exists (from another connector)
|
# Check if a document with the same content_hash exists (from another connector)
|
||||||
|
|
@ -400,59 +349,7 @@ async def index_luma_events(
|
||||||
documents_skipped += 1
|
documents_skipped += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Document doesn't exist - create new one
|
# Create new document with PENDING status (visible in UI immediately)
|
||||||
# Generate summary with metadata
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
session, user_id, search_space_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if user_llm:
|
|
||||||
document_metadata = {
|
|
||||||
"event_id": event_id,
|
|
||||||
"event_name": event_name,
|
|
||||||
"event_url": event_url,
|
|
||||||
"start_at": start_at,
|
|
||||||
"end_at": end_at,
|
|
||||||
"timezone": timezone,
|
|
||||||
"location": location or "No location",
|
|
||||||
"city": city,
|
|
||||||
"hosts": host_names,
|
|
||||||
"document_type": "Luma Event",
|
|
||||||
"connector_type": "Luma",
|
|
||||||
}
|
|
||||||
(
|
|
||||||
summary_content,
|
|
||||||
summary_embedding,
|
|
||||||
) = await generate_document_summary(
|
|
||||||
event_markdown, user_llm, document_metadata
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# Fallback to simple summary if no LLM configured
|
|
||||||
summary_content = f"Luma Event: {event_name}\n\n"
|
|
||||||
if event_url:
|
|
||||||
summary_content += f"URL: {event_url}\n"
|
|
||||||
summary_content += f"Start: {start_at}\n"
|
|
||||||
summary_content += f"End: {end_at}\n"
|
|
||||||
if timezone:
|
|
||||||
summary_content += f"Timezone: {timezone}\n"
|
|
||||||
if location:
|
|
||||||
summary_content += f"Location: {location}\n"
|
|
||||||
if city:
|
|
||||||
summary_content += f"City: {city}\n"
|
|
||||||
if host_names:
|
|
||||||
summary_content += f"Hosts: {host_names}\n"
|
|
||||||
if description:
|
|
||||||
desc_preview = description[:1000]
|
|
||||||
if len(description) > 1000:
|
|
||||||
desc_preview += "..."
|
|
||||||
summary_content += f"Description: {desc_preview}\n"
|
|
||||||
|
|
||||||
summary_embedding = config.embedding_model_instance.embed(
|
|
||||||
summary_content
|
|
||||||
)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(event_markdown)
|
|
||||||
|
|
||||||
document = Document(
|
document = Document(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
title=event_name,
|
title=event_name,
|
||||||
|
|
@ -468,23 +365,147 @@ async def index_luma_events(
|
||||||
"city": city,
|
"city": city,
|
||||||
"hosts": host_names,
|
"hosts": host_names,
|
||||||
"cover_url": cover_url,
|
"cover_url": cover_url,
|
||||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
"connector_id": connector_id,
|
||||||
},
|
},
|
||||||
content=summary_content,
|
content="Pending...", # Placeholder until processed
|
||||||
content_hash=content_hash,
|
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
|
||||||
unique_identifier_hash=unique_identifier_hash,
|
unique_identifier_hash=unique_identifier_hash,
|
||||||
embedding=summary_embedding,
|
embedding=None,
|
||||||
chunks=chunks,
|
chunks=[], # Empty at creation - safe for async
|
||||||
|
status=DocumentStatus.pending(), # Pending until processing starts
|
||||||
updated_at=get_current_timestamp(),
|
updated_at=get_current_timestamp(),
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
session.add(document)
|
session.add(document)
|
||||||
documents_indexed += 1
|
new_documents_created = True
|
||||||
logger.info(f"Successfully indexed new event {event_name}")
|
|
||||||
|
|
||||||
# Batch commit every 10 documents
|
events_to_process.append({
|
||||||
|
'document': document,
|
||||||
|
'is_new': True,
|
||||||
|
'event_id': event_id,
|
||||||
|
'event_name': event_name,
|
||||||
|
'event_url': event_url,
|
||||||
|
'event_markdown': event_markdown,
|
||||||
|
'content_hash': content_hash,
|
||||||
|
'start_at': start_at,
|
||||||
|
'end_at': end_at,
|
||||||
|
'timezone': timezone,
|
||||||
|
'location': location,
|
||||||
|
'city': city,
|
||||||
|
'host_names': host_names,
|
||||||
|
'description': description,
|
||||||
|
'cover_url': cover_url,
|
||||||
|
})
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True)
|
||||||
|
documents_failed += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Commit all pending documents - they all appear in UI now
|
||||||
|
if new_documents_created:
|
||||||
|
logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents")
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
# =======================================================================
|
||||||
|
# PHASE 2: Process each document one by one
|
||||||
|
# Each document transitions: pending → processing → ready/failed
|
||||||
|
# =======================================================================
|
||||||
|
logger.info(f"Phase 2: Processing {len(events_to_process)} documents")
|
||||||
|
|
||||||
|
for item in events_to_process:
|
||||||
|
# Send heartbeat periodically
|
||||||
|
if on_heartbeat_callback:
|
||||||
|
current_time = time.time()
|
||||||
|
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
|
||||||
|
await on_heartbeat_callback(documents_indexed)
|
||||||
|
last_heartbeat_time = current_time
|
||||||
|
|
||||||
|
document = item['document']
|
||||||
|
try:
|
||||||
|
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
|
||||||
|
document.status = DocumentStatus.processing()
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
# Heavy processing (LLM, embeddings, chunks)
|
||||||
|
user_llm = await get_user_long_context_llm(
|
||||||
|
session, user_id, search_space_id
|
||||||
|
)
|
||||||
|
|
||||||
|
if user_llm:
|
||||||
|
document_metadata_for_summary = {
|
||||||
|
"event_id": item['event_id'],
|
||||||
|
"event_name": item['event_name'],
|
||||||
|
"event_url": item['event_url'],
|
||||||
|
"start_at": item['start_at'],
|
||||||
|
"end_at": item['end_at'],
|
||||||
|
"timezone": item['timezone'],
|
||||||
|
"location": item['location'] or "No location",
|
||||||
|
"city": item['city'],
|
||||||
|
"hosts": item['host_names'],
|
||||||
|
"document_type": "Luma Event",
|
||||||
|
"connector_type": "Luma",
|
||||||
|
}
|
||||||
|
(
|
||||||
|
summary_content,
|
||||||
|
summary_embedding,
|
||||||
|
) = await generate_document_summary(
|
||||||
|
item['event_markdown'], user_llm, document_metadata_for_summary
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Fallback to simple summary if no LLM configured
|
||||||
|
summary_content = f"Luma Event: {item['event_name']}\n\n"
|
||||||
|
if item['event_url']:
|
||||||
|
summary_content += f"URL: {item['event_url']}\n"
|
||||||
|
summary_content += f"Start: {item['start_at']}\n"
|
||||||
|
summary_content += f"End: {item['end_at']}\n"
|
||||||
|
if item['timezone']:
|
||||||
|
summary_content += f"Timezone: {item['timezone']}\n"
|
||||||
|
if item['location']:
|
||||||
|
summary_content += f"Location: {item['location']}\n"
|
||||||
|
if item['city']:
|
||||||
|
summary_content += f"City: {item['city']}\n"
|
||||||
|
if item['host_names']:
|
||||||
|
summary_content += f"Hosts: {item['host_names']}\n"
|
||||||
|
if item['description']:
|
||||||
|
desc_preview = item['description'][:1000]
|
||||||
|
if len(item['description']) > 1000:
|
||||||
|
desc_preview += "..."
|
||||||
|
summary_content += f"Description: {desc_preview}\n"
|
||||||
|
|
||||||
|
summary_embedding = config.embedding_model_instance.embed(
|
||||||
|
summary_content
|
||||||
|
)
|
||||||
|
|
||||||
|
chunks = await create_document_chunks(item['event_markdown'])
|
||||||
|
|
||||||
|
# Update document to READY with actual content
|
||||||
|
document.title = item['event_name']
|
||||||
|
document.content = summary_content
|
||||||
|
document.content_hash = item['content_hash']
|
||||||
|
document.embedding = summary_embedding
|
||||||
|
document.document_metadata = {
|
||||||
|
"event_id": item['event_id'],
|
||||||
|
"event_name": item['event_name'],
|
||||||
|
"event_url": item['event_url'],
|
||||||
|
"start_at": item['start_at'],
|
||||||
|
"end_at": item['end_at'],
|
||||||
|
"timezone": item['timezone'],
|
||||||
|
"location": item['location'],
|
||||||
|
"city": item['city'],
|
||||||
|
"hosts": item['host_names'],
|
||||||
|
"cover_url": item['cover_url'],
|
||||||
|
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||||
|
"connector_id": connector_id,
|
||||||
|
}
|
||||||
|
safe_set_chunks(document, chunks)
|
||||||
|
document.updated_at = get_current_timestamp()
|
||||||
|
document.status = DocumentStatus.ready()
|
||||||
|
|
||||||
|
documents_indexed += 1
|
||||||
|
|
||||||
|
# Batch commit every 10 documents (for ready status updates)
|
||||||
if documents_indexed % 10 == 0:
|
if documents_indexed % 10 == 0:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Committing batch: {documents_indexed} Luma events processed so far"
|
f"Committing batch: {documents_indexed} Luma events processed so far"
|
||||||
|
|
@ -493,38 +514,69 @@ async def index_luma_events(
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Error processing event {event.get('name', 'Unknown')}: {e!s}",
|
f"Error processing event {item.get('event_name', 'Unknown')}: {e!s}",
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
|
# Mark document as failed with reason (visible in UI)
|
||||||
|
try:
|
||||||
|
document.status = DocumentStatus.failed(str(e))
|
||||||
|
document.updated_at = get_current_timestamp()
|
||||||
|
except Exception as status_error:
|
||||||
|
logger.error(f"Failed to update document status to failed: {status_error}")
|
||||||
skipped_events.append(
|
skipped_events.append(
|
||||||
f"{event.get('name', 'Unknown')} (processing error)"
|
f"{item.get('event_name', 'Unknown')} (processing error)"
|
||||||
)
|
)
|
||||||
documents_skipped += 1
|
documents_failed += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
total_processed = documents_indexed
|
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
|
||||||
if total_processed > 0:
|
# This ensures the UI shows "Last indexed" instead of "Never indexed"
|
||||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||||
|
|
||||||
# Final commit for any remaining documents not yet committed in batches
|
# Final commit for any remaining documents not yet committed in batches
|
||||||
logger.info(f"Final commit: Total {documents_indexed} Luma events processed")
|
logger.info(f"Final commit: Total {documents_indexed} Luma events processed")
|
||||||
await session.commit()
|
try:
|
||||||
|
await session.commit()
|
||||||
|
logger.info("Successfully committed all Luma document changes to database")
|
||||||
|
except Exception as e:
|
||||||
|
# Handle any remaining integrity errors gracefully (race conditions, etc.)
|
||||||
|
if (
|
||||||
|
"duplicate key value violates unique constraint" in str(e).lower()
|
||||||
|
or "uniqueviolationerror" in str(e).lower()
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
f"Duplicate content_hash detected during final commit. "
|
||||||
|
f"This may occur if the same event was indexed by multiple connectors. "
|
||||||
|
f"Rolling back and continuing. Error: {e!s}"
|
||||||
|
)
|
||||||
|
await session.rollback()
|
||||||
|
# Don't fail the entire task - some documents may have been successfully indexed
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Build warning message if there were issues
|
||||||
|
warning_parts = []
|
||||||
|
if documents_failed > 0:
|
||||||
|
warning_parts.append(f"{documents_failed} failed")
|
||||||
|
warning_message = ", ".join(warning_parts) if warning_parts else None
|
||||||
|
|
||||||
await task_logger.log_task_success(
|
await task_logger.log_task_success(
|
||||||
log_entry,
|
log_entry,
|
||||||
f"Successfully completed Luma indexing for connector {connector_id}",
|
f"Successfully completed Luma indexing for connector {connector_id}",
|
||||||
{
|
{
|
||||||
"events_processed": total_processed,
|
"events_processed": documents_indexed,
|
||||||
"documents_indexed": documents_indexed,
|
"documents_indexed": documents_indexed,
|
||||||
"documents_skipped": documents_skipped,
|
"documents_skipped": documents_skipped,
|
||||||
|
"documents_failed": documents_failed,
|
||||||
"skipped_events_count": len(skipped_events),
|
"skipped_events_count": len(skipped_events),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Luma indexing completed: {documents_indexed} new events, {documents_skipped} skipped"
|
f"Luma indexing completed: {documents_indexed} ready, "
|
||||||
|
f"{documents_skipped} skipped, {documents_failed} failed"
|
||||||
)
|
)
|
||||||
return total_processed, None
|
return documents_indexed, warning_message
|
||||||
|
|
||||||
except SQLAlchemyError as db_error:
|
except SQLAlchemyError as db_error:
|
||||||
await session.rollback()
|
await session.rollback()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue