feat: implement two-phase document indexing for Airtable and Notion connectors with real-time status updates

This commit is contained in:
Anish Sarkar 2026-02-06 00:12:48 +05:30
parent 5042fbfb85
commit 3bbac0d4ea
2 changed files with 416 additions and 353 deletions

View file

@ -1,5 +1,9 @@
"""
Airtable connector indexer.
Implements real-time document status updates using a two-phase approach:
- Phase 1: Create all documents with PENDING status (visible in UI immediately)
- Phase 2: Process each document one by one (pending processing ready/failed)
"""
import time
@ -10,7 +14,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.airtable_history import AirtableHistoryConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -27,6 +31,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -134,24 +139,30 @@ async def index_airtable_records(
await task_logger.log_task_success(
log_entry, success_msg, {"bases_count": 0}
)
return 0, success_msg
# CRITICAL: Update timestamp even when no bases found so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no items found
logger.info(f"Found {len(bases)} Airtable bases to process")
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
total_documents_indexed = 0
# Process each base
# Track overall statistics
documents_indexed = 0
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
# =======================================================================
# PHASE 1: Collect all records and create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
records_to_process = [] # List of dicts with document and record data
new_documents_created = False
for base in bases:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time)
>= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(total_documents_indexed)
last_heartbeat_time = time.time()
base_id = base.get("id")
base_name = base.get("name", "Unknown Base")
@ -201,7 +212,6 @@ async def index_airtable_records(
max_records=max_records,
)
)
else:
# Fetch all records
records, records_error = airtable_connector.get_all_records(
@ -222,21 +232,14 @@ async def index_airtable_records(
logger.info(f"Found {len(records)} records in table {table_name}")
documents_indexed = 0
skipped_messages = []
documents_skipped = 0
# Process each record
# Phase 1: Analyze each record and create pending documents
for record in records:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time)
>= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(total_documents_indexed)
last_heartbeat_time = time.time()
try:
record_id = record.get("id", "")
if not record_id:
documents_skipped += 1
continue
# Generate markdown content
markdown_content = (
airtable_connector.format_record_to_markdown(
@ -246,16 +249,11 @@ async def index_airtable_records(
if not markdown_content.strip():
logger.warning(
f"Skipping message with no content: {record.get('id')}"
)
skipped_messages.append(
f"{record.get('id')} (no content)"
f"Skipping record with no content: {record_id}"
)
documents_skipped += 1
continue
record_id = record.get("id", "Unknown")
# Generate unique identifier hash for this Airtable record
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.AIRTABLE_CONNECTOR,
@ -278,75 +276,24 @@ async def index_airtable_records(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Airtable record {record_id} unchanged. Skipping."
)
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Airtable record {record_id}. Updating document."
)
# Generate document summary
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"record_id": record_id,
"created_time": record.get(
"CREATED_TIME()", ""
),
"document_type": "Airtable Record",
"connector_type": "Airtable",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content,
user_llm,
document_metadata,
)
else:
summary_content = (
f"Airtable Record: {record_id}\n\n"
)
summary_embedding = (
config.embedding_model_instance.embed(
summary_content
)
)
# Process chunks
chunks = await create_document_chunks(
markdown_content
)
# Update existing document
existing_document.title = record_id
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"record_id": record_id,
"created_time": record.get(
"CREATED_TIME()", ""
),
}
existing_document.chunks = chunks
existing_document.updated_at = (
get_current_timestamp()
)
documents_indexed += 1
logger.info(
f"Successfully updated Airtable record {record_id}"
)
continue
# Queue existing document for update (will be set to processing in Phase 2)
records_to_process.append({
'document': existing_document,
'is_new': False,
'markdown_content': markdown_content,
'content_hash': content_hash,
'record_id': record_id,
'record': record,
'base_name': base_name,
'table_name': table_name,
})
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
@ -363,44 +310,11 @@ async def index_airtable_records(
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate document summary
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"record_id": record_id,
"created_time": record.get("CREATED_TIME()", ""),
"document_type": "Airtable Record",
"connector_type": "Airtable",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Airtable Record: {record_id}\n\n"
summary_embedding = (
config.embedding_model_instance.embed(
summary_content
)
)
# Process chunks
chunks = await create_document_chunks(markdown_content)
# Create and store new document
logger.info(
f"Creating new document for Airtable record: {record_id}"
)
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=record_id,
@ -408,78 +322,181 @@ async def index_airtable_records(
document_metadata={
"record_id": record_id,
"created_time": record.get("CREATED_TIME()", ""),
"base_name": base_name,
"table_name": table_name,
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1
logger.info(
f"Successfully indexed new Airtable record {summary_content}"
)
new_documents_created = True
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Airtable records processed so far"
)
await session.commit()
records_to_process.append({
'document': document,
'is_new': True,
'markdown_content': markdown_content,
'content_hash': content_hash,
'record_id': record_id,
'record': record,
'base_name': base_name,
'table_name': table_name,
})
except Exception as e:
logger.error(
f"Error processing the Airtable record {record.get('id', 'Unknown')}: {e!s}",
exc_info=True,
)
skipped_messages.append(
f"{record.get('id', 'Unknown')} (processing error)"
)
documents_skipped += 1
continue # Skip this message and continue with others
logger.error(f"Error in Phase 1 for record: {e!s}", exc_info=True)
documents_failed += 1
continue
# Accumulate total processed across all tables
total_processed += documents_indexed
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([r for r in records_to_process if r['is_new']])} pending documents")
await session.commit()
# Final commit for any remaining documents not yet committed in batches
if documents_indexed > 0:
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(records_to_process)} documents")
for item in records_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata_for_summary = {
"record_id": item['record_id'],
"created_time": item['record'].get("CREATED_TIME()", ""),
"document_type": "Airtable Record",
"connector_type": "Airtable",
}
summary_content, summary_embedding = await generate_document_summary(
item['markdown_content'], user_llm, document_metadata_for_summary
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Airtable Record: {item['record_id']}\n\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(item['markdown_content'])
# Update document to READY with actual content
document.title = item['record_id']
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"record_id": item['record_id'],
"created_time": item['record'].get("CREATED_TIME()", ""),
"base_name": item['base_name'],
"table_name": item['table_name'],
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Final commit for table {table_name}: {documents_indexed} Airtable records processed"
f"Committing batch: {documents_indexed} Airtable records processed so far"
)
await session.commit()
logger.info(
f"Successfully committed all Airtable document changes for table {table_name}"
)
# Update the last_indexed_at timestamp for the connector only if requested
# (after all tables in all bases are processed)
if total_processed > 0:
await update_connector_last_indexed(
session, connector, update_last_indexed
except Exception as e:
logger.error(f"Error processing Airtable record: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
total_processed = documents_indexed
# Final commit to ensure all documents are persisted (safety net)
logger.info(f"Final commit: Total {documents_indexed} Airtable records processed")
try:
await session.commit()
logger.info(
"Successfully committed all Airtable document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same record was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success after processing all bases and tables
await task_logger.log_task_success(
log_entry,
f"Successfully completed Airtable indexing for connector {connector_id}",
{
"events_processed": total_processed,
"documents_indexed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"Airtable indexing completed: {total_processed} total records processed"
f"Airtable indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
warning_message,
)
except Exception as e:
logger.error(

View file

@ -1,5 +1,9 @@
"""
Notion connector indexer.
Implements real-time document status updates using a two-phase approach:
- Phase 1: Create all documents with PENDING status (visible in UI immediately)
- Phase 2: Process each document one by one (pending processing ready/failed)
"""
import time
@ -9,8 +13,9 @@ from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.notion_history import NotionHistoryConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -28,6 +33,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -214,12 +220,17 @@ async def index_notion_pages(
{"pages_found": 0},
)
logger.info("No Notion pages found to index")
# CRITICAL: Update timestamp even when no pages found so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
await notion_client.close()
return 0, None # Success with 0 pages, not an error
# Track the number of documents indexed
documents_indexed = 0
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
skipped_pages = []
# Heartbeat tracking - update notification periodically to prevent appearing stuck
@ -231,22 +242,69 @@ async def index_notion_pages(
{"stage": "process_pages", "total_pages": len(pages)},
)
# Process each page
for page in pages:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all pages, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
pages_to_process = [] # List of dicts with document and page data
new_documents_created = False
# Helper function to convert page content to markdown
def process_blocks(blocks, level=0):
result = ""
for block in blocks:
block_type = block.get("type")
block_content = block.get("content", "")
children = block.get("children", [])
# Add indentation based on level
indent = " " * level
# Format based on block type
if block_type in ["paragraph", "text"]:
result += f"{indent}{block_content}\n\n"
elif block_type in ["heading_1", "header"]:
result += f"{indent}# {block_content}\n\n"
elif block_type == "heading_2":
result += f"{indent}## {block_content}\n\n"
elif block_type == "heading_3":
result += f"{indent}### {block_content}\n\n"
elif block_type == "bulleted_list_item":
result += f"{indent}* {block_content}\n"
elif block_type == "numbered_list_item":
result += f"{indent}1. {block_content}\n"
elif block_type == "to_do":
result += f"{indent}- [ ] {block_content}\n"
elif block_type == "toggle":
result += f"{indent}> {block_content}\n"
elif block_type == "code":
result += f"{indent}```\n{block_content}\n```\n\n"
elif block_type == "quote":
result += f"{indent}> {block_content}\n\n"
elif block_type == "callout":
result += f"{indent}> **Note:** {block_content}\n\n"
elif block_type == "image":
result += f"{indent}![Image]({block_content})\n\n"
else:
# Default for other block types
if block_content:
result += f"{indent}{block_content}\n\n"
# Process children recursively
if children:
result += process_blocks(children, level + 1)
return result
for page in pages:
try:
page_id = page.get("page_id")
page_title = page.get("title", f"Untitled page ({page_id})")
page_content = page.get("content", [])
logger.info(f"Processing Notion page: {page_title} ({page_id})")
if not page_id:
documents_skipped += 1
continue
if not page_content:
logger.info(f"No content found in page {page_title}. Skipping.")
@ -256,57 +314,6 @@ async def index_notion_pages(
# Convert page content to markdown format
markdown_content = f"# Notion Page: {page_title}\n\n"
# Process blocks recursively
def process_blocks(blocks, level=0):
result = ""
for block in blocks:
block_type = block.get("type")
block_content = block.get("content", "")
children = block.get("children", [])
# Add indentation based on level
indent = " " * level
# Format based on block type
if block_type in ["paragraph", "text"]:
result += f"{indent}{block_content}\n\n"
elif block_type in ["heading_1", "header"]:
result += f"{indent}# {block_content}\n\n"
elif block_type == "heading_2":
result += f"{indent}## {block_content}\n\n"
elif block_type == "heading_3":
result += f"{indent}### {block_content}\n\n"
elif block_type == "bulleted_list_item":
result += f"{indent}* {block_content}\n"
elif block_type == "numbered_list_item":
result += f"{indent}1. {block_content}\n"
elif block_type == "to_do":
result += f"{indent}- [ ] {block_content}\n"
elif block_type == "toggle":
result += f"{indent}> {block_content}\n"
elif block_type == "code":
result += f"{indent}```\n{block_content}\n```\n\n"
elif block_type == "quote":
result += f"{indent}> {block_content}\n\n"
elif block_type == "callout":
result += f"{indent}> **Note:** {block_content}\n\n"
elif block_type == "image":
result += f"{indent}![Image]({block_content})\n\n"
else:
# Default for other block types
if block_content:
result += f"{indent}{block_content}\n\n"
# Process children recursively
if children:
result += process_blocks(children, level + 1)
return result
logger.debug(
f"Converting {len(page_content)} blocks to markdown for page {page_title}"
)
markdown_content += process_blocks(page_content)
# Format document metadata
@ -346,71 +353,22 @@ async def index_notion_pages(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Notion page {page_title} unchanged. Skipping."
)
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Notion page {page_title}. Updating document."
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if not user_llm:
logger.error(
f"No long context LLM configured for user {user_id}"
)
skipped_pages.append(f"{page_title} (no LLM configured)")
documents_skipped += 1
continue
# Generate summary with metadata
document_metadata = {
"page_title": page_title,
"page_id": page_id,
"document_type": "Notion Page",
"connector_type": "Notion",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
# Process chunks
chunks = await create_document_chunks(markdown_content)
# Update existing document
existing_document.title = page_title
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"page_title": page_title,
"page_id": page_id,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
existing_document.connector_id = connector_id
documents_indexed += 1
logger.info(f"Successfully updated Notion page: {page_title}")
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} documents processed so far"
)
await session.commit()
continue
# Queue existing document for update (will be set to processing in Phase 2)
pages_to_process.append({
'document': existing_document,
'is_new': False,
'markdown_content': markdown_content,
'content_hash': content_hash,
'page_id': page_id,
'page_title': page_title,
})
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
@ -425,37 +383,11 @@ async def index_notion_pages(
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Get user's long context LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if not user_llm:
logger.error(f"No long context LLM configured for user {user_id}")
skipped_pages.append(f"{page_title} (no LLM configured)")
documents_skipped += 1
continue
# Generate summary with metadata
logger.debug(f"Generating summary for page {page_title}")
document_metadata = {
"page_title": page_title,
"page_id": page_id,
"document_type": "Notion Page",
"connector_type": "Notion",
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
# Process chunks
logger.debug(f"Chunking content for page {page_title}")
chunks = await create_document_chunks(markdown_content)
# Create and store new document
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=page_title,
@ -463,53 +395,159 @@ async def index_notion_pages(
document_metadata={
"page_title": page_title,
"page_id": page_id,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new Notion page: {page_title}")
new_documents_created = True
# Batch commit every 10 documents
pages_to_process.append({
'document': document,
'is_new': True,
'markdown_content': markdown_content,
'content_hash': content_hash,
'page_id': page_id,
'page_title': page_title,
})
except Exception as e:
logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(pages_to_process)} documents")
for item in pages_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata_for_summary = {
"page_title": item['page_title'],
"page_id": item['page_id'],
"document_type": "Notion Page",
"connector_type": "Notion",
}
summary_content, summary_embedding = await generate_document_summary(
item['markdown_content'], user_llm, document_metadata_for_summary
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Notion Page: {item['page_title']}\n\n{item['markdown_content'][:500]}..."
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(item['markdown_content'])
# Update document to READY with actual content
document.title = item['page_title']
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"page_title": item['page_title'],
"page_id": item['page_id'],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} documents processed so far"
f"Committing batch: {documents_indexed} Notion pages processed so far"
)
await session.commit()
except Exception as e:
logger.error(
f"Error processing Notion page {page.get('title', 'Unknown')}: {e!s}",
exc_info=True,
)
skipped_pages.append(
f"{page.get('title', 'Unknown')} (processing error)"
)
documents_skipped += 1
continue # Skip this page and continue with others
logger.error(f"Error processing Notion page: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
skipped_pages.append(f"{item['page_title']} (processing error)")
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Update the last_indexed_at timestamp for the connector only if requested
# and if we successfully indexed at least one page
total_processed = documents_indexed
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
# Final commit to ensure all documents are persisted (safety net)
logger.info(f"Final commit: Total {documents_indexed} documents processed")
await session.commit()
try:
await session.commit()
logger.info(
"Successfully committed all Notion document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same page was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Get final count of pages with skipped Notion AI content
pages_with_skipped_ai_content = notion_client.get_skipped_content_count()
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Prepare result message with user-friendly notification about skipped content
result_message = None
if skipped_pages:
@ -532,6 +570,8 @@ async def index_notion_pages(
"pages_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
"skipped_pages_count": len(skipped_pages),
"pages_with_skipped_ai_content": pages_with_skipped_ai_content,
"result_message": result_message,
@ -539,7 +579,9 @@ async def index_notion_pages(
)
logger.info(
f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped"
f"Notion indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
# Clean up the async client
@ -559,6 +601,10 @@ async def index_notion_pages(
"Using legacy token. Reconnect with OAuth for better reliability."
)
# Include warning message if there were issues
if warning_message:
notification_parts.append(warning_message)
user_notification_message = (
" ".join(notification_parts) if notification_parts else None
)