chore: ran linting

This commit is contained in:
Anish Sarkar 2026-02-06 05:35:15 +05:30
parent 00a617ef17
commit aa66928154
44 changed files with 2025 additions and 1658 deletions

View file

@ -548,11 +548,11 @@ def process_file_upload_with_document_task(
):
"""
Celery task to process uploaded file with existing pending document.
This task is used by the 2-phase document upload flow:
- Phase 1 (API): Creates pending document (visible in UI immediately)
- Phase 2 (this task): Updates document status: pending processing ready/failed
Args:
document_id: ID of the pending document created in Phase 1
temp_path: Path to the uploaded file
@ -634,7 +634,7 @@ async def _process_file_with_document(
):
"""
Process file and update existing pending document status.
This function implements Phase 2 of the 2-phase document upload:
- Sets document status to 'processing' (shows spinner in UI)
- Processes the file (parsing, embedding, chunking)
@ -669,11 +669,15 @@ async def _process_file_with_document(
file_size = os.path.getsize(temp_path)
logger.info(f"[_process_file_with_document] File size: {file_size} bytes")
except Exception as e:
logger.warning(f"[_process_file_with_document] Could not get file size: {e}")
logger.warning(
f"[_process_file_with_document] Could not get file size: {e}"
)
file_size = None
# Create notification for document processing
logger.info(f"[_process_file_with_document] Creating notification for: {filename}")
logger.info(
f"[_process_file_with_document] Creating notification for: {filename}"
)
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
@ -822,7 +826,9 @@ async def _process_file_with_document(
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
logger.info(f"[_process_file_with_document] Cleaned up temp file: {temp_path}")
logger.info(
f"[_process_file_with_document] Cleaned up temp file: {temp_path}"
)
except Exception as cleanup_error:
logger.warning(
f"[_process_file_with_document] Failed to clean up temp file: {cleanup_error}"

View file

@ -154,9 +154,7 @@ async def _cleanup_stale_notifications():
f"Found {len(stale_notification_ids)} stale connector indexing notifications "
f"(no Redis heartbeat key): {stale_notification_ids}"
)
logger.info(
f"Connector IDs for document cleanup: {stale_connector_ids}"
)
logger.info(f"Connector IDs for document cleanup: {stale_connector_ids}")
# O(1) Batch UPDATE notifications using JSONB || operator
# This merges the update data into existing notification_metadata

View file

@ -140,7 +140,9 @@ async def index_airtable_records(
log_entry, success_msg, {"bases_count": 0}
)
# CRITICAL: Update timestamp even when no bases found so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await session.commit()
return 0, None # Return None (not error) when no items found
@ -277,22 +279,28 @@ async def index_airtable_records(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = (
DocumentStatus.ready()
)
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
records_to_process.append({
'document': existing_document,
'is_new': False,
'markdown_content': markdown_content,
'content_hash': content_hash,
'record_id': record_id,
'record': record,
'base_name': base_name,
'table_name': table_name,
})
records_to_process.append(
{
"document": existing_document,
"is_new": False,
"markdown_content": markdown_content,
"content_hash": content_hash,
"record_id": record_id,
"record": record,
"base_name": base_name,
"table_name": table_name,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -339,25 +347,31 @@ async def index_airtable_records(
session.add(document)
new_documents_created = True
records_to_process.append({
'document': document,
'is_new': True,
'markdown_content': markdown_content,
'content_hash': content_hash,
'record_id': record_id,
'record': record,
'base_name': base_name,
'table_name': table_name,
})
records_to_process.append(
{
"document": document,
"is_new": True,
"markdown_content": markdown_content,
"content_hash": content_hash,
"record_id": record_id,
"record": record,
"base_name": base_name,
"table_name": table_name,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for record: {e!s}", exc_info=True)
logger.error(
f"Error in Phase 1 for record: {e!s}", exc_info=True
)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([r for r in records_to_process if r['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([r for r in records_to_process if r['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -374,7 +388,7 @@ async def index_airtable_records(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -387,13 +401,18 @@ async def index_airtable_records(
if user_llm:
document_metadata_for_summary = {
"record_id": item['record_id'],
"created_time": item['record'].get("CREATED_TIME()", ""),
"record_id": item["record_id"],
"created_time": item["record"].get("CREATED_TIME()", ""),
"document_type": "Airtable Record",
"connector_type": "Airtable",
}
summary_content, summary_embedding = await generate_document_summary(
item['markdown_content'], user_llm, document_metadata_for_summary
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["markdown_content"],
user_llm,
document_metadata_for_summary,
)
else:
# Fallback to simple summary if no LLM configured
@ -402,18 +421,18 @@ async def index_airtable_records(
summary_content
)
chunks = await create_document_chunks(item['markdown_content'])
chunks = await create_document_chunks(item["markdown_content"])
# Update document to READY with actual content
document.title = item['record_id']
document.title = item["record_id"]
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"record_id": item['record_id'],
"created_time": item['record'].get("CREATED_TIME()", ""),
"base_name": item['base_name'],
"table_name": item['table_name'],
"record_id": item["record_id"],
"created_time": item["record"].get("CREATED_TIME()", ""),
"base_name": item["base_name"],
"table_name": item["table_name"],
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
@ -430,13 +449,17 @@ async def index_airtable_records(
await session.commit()
except Exception as e:
logger.error(f"Error processing Airtable record: {e!s}", exc_info=True)
logger.error(
f"Error processing Airtable record: {e!s}", exc_info=True
)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue
@ -446,7 +469,9 @@ async def index_airtable_records(
total_processed = documents_indexed
# Final commit to ensure all documents are persisted (safety net)
logger.info(f"Final commit: Total {documents_indexed} Airtable records processed")
logger.info(
f"Final commit: Total {documents_indexed} Airtable records processed"
)
try:
await session.commit()
logger.info(

View file

@ -31,29 +31,30 @@ def get_current_timestamp() -> datetime:
def safe_set_chunks(document: Document, chunks: list) -> None:
"""
Safely assign chunks to a document without triggering lazy loading.
ALWAYS use this instead of `document.chunks = chunks` to avoid
SQLAlchemy async errors (MissingGreenlet / greenlet_spawn).
Why this is needed:
- Direct assignment `document.chunks = chunks` triggers SQLAlchemy to
load the OLD chunks first (for comparison/orphan detection)
- This lazy loading fails in async context with asyncpg driver
- set_committed_value bypasses this by setting the value directly
This function is safe regardless of how the document was loaded
(with or without selectinload).
Args:
document: The Document object to update
chunks: List of Chunk objects to assign
Example:
# Instead of: document.chunks = chunks (DANGEROUS!)
safe_set_chunks(document, chunks) # Always safe
"""
from sqlalchemy.orm.attributes import set_committed_value
set_committed_value(document, 'chunks', chunks)
set_committed_value(document, "chunks", chunks)
async def check_duplicate_document_by_hash(

View file

@ -261,7 +261,9 @@ async def index_bookstack_pages(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for BookStack page {page_name} unchanged. Skipping."
@ -270,20 +272,22 @@ async def index_bookstack_pages(
continue
# Queue existing document for update (will be set to processing in Phase 2)
pages_to_process.append({
'document': existing_document,
'is_new': False,
'page_id': page_id,
'page_name': page_name,
'page_slug': page_slug,
'book_id': book_id,
'book_slug': book_slug,
'chapter_id': chapter_id,
'page_url': page_url,
'page_content': page_content,
'full_content': full_content,
'content_hash': content_hash,
})
pages_to_process.append(
{
"document": existing_document,
"is_new": False,
"page_id": page_id,
"page_name": page_name,
"page_slug": page_slug,
"book_id": book_id,
"book_slug": book_slug,
"chapter_id": chapter_id,
"page_url": page_url,
"page_content": page_content,
"full_content": full_content,
"content_hash": content_hash,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -331,20 +335,22 @@ async def index_bookstack_pages(
session.add(document)
new_documents_created = True
pages_to_process.append({
'document': document,
'is_new': True,
'page_id': page_id,
'page_name': page_name,
'page_slug': page_slug,
'book_id': book_id,
'book_slug': book_slug,
'chapter_id': chapter_id,
'page_url': page_url,
'page_content': page_content,
'full_content': full_content,
'content_hash': content_hash,
})
pages_to_process.append(
{
"document": document,
"is_new": True,
"page_id": page_id,
"page_name": page_name,
"page_slug": page_slug,
"book_id": book_id,
"book_slug": book_slug,
"chapter_id": chapter_id,
"page_url": page_url,
"page_content": page_content,
"full_content": full_content,
"content_hash": content_hash,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True)
@ -353,7 +359,9 @@ async def index_bookstack_pages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -370,7 +378,7 @@ async def index_bookstack_pages(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -383,23 +391,23 @@ async def index_bookstack_pages(
# Build document metadata
doc_metadata = {
"page_id": item['page_id'],
"page_name": item['page_name'],
"page_slug": item['page_slug'],
"book_id": item['book_id'],
"book_slug": item['book_slug'],
"chapter_id": item['chapter_id'],
"page_id": item["page_id"],
"page_name": item["page_name"],
"page_slug": item["page_slug"],
"book_id": item["book_id"],
"book_slug": item["book_slug"],
"chapter_id": item["chapter_id"],
"base_url": bookstack_base_url,
"page_url": item['page_url'],
"page_url": item["page_url"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
if user_llm:
summary_metadata = {
"page_name": item['page_name'],
"page_id": item['page_id'],
"book_id": item['book_id'],
"page_name": item["page_name"],
"page_id": item["page_id"],
"book_id": item["book_id"],
"document_type": "BookStack Page",
"connector_type": "BookStack",
}
@ -407,17 +415,15 @@ async def index_bookstack_pages(
summary_content,
summary_embedding,
) = await generate_document_summary(
item['full_content'], user_llm, summary_metadata
item["full_content"], user_llm, summary_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = (
f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n"
)
if item['page_content']:
summary_content = f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n"
if item["page_content"]:
# Take first 1000 characters of content for summary
content_preview = item['page_content'][:1000]
if len(item['page_content']) > 1000:
content_preview = item["page_content"][:1000]
if len(item["page_content"]) > 1000:
content_preview += "..."
summary_content += f"Content Preview: {content_preview}\n\n"
summary_embedding = config.embedding_model_instance.embed(
@ -425,12 +431,12 @@ async def index_bookstack_pages(
)
# Process chunks - using the full page content
chunks = await create_document_chunks(item['full_content'])
chunks = await create_document_chunks(item["full_content"])
# Update document to READY with actual content
document.title = item['page_name']
document.title = item["page_name"]
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = doc_metadata
safe_set_chunks(document, chunks)
@ -456,7 +462,9 @@ async def index_bookstack_pages(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
skipped_pages.append(
f"{item.get('page_name', 'Unknown')} (processing error)"
)
@ -473,7 +481,9 @@ async def index_bookstack_pages(
)
try:
await session.commit()
logger.info("Successfully committed all BookStack document changes to database")
logger.info(
"Successfully committed all BookStack document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (

View file

@ -260,7 +260,9 @@ async def index_clickup_tasks(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for ClickUp task {task_name} unchanged. Skipping."
@ -272,22 +274,24 @@ async def index_clickup_tasks(
logger.info(
f"Content changed for ClickUp task {task_name}. Queuing for update."
)
tasks_to_process.append({
'document': existing_document,
'is_new': False,
'task_content': task_content,
'content_hash': content_hash,
'task_id': task_id,
'task_name': task_name,
'task_status': task_status,
'task_priority': task_priority,
'task_list_name': task_list_name,
'task_space_name': task_space_name,
'task_assignees': task_assignees,
'task_due_date': task_due_date,
'task_created': task_created,
'task_updated': task_updated,
})
tasks_to_process.append(
{
"document": existing_document,
"is_new": False,
"task_content": task_content,
"content_hash": content_hash,
"task_id": task_id,
"task_name": task_name,
"task_status": task_status,
"task_priority": task_priority,
"task_list_name": task_list_name,
"task_space_name": task_space_name,
"task_assignees": task_assignees,
"task_due_date": task_due_date,
"task_created": task_created,
"task_updated": task_updated,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -335,22 +339,24 @@ async def index_clickup_tasks(
session.add(document)
new_documents_created = True
tasks_to_process.append({
'document': document,
'is_new': True,
'task_content': task_content,
'content_hash': content_hash,
'task_id': task_id,
'task_name': task_name,
'task_status': task_status,
'task_priority': task_priority,
'task_list_name': task_list_name,
'task_space_name': task_space_name,
'task_assignees': task_assignees,
'task_due_date': task_due_date,
'task_created': task_created,
'task_updated': task_updated,
})
tasks_to_process.append(
{
"document": document,
"is_new": True,
"task_content": task_content,
"content_hash": content_hash,
"task_id": task_id,
"task_name": task_name,
"task_status": task_status,
"task_priority": task_priority,
"task_list_name": task_list_name,
"task_space_name": task_space_name,
"task_assignees": task_assignees,
"task_due_date": task_due_date,
"task_created": task_created,
"task_updated": task_updated,
}
)
except Exception as e:
logger.error(
@ -362,7 +368,9 @@ async def index_clickup_tasks(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([t for t in tasks_to_process if t['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([t for t in tasks_to_process if t['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -379,7 +387,7 @@ async def index_clickup_tasks(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -392,13 +400,13 @@ async def index_clickup_tasks(
if user_llm:
document_metadata_for_summary = {
"task_id": item['task_id'],
"task_name": item['task_name'],
"task_status": item['task_status'],
"task_priority": item['task_priority'],
"task_list": item['task_list_name'],
"task_space": item['task_space_name'],
"assignees": len(item['task_assignees']),
"task_id": item["task_id"],
"task_name": item["task_name"],
"task_status": item["task_status"],
"task_priority": item["task_priority"],
"task_list": item["task_list_name"],
"task_space": item["task_space_name"],
"assignees": len(item["task_assignees"]),
"document_type": "ClickUp Task",
"connector_type": "ClickUp",
}
@ -406,30 +414,30 @@ async def index_clickup_tasks(
summary_content,
summary_embedding,
) = await generate_document_summary(
item['task_content'], user_llm, document_metadata_for_summary
item["task_content"], user_llm, document_metadata_for_summary
)
else:
summary_content = item['task_content']
summary_content = item["task_content"]
summary_embedding = config.embedding_model_instance.embed(
item['task_content']
item["task_content"]
)
chunks = await create_document_chunks(item['task_content'])
chunks = await create_document_chunks(item["task_content"])
# Update document to READY with actual content
document.title = item['task_name']
document.title = item["task_name"]
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"task_id": item['task_id'],
"task_name": item['task_name'],
"task_status": item['task_status'],
"task_priority": item['task_priority'],
"task_assignees": item['task_assignees'],
"task_due_date": item['task_due_date'],
"task_created": item['task_created'],
"task_updated": item['task_updated'],
"task_id": item["task_id"],
"task_name": item["task_name"],
"task_status": item["task_status"],
"task_priority": item["task_priority"],
"task_assignees": item["task_assignees"],
"task_due_date": item["task_due_date"],
"task_created": item["task_created"],
"task_updated": item["task_updated"],
"connector_id": connector_id,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
@ -456,7 +464,9 @@ async def index_clickup_tasks(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue

View file

@ -262,23 +262,27 @@ async def index_confluence_pages(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
pages_to_process.append({
'document': existing_document,
'is_new': False,
'full_content': full_content,
'page_content': page_content,
'content_hash': content_hash,
'page_id': page_id,
'page_title': page_title,
'space_id': space_id,
'comment_count': comment_count,
})
pages_to_process.append(
{
"document": existing_document,
"is_new": False,
"full_content": full_content,
"page_content": page_content,
"content_hash": content_hash,
"page_id": page_id,
"page_title": page_title,
"space_id": space_id,
"comment_count": comment_count,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -323,17 +327,19 @@ async def index_confluence_pages(
session.add(document)
new_documents_created = True
pages_to_process.append({
'document': document,
'is_new': True,
'full_content': full_content,
'page_content': page_content,
'content_hash': content_hash,
'page_id': page_id,
'page_title': page_title,
'space_id': space_id,
'comment_count': comment_count,
})
pages_to_process.append(
{
"document": document,
"is_new": True,
"full_content": full_content,
"page_content": page_content,
"content_hash": content_hash,
"page_id": page_id,
"page_title": page_title,
"space_id": space_id,
"comment_count": comment_count,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True)
@ -342,7 +348,9 @@ async def index_confluence_pages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -359,7 +367,7 @@ async def index_confluence_pages(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -372,10 +380,10 @@ async def index_confluence_pages(
if user_llm:
document_metadata = {
"page_title": item['page_title'],
"page_id": item['page_id'],
"space_id": item['space_id'],
"comment_count": item['comment_count'],
"page_title": item["page_title"],
"page_id": item["page_id"],
"space_id": item["space_id"],
"comment_count": item["comment_count"],
"document_type": "Confluence Page",
"connector_type": "Confluence",
}
@ -383,17 +391,15 @@ async def index_confluence_pages(
summary_content,
summary_embedding,
) = await generate_document_summary(
item['full_content'], user_llm, document_metadata
item["full_content"], user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = (
f"Confluence Page: {item['page_title']}\n\nSpace ID: {item['space_id']}\n\n"
)
if item['page_content']:
summary_content = f"Confluence Page: {item['page_title']}\n\nSpace ID: {item['space_id']}\n\n"
if item["page_content"]:
# Take first 1000 characters of content for summary
content_preview = item['page_content'][:1000]
if len(item['page_content']) > 1000:
content_preview = item["page_content"][:1000]
if len(item["page_content"]) > 1000:
content_preview += "..."
summary_content += f"Content Preview: {content_preview}\n\n"
summary_content += f"Comments: {item['comment_count']}"
@ -402,18 +408,18 @@ async def index_confluence_pages(
)
# Process chunks - using the full page content with comments
chunks = await create_document_chunks(item['full_content'])
chunks = await create_document_chunks(item["full_content"])
# Update document to READY with actual content
document.title = item['page_title']
document.title = item["page_title"]
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"page_id": item['page_id'],
"page_title": item['page_title'],
"space_id": item['space_id'],
"comment_count": item['comment_count'],
"page_id": item["page_id"],
"page_title": item["page_title"],
"space_id": item["space_id"],
"comment_count": item["comment_count"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -440,7 +446,9 @@ async def index_confluence_pages(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue # Skip this page and continue with others

View file

@ -352,9 +352,7 @@ async def index_discord_messages(
try:
channels = await discord_client.get_text_channels(guild_id)
if not channels:
logger.info(
f"No channels found in guild {guild_name}. Skipping."
)
logger.info(f"No channels found in guild {guild_name}. Skipping.")
skipped_channels.append(f"{guild_name} (no channels)")
else:
for channel in channels:
@ -456,25 +454,31 @@ async def index_discord_messages(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = (
DocumentStatus.ready()
)
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append({
'document': existing_document,
'is_new': False,
'combined_document_string': combined_document_string,
'content_hash': content_hash,
'guild_name': guild_name,
'guild_id': guild_id,
'channel_name': channel_name,
'channel_id': channel_id,
'message_id': msg_id,
'message_timestamp': msg_timestamp,
'message_user_name': msg_user_name,
})
messages_to_process.append(
{
"document": existing_document,
"is_new": False,
"combined_document_string": combined_document_string,
"content_hash": content_hash,
"guild_name": guild_name,
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": msg_id,
"message_timestamp": msg_timestamp,
"message_user_name": msg_user_name,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -522,19 +526,21 @@ async def index_discord_messages(
session.add(document)
new_documents_created = True
messages_to_process.append({
'document': document,
'is_new': True,
'combined_document_string': combined_document_string,
'content_hash': content_hash,
'guild_name': guild_name,
'guild_id': guild_id,
'channel_name': channel_name,
'channel_id': channel_id,
'message_id': msg_id,
'message_timestamp': msg_timestamp,
'message_user_name': msg_user_name,
})
messages_to_process.append(
{
"document": document,
"is_new": True,
"combined_document_string": combined_document_string,
"content_hash": content_hash,
"guild_name": guild_name,
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": msg_id,
"message_timestamp": msg_timestamp,
"message_user_name": msg_user_name,
}
)
except Exception as e:
logger.error(
@ -547,7 +553,9 @@ async def index_discord_messages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -564,31 +572,31 @@ async def index_discord_messages(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (embeddings, chunks)
chunks = await create_document_chunks(item['combined_document_string'])
chunks = await create_document_chunks(item["combined_document_string"])
doc_embedding = config.embedding_model_instance.embed(
item['combined_document_string']
item["combined_document_string"]
)
# Update document to READY with actual content
document.title = f"{item['guild_name']}#{item['channel_name']}"
document.content = item['combined_document_string']
document.content_hash = item['content_hash']
document.content = item["combined_document_string"]
document.content_hash = item["content_hash"]
document.embedding = doc_embedding
document.document_metadata = {
"guild_name": item['guild_name'],
"guild_id": item['guild_id'],
"channel_name": item['channel_name'],
"channel_id": item['channel_id'],
"message_id": item['message_id'],
"message_timestamp": item['message_timestamp'],
"message_user_name": item['message_user_name'],
"guild_name": item["guild_name"],
"guild_id": item["guild_id"],
"channel_name": item["channel_name"],
"channel_id": item["channel_id"],
"message_id": item["message_id"],
"message_timestamp": item["message_timestamp"],
"message_user_name": item["message_user_name"],
"indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -612,7 +620,9 @@ async def index_discord_messages(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue

View file

@ -253,7 +253,9 @@ async def index_elasticsearch_documents(
# If content is unchanged, skip. Otherwise queue for update.
if existing_doc.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_doc.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_doc.status, DocumentStatus.READY
):
existing_doc.status = DocumentStatus.ready()
logger.info(
f"Skipping ES doc {doc_id} — already indexed (doc id {existing_doc.id})"
@ -262,17 +264,19 @@ async def index_elasticsearch_documents(
continue
# Queue existing document for update (will be set to processing in Phase 2)
docs_to_process.append({
'document': existing_doc,
'is_new': False,
'doc_id': doc_id,
'title': title,
'content': content,
'content_hash': content_hash,
'unique_identifier_hash': unique_identifier_hash,
'hit': hit,
'source': source,
})
docs_to_process.append(
{
"document": existing_doc,
"is_new": False,
"doc_id": doc_id,
"title": title,
"content": content,
"content_hash": content_hash,
"unique_identifier_hash": unique_identifier_hash,
"hit": hit,
"source": source,
}
)
hits_collected += 1
continue
@ -310,17 +314,19 @@ async def index_elasticsearch_documents(
session.add(document)
new_documents_created = True
docs_to_process.append({
'document': document,
'is_new': True,
'doc_id': doc_id,
'title': title,
'content': content,
'content_hash': content_hash,
'unique_identifier_hash': unique_identifier_hash,
'hit': hit,
'source': source,
})
docs_to_process.append(
{
"document": document,
"is_new": True,
"doc_id": doc_id,
"title": title,
"content": content,
"content_hash": content_hash,
"unique_identifier_hash": unique_identifier_hash,
"hit": hit,
"source": source,
}
)
hits_collected += 1
except Exception as e:
@ -330,7 +336,9 @@ async def index_elasticsearch_documents(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([d for d in docs_to_process if d['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([d for d in docs_to_process if d['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -347,7 +355,7 @@ async def index_elasticsearch_documents(
await on_heartbeat_callback(documents_processed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -355,9 +363,9 @@ async def index_elasticsearch_documents(
# Build metadata
metadata = {
"elasticsearch_id": item['doc_id'],
"elasticsearch_index": item['hit'].get("_index", index_name),
"elasticsearch_score": item['hit'].get("_score"),
"elasticsearch_id": item["doc_id"],
"elasticsearch_index": item["hit"].get("_index", index_name),
"elasticsearch_score": item["hit"].get("_score"),
"indexed_at": datetime.now().isoformat(),
"source": "ELASTICSEARCH_CONNECTOR",
"connector_id": connector_id,
@ -366,17 +374,17 @@ async def index_elasticsearch_documents(
# Add any additional metadata fields specified in config
if "ELASTICSEARCH_METADATA_FIELDS" in config:
for field in config["ELASTICSEARCH_METADATA_FIELDS"]:
if field in item['source']:
metadata[f"es_{field}"] = item['source'][field]
if field in item["source"]:
metadata[f"es_{field}"] = item["source"][field]
# Create chunks
chunks = await create_document_chunks(item['content'])
chunks = await create_document_chunks(item["content"])
# Update document to READY with actual content
document.title = item['title']
document.content = item['content']
document.content_hash = item['content_hash']
document.unique_identifier_hash = item['unique_identifier_hash']
document.title = item["title"]
document.content = item["content"]
document.content_hash = item["content_hash"]
document.unique_identifier_hash = item["unique_identifier_hash"]
document.document_metadata = metadata
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
@ -399,7 +407,9 @@ async def index_elasticsearch_documents(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue
@ -411,10 +421,14 @@ async def index_elasticsearch_documents(
)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_processed} Elasticsearch documents processed")
logger.info(
f"Final commit: Total {documents_processed} Elasticsearch documents processed"
)
try:
await session.commit()
logger.info("Successfully committed all Elasticsearch document changes to database")
logger.info(
"Successfully committed all Elasticsearch document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (

View file

@ -17,7 +17,7 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.github_connector import GitHubConnector, RepositoryDigest
from app.connectors.github_connector import GitHubConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
@ -237,7 +237,9 @@ async def index_github_repos(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.info(f"Repository {repo_full_name} unchanged. Skipping.")
documents_skipped += 1
@ -247,14 +249,16 @@ async def index_github_repos(
logger.info(
f"Content changed for repository {repo_full_name}. Queuing for update."
)
repos_to_process.append({
'document': existing_document,
'is_new': False,
'digest': digest,
'content_hash': content_hash,
'repo_full_name': repo_full_name,
'unique_identifier_hash': unique_identifier_hash,
})
repos_to_process.append(
{
"document": existing_document,
"is_new": False,
"digest": digest,
"content_hash": content_hash,
"repo_full_name": repo_full_name,
"unique_identifier_hash": unique_identifier_hash,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -298,14 +302,16 @@ async def index_github_repos(
session.add(document)
new_documents_created = True
repos_to_process.append({
'document': document,
'is_new': True,
'digest': digest,
'content_hash': content_hash,
'repo_full_name': repo_full_name,
'unique_identifier_hash': unique_identifier_hash,
})
repos_to_process.append(
{
"document": document,
"is_new": True,
"digest": digest,
"content_hash": content_hash,
"repo_full_name": repo_full_name,
"unique_identifier_hash": unique_identifier_hash,
}
)
except Exception as repo_err:
logger.error(
@ -317,7 +323,9 @@ async def index_github_repos(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([r for r in repos_to_process if r['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([r for r in repos_to_process if r['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -334,9 +342,9 @@ async def index_github_repos(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
digest = item['digest']
repo_full_name = item['repo_full_name']
document = item["document"]
digest = item["digest"]
repo_full_name = item["repo_full_name"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
@ -353,7 +361,9 @@ async def index_github_repos(
"document_type": "GitHub Repository",
"connector_type": "GitHub",
"ingestion_method": "gitingest",
"file_tree": digest.tree[:2000] if len(digest.tree) > 2000 else digest.tree,
"file_tree": digest.tree[:2000]
if len(digest.tree) > 2000
else digest.tree,
"estimated_tokens": digest.estimated_tokens,
}
@ -377,13 +387,17 @@ async def index_github_repos(
f"## Summary\n{digest.summary}\n\n"
f"## File Structure\n{digest.tree[:3000]}"
)
summary_embedding = config.embedding_model_instance.embed(summary_text)
summary_embedding = config.embedding_model_instance.embed(
summary_text
)
# Chunk the full digest content for granular search
try:
chunks_data = await create_document_chunks(digest.content)
except Exception as chunk_err:
logger.error(f"Failed to chunk repository {repo_full_name}: {chunk_err}")
logger.error(
f"Failed to chunk repository {repo_full_name}: {chunk_err}"
)
chunks_data = await _simple_chunk_content(digest.content)
# Update document to READY with actual content
@ -401,7 +415,7 @@ async def index_github_repos(
document.title = repo_full_name
document.content = summary_text
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = doc_metadata
safe_set_chunks(document, chunks_data)
@ -433,7 +447,9 @@ async def index_github_repos(
document.status = DocumentStatus.failed(str(repo_err))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
errors.append(f"Failed processing {repo_full_name}: {repo_err}")
documents_failed += 1
continue
@ -442,7 +458,9 @@ async def index_github_repos(
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit
logger.info(f"Final commit: Total {documents_processed} GitHub repositories processed")
logger.info(
f"Final commit: Total {documents_processed} GitHub repositories processed"
)
try:
await session.commit()
logger.info(

View file

@ -345,25 +345,29 @@ async def index_google_calendar_events(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
events_to_process.append({
'document': existing_document,
'is_new': False,
'event_markdown': event_markdown,
'content_hash': content_hash,
'event_id': event_id,
'event_summary': event_summary,
'calendar_id': calendar_id,
'start_time': start_time,
'end_time': end_time,
'location': location,
'description': description,
})
events_to_process.append(
{
"document": existing_document,
"is_new": False,
"event_markdown": event_markdown,
"content_hash": content_hash,
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location,
"description": description,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -411,19 +415,21 @@ async def index_google_calendar_events(
session.add(document)
new_documents_created = True
events_to_process.append({
'document': document,
'is_new': True,
'event_markdown': event_markdown,
'content_hash': content_hash,
'event_id': event_id,
'event_summary': event_summary,
'calendar_id': calendar_id,
'start_time': start_time,
'end_time': end_time,
'location': location,
'description': description,
})
events_to_process.append(
{
"document": document,
"is_new": True,
"event_markdown": event_markdown,
"content_hash": content_hash,
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location,
"description": description,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True)
@ -432,7 +438,9 @@ async def index_google_calendar_events(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -449,7 +457,7 @@ async def index_google_calendar_events(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -462,48 +470,53 @@ async def index_google_calendar_events(
if user_llm:
document_metadata_for_summary = {
"event_id": item['event_id'],
"event_summary": item['event_summary'],
"calendar_id": item['calendar_id'],
"start_time": item['start_time'],
"end_time": item['end_time'],
"location": item['location'] or "No location",
"event_id": item["event_id"],
"event_summary": item["event_summary"],
"calendar_id": item["calendar_id"],
"start_time": item["start_time"],
"end_time": item["end_time"],
"location": item["location"] or "No location",
"document_type": "Google Calendar Event",
"connector_type": "Google Calendar",
}
summary_content, summary_embedding = await generate_document_summary(
item['event_markdown'], user_llm, document_metadata_for_summary
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["event_markdown"], user_llm, document_metadata_for_summary
)
else:
summary_content = f"Google Calendar Event: {item['event_summary']}\n\n"
summary_content = (
f"Google Calendar Event: {item['event_summary']}\n\n"
)
summary_content += f"Calendar: {item['calendar_id']}\n"
summary_content += f"Start: {item['start_time']}\n"
summary_content += f"End: {item['end_time']}\n"
if item['location']:
if item["location"]:
summary_content += f"Location: {item['location']}\n"
if item['description']:
desc_preview = item['description'][:1000]
if len(item['description']) > 1000:
if item["description"]:
desc_preview = item["description"][:1000]
if len(item["description"]) > 1000:
desc_preview += "..."
summary_content += f"Description: {desc_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(item['event_markdown'])
chunks = await create_document_chunks(item["event_markdown"])
# Update document to READY with actual content
document.title = item['event_summary']
document.title = item["event_summary"]
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"event_id": item['event_id'],
"event_summary": item['event_summary'],
"calendar_id": item['calendar_id'],
"start_time": item['start_time'],
"end_time": item['end_time'],
"location": item['location'],
"event_id": item["event_id"],
"event_summary": item["event_summary"],
"calendar_id": item["calendar_id"],
"start_time": item["start_time"],
"end_time": item["end_time"],
"location": item["location"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -527,7 +540,9 @@ async def index_google_calendar_events(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue

View file

@ -435,7 +435,7 @@ async def _index_full_scan(
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int]:
"""Perform full scan indexing of a folder.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Collect all files and create pending documents (visible in UI immediately)
- Phase 2: Process each file: pending processing ready/failed
@ -533,7 +533,9 @@ async def _index_full_scan(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f[1] and f[1].id is None])} pending documents")
logger.info(
f"Phase 1: Committing {len([f for f in files_to_process if f[1] and f[1].id is None])} pending documents"
)
await session.commit()
# =======================================================================
@ -568,9 +570,7 @@ async def _index_full_scan(
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(
f"Committed batch: {documents_indexed} files indexed so far"
)
logger.info(f"Committed batch: {documents_indexed} files indexed so far")
logger.info(
f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, {documents_failed} failed"
@ -597,7 +597,7 @@ async def _index_with_delta_sync(
Note: include_subfolders is accepted for API consistency but delta sync
automatically tracks changes across all folders including subfolders.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Collect all changes and create pending documents (visible in UI immediately)
- Phase 2: Process each file: pending processing ready/failed
@ -676,7 +676,7 @@ async def _index_with_delta_sync(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing pending documents")
logger.info("Phase 1: Committing pending documents")
await session.commit()
# =======================================================================
@ -685,7 +685,7 @@ async def _index_with_delta_sync(
# =======================================================================
logger.info(f"Phase 2: Processing {len(changes_to_process)} changes")
for change, file, pending_doc in changes_to_process:
for _, file, pending_doc in changes_to_process:
# Check if it's time for a heartbeat update
if on_heartbeat_callback:
current_time = time.time()
@ -728,17 +728,17 @@ async def _create_pending_document_for_file(
) -> tuple[Document | None, bool]:
"""
Create a pending document for a Google Drive file if it doesn't exist.
This is Phase 1 of the 2-phase document status update pattern.
Creates documents with 'pending' status so they appear in UI immediately.
Args:
session: Database session
file: File metadata from Google Drive API
connector_id: ID of the Drive connector
search_space_id: ID of the search space
user_id: ID of the user
Returns:
Tuple of (document, should_skip):
- (existing_doc, False): Existing document that needs update
@ -746,28 +746,28 @@ async def _create_pending_document_for_file(
- (None, True): File should be skipped (unchanged, rename-only, or folder)
"""
from app.connectors.google_drive.file_types import should_skip_file
file_id = file.get("id")
file_name = file.get("name", "Unknown")
mime_type = file.get("mimeType", "")
# Skip folders and shortcuts
if should_skip_file(mime_type):
return None, True
if not file_id:
return None, True
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
)
# Check if document exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Check if this is a rename-only update (content unchanged)
incoming_md5 = file.get("md5Checksum")
@ -775,7 +775,7 @@ async def _create_pending_document_for_file(
doc_metadata = existing_document.document_metadata or {}
stored_md5 = doc_metadata.get("md5_checksum")
stored_modified_time = doc_metadata.get("modified_time")
# Determine if content changed
content_unchanged = False
if incoming_md5 and stored_md5:
@ -783,16 +783,18 @@ async def _create_pending_document_for_file(
elif not incoming_md5 and incoming_modified_time and stored_modified_time:
# Google Workspace file - use modifiedTime as fallback
content_unchanged = incoming_modified_time == stored_modified_time
if content_unchanged:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
return None, True
# Content changed - return existing document for update
return existing_document, False
# Create new pending document
document = Document(
search_space_id=search_space_id,
@ -815,7 +817,7 @@ async def _create_pending_document_for_file(
connector_id=connector_id,
)
session.add(document)
return document, False
@ -958,7 +960,7 @@ async def _process_single_file(
) -> tuple[int, int, int]:
"""
Process a single file by downloading and using Surfsense's file processor.
Implements Phase 2 of the 2-phase document status update pattern.
Updates document status: pending processing ready/failed
@ -1042,12 +1044,13 @@ async def _process_single_file(
processed_doc = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if processed_doc:
# Ensure status is READY
if not DocumentStatus.is_state(processed_doc.status, DocumentStatus.READY):
processed_doc.status = DocumentStatus.ready()
processed_doc.updated_at = get_current_timestamp()
await session.commit()
# Ensure status is READY
if processed_doc and not DocumentStatus.is_state(
processed_doc.status, DocumentStatus.READY
):
processed_doc.status = DocumentStatus.ready()
processed_doc.updated_at = get_current_timestamp()
await session.commit()
logger.info(f"Successfully indexed Google Drive file: {file_name}")
return 1, 0, 0
@ -1061,7 +1064,9 @@ async def _process_single_file(
pending_document.updated_at = get_current_timestamp()
await session.commit()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
return 0, 0, 1

View file

@ -228,7 +228,9 @@ async def index_google_gmail_messages(
documents_indexed = 0
documents_skipped = 0
documents_failed = 0 # Track messages that failed processing
duplicate_content_count = 0 # Track messages skipped due to duplicate content_hash
duplicate_content_count = (
0 # Track messages skipped due to duplicate content_hash
)
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
@ -294,23 +296,27 @@ async def index_google_gmail_messages(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append({
'document': existing_document,
'is_new': False,
'markdown_content': markdown_content,
'content_hash': content_hash,
'message_id': message_id,
'thread_id': thread_id,
'subject': subject,
'sender': sender,
'date_str': date_str,
})
messages_to_process.append(
{
"document": existing_document,
"is_new": False,
"markdown_content": markdown_content,
"content_hash": content_hash,
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date_str": date_str,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -356,17 +362,19 @@ async def index_google_gmail_messages(
session.add(document)
new_documents_created = True
messages_to_process.append({
'document': document,
'is_new': True,
'markdown_content': markdown_content,
'content_hash': content_hash,
'message_id': message_id,
'thread_id': thread_id,
'subject': subject,
'sender': sender,
'date_str': date_str,
})
messages_to_process.append(
{
"document": document,
"is_new": True,
"markdown_content": markdown_content,
"content_hash": content_hash,
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date_str": date_str,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True)
@ -375,7 +383,9 @@ async def index_google_gmail_messages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -392,7 +402,7 @@ async def index_google_gmail_messages(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -405,16 +415,21 @@ async def index_google_gmail_messages(
if user_llm:
document_metadata_for_summary = {
"message_id": item['message_id'],
"thread_id": item['thread_id'],
"subject": item['subject'],
"sender": item['sender'],
"date": item['date_str'],
"message_id": item["message_id"],
"thread_id": item["thread_id"],
"subject": item["subject"],
"sender": item["sender"],
"date": item["date_str"],
"document_type": "Gmail Message",
"connector_type": "Google Gmail",
}
summary_content, summary_embedding = await generate_document_summary(
item['markdown_content'], user_llm, document_metadata_for_summary
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["markdown_content"],
user_llm,
document_metadata_for_summary,
)
else:
summary_content = f"Google Gmail Message: {item['subject']}\n\n"
@ -424,19 +439,19 @@ async def index_google_gmail_messages(
summary_content
)
chunks = await create_document_chunks(item['markdown_content'])
chunks = await create_document_chunks(item["markdown_content"])
# Update document to READY with actual content
document.title = item['subject']
document.title = item["subject"]
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"message_id": item['message_id'],
"thread_id": item['thread_id'],
"subject": item['subject'],
"sender": item['sender'],
"date": item['date_str'],
"message_id": item["message_id"],
"thread_id": item["thread_id"],
"subject": item["subject"],
"sender": item["sender"],
"date": item["date_str"],
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
@ -459,7 +474,9 @@ async def index_google_gmail_messages(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue

View file

@ -239,23 +239,27 @@ async def index_jira_issues(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
issues_to_process.append({
'document': existing_document,
'is_new': False,
'issue_content': issue_content,
'content_hash': content_hash,
'issue_id': issue_id,
'issue_identifier': issue_identifier,
'issue_title': issue_title,
'formatted_issue': formatted_issue,
'comment_count': comment_count,
})
issues_to_process.append(
{
"document": existing_document,
"is_new": False,
"issue_content": issue_content,
"content_hash": content_hash,
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"formatted_issue": formatted_issue,
"comment_count": comment_count,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -301,17 +305,19 @@ async def index_jira_issues(
session.add(document)
new_documents_created = True
issues_to_process.append({
'document': document,
'is_new': True,
'issue_content': issue_content,
'content_hash': content_hash,
'issue_id': issue_id,
'issue_identifier': issue_identifier,
'issue_title': issue_title,
'formatted_issue': formatted_issue,
'comment_count': comment_count,
})
issues_to_process.append(
{
"document": document,
"is_new": True,
"issue_content": issue_content,
"content_hash": content_hash,
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"formatted_issue": formatted_issue,
"comment_count": comment_count,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True)
@ -320,7 +326,9 @@ async def index_jira_issues(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -337,7 +345,7 @@ async def index_jira_issues(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -350,11 +358,11 @@ async def index_jira_issues(
if user_llm:
document_metadata = {
"issue_key": item['issue_identifier'],
"issue_title": item['issue_title'],
"status": item['formatted_issue'].get("status", "Unknown"),
"priority": item['formatted_issue'].get("priority", "Unknown"),
"comment_count": item['comment_count'],
"issue_key": item["issue_identifier"],
"issue_title": item["issue_title"],
"status": item["formatted_issue"].get("status", "Unknown"),
"priority": item["formatted_issue"].get("priority", "Unknown"),
"comment_count": item["comment_count"],
"document_type": "Jira Issue",
"connector_type": "Jira",
}
@ -362,34 +370,32 @@ async def index_jira_issues(
summary_content,
summary_embedding,
) = await generate_document_summary(
item['issue_content'], user_llm, document_metadata
item["issue_content"], user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Jira Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['formatted_issue'].get('status', 'Unknown')}\n\n"
if item['formatted_issue'].get("description"):
summary_content += (
f"Description: {item['formatted_issue'].get('description')}\n\n"
)
if item["formatted_issue"].get("description"):
summary_content += f"Description: {item['formatted_issue'].get('description')}\n\n"
summary_content += f"Comments: {item['comment_count']}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks - using the full issue content with comments
chunks = await create_document_chunks(item['issue_content'])
chunks = await create_document_chunks(item["issue_content"])
# Update document to READY with actual content
document.title = f"{item['issue_identifier']}: {item['issue_title']}"
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"issue_id": item['issue_id'],
"issue_identifier": item['issue_identifier'],
"issue_title": item['issue_title'],
"state": item['formatted_issue'].get("status", "Unknown"),
"comment_count": item['comment_count'],
"issue_id": item["issue_id"],
"issue_identifier": item["issue_identifier"],
"issue_title": item["issue_title"],
"state": item["formatted_issue"].get("status", "Unknown"),
"comment_count": item["comment_count"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -416,7 +422,9 @@ async def index_jira_issues(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue # Skip this issue and continue with others

View file

@ -272,7 +272,9 @@ async def index_linear_issues(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for Linear issue {issue_identifier} unchanged. Skipping."
@ -281,19 +283,21 @@ async def index_linear_issues(
continue
# Queue existing document for update (will be set to processing in Phase 2)
issues_to_process.append({
'document': existing_document,
'is_new': False,
'issue_content': issue_content,
'content_hash': content_hash,
'issue_id': issue_id,
'issue_identifier': issue_identifier,
'issue_title': issue_title,
'state': state,
'description': description,
'comment_count': comment_count,
'priority': priority,
})
issues_to_process.append(
{
"document": existing_document,
"is_new": False,
"issue_content": issue_content,
"content_hash": content_hash,
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"description": description,
"comment_count": comment_count,
"priority": priority,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -338,19 +342,21 @@ async def index_linear_issues(
session.add(document)
new_documents_created = True
issues_to_process.append({
'document': document,
'is_new': True,
'issue_content': issue_content,
'content_hash': content_hash,
'issue_id': issue_id,
'issue_identifier': issue_identifier,
'issue_title': issue_title,
'state': state,
'description': description,
'comment_count': comment_count,
'priority': priority,
})
issues_to_process.append(
{
"document": document,
"is_new": True,
"issue_content": issue_content,
"content_hash": content_hash,
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"description": description,
"comment_count": comment_count,
"priority": priority,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True)
@ -359,7 +365,9 @@ async def index_linear_issues(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -376,7 +384,7 @@ async def index_linear_issues(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -389,20 +397,23 @@ async def index_linear_issues(
if user_llm:
document_metadata_for_summary = {
"issue_id": item['issue_identifier'],
"issue_title": item['issue_title'],
"state": item['state'],
"priority": item['priority'],
"comment_count": item['comment_count'],
"issue_id": item["issue_identifier"],
"issue_title": item["issue_title"],
"state": item["state"],
"priority": item["priority"],
"comment_count": item["comment_count"],
"document_type": "Linear Issue",
"connector_type": "Linear",
}
summary_content, summary_embedding = await generate_document_summary(
item['issue_content'], user_llm, document_metadata_for_summary
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["issue_content"], user_llm, document_metadata_for_summary
)
else:
# Fallback to simple summary if no LLM configured
description = item['description']
description = item["description"]
if description and len(description) > 1000:
description = description[:997] + "..."
summary_content = f"Linear Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['state']}\n\n"
@ -413,19 +424,19 @@ async def index_linear_issues(
summary_content
)
chunks = await create_document_chunks(item['issue_content'])
chunks = await create_document_chunks(item["issue_content"])
# Update document to READY with actual content
document.title = f"{item['issue_identifier']}: {item['issue_title']}"
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"issue_id": item['issue_id'],
"issue_identifier": item['issue_identifier'],
"issue_title": item['issue_title'],
"state": item['state'],
"comment_count": item['comment_count'],
"issue_id": item["issue_id"],
"issue_identifier": item["issue_identifier"],
"issue_title": item["issue_title"],
"state": item["state"],
"comment_count": item["comment_count"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -452,7 +463,9 @@ async def index_linear_issues(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
skipped_issues.append(
f"{item.get('issue_identifier', 'Unknown')} (processing error)"
)
@ -466,7 +479,9 @@ async def index_linear_issues(
logger.info(f"Final commit: Total {documents_indexed} Linear issues processed")
try:
await session.commit()
logger.info("Successfully committed all Linear document changes to database")
logger.info(
"Successfully committed all Linear document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (

View file

@ -305,7 +305,9 @@ async def index_luma_events(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for Luma event {event_name} unchanged. Skipping."
@ -314,23 +316,25 @@ async def index_luma_events(
continue
# Queue existing document for update (will be set to processing in Phase 2)
events_to_process.append({
'document': existing_document,
'is_new': False,
'event_id': event_id,
'event_name': event_name,
'event_url': event_url,
'event_markdown': event_markdown,
'content_hash': content_hash,
'start_at': start_at,
'end_at': end_at,
'timezone': timezone,
'location': location,
'city': city,
'host_names': host_names,
'description': description,
'cover_url': cover_url,
})
events_to_process.append(
{
"document": existing_document,
"is_new": False,
"event_id": event_id,
"event_name": event_name,
"event_url": event_url,
"event_markdown": event_markdown,
"content_hash": content_hash,
"start_at": start_at,
"end_at": end_at,
"timezone": timezone,
"location": location,
"city": city,
"host_names": host_names,
"description": description,
"cover_url": cover_url,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -380,23 +384,25 @@ async def index_luma_events(
session.add(document)
new_documents_created = True
events_to_process.append({
'document': document,
'is_new': True,
'event_id': event_id,
'event_name': event_name,
'event_url': event_url,
'event_markdown': event_markdown,
'content_hash': content_hash,
'start_at': start_at,
'end_at': end_at,
'timezone': timezone,
'location': location,
'city': city,
'host_names': host_names,
'description': description,
'cover_url': cover_url,
})
events_to_process.append(
{
"document": document,
"is_new": True,
"event_id": event_id,
"event_name": event_name,
"event_url": event_url,
"event_markdown": event_markdown,
"content_hash": content_hash,
"start_at": start_at,
"end_at": end_at,
"timezone": timezone,
"location": location,
"city": city,
"host_names": host_names,
"description": description,
"cover_url": cover_url,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True)
@ -405,7 +411,9 @@ async def index_luma_events(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -422,7 +430,7 @@ async def index_luma_events(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -435,15 +443,15 @@ async def index_luma_events(
if user_llm:
document_metadata_for_summary = {
"event_id": item['event_id'],
"event_name": item['event_name'],
"event_url": item['event_url'],
"start_at": item['start_at'],
"end_at": item['end_at'],
"timezone": item['timezone'],
"location": item['location'] or "No location",
"city": item['city'],
"hosts": item['host_names'],
"event_id": item["event_id"],
"event_name": item["event_name"],
"event_url": item["event_url"],
"start_at": item["start_at"],
"end_at": item["end_at"],
"timezone": item["timezone"],
"location": item["location"] or "No location",
"city": item["city"],
"hosts": item["host_names"],
"document_type": "Luma Event",
"connector_type": "Luma",
}
@ -451,26 +459,26 @@ async def index_luma_events(
summary_content,
summary_embedding,
) = await generate_document_summary(
item['event_markdown'], user_llm, document_metadata_for_summary
item["event_markdown"], user_llm, document_metadata_for_summary
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Luma Event: {item['event_name']}\n\n"
if item['event_url']:
if item["event_url"]:
summary_content += f"URL: {item['event_url']}\n"
summary_content += f"Start: {item['start_at']}\n"
summary_content += f"End: {item['end_at']}\n"
if item['timezone']:
if item["timezone"]:
summary_content += f"Timezone: {item['timezone']}\n"
if item['location']:
if item["location"]:
summary_content += f"Location: {item['location']}\n"
if item['city']:
if item["city"]:
summary_content += f"City: {item['city']}\n"
if item['host_names']:
if item["host_names"]:
summary_content += f"Hosts: {item['host_names']}\n"
if item['description']:
desc_preview = item['description'][:1000]
if len(item['description']) > 1000:
if item["description"]:
desc_preview = item["description"][:1000]
if len(item["description"]) > 1000:
desc_preview += "..."
summary_content += f"Description: {desc_preview}\n"
@ -478,24 +486,24 @@ async def index_luma_events(
summary_content
)
chunks = await create_document_chunks(item['event_markdown'])
chunks = await create_document_chunks(item["event_markdown"])
# Update document to READY with actual content
document.title = item['event_name']
document.title = item["event_name"]
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"event_id": item['event_id'],
"event_name": item['event_name'],
"event_url": item['event_url'],
"start_at": item['start_at'],
"end_at": item['end_at'],
"timezone": item['timezone'],
"location": item['location'],
"city": item['city'],
"hosts": item['host_names'],
"cover_url": item['cover_url'],
"event_id": item["event_id"],
"event_name": item["event_name"],
"event_url": item["event_url"],
"start_at": item["start_at"],
"end_at": item["end_at"],
"timezone": item["timezone"],
"location": item["location"],
"city": item["city"],
"hosts": item["host_names"],
"cover_url": item["cover_url"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -522,7 +530,9 @@ async def index_luma_events(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
skipped_events.append(
f"{item.get('event_name', 'Unknown')} (processing error)"
)

View file

@ -354,20 +354,24 @@ async def index_notion_pages(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
pages_to_process.append({
'document': existing_document,
'is_new': False,
'markdown_content': markdown_content,
'content_hash': content_hash,
'page_id': page_id,
'page_title': page_title,
})
pages_to_process.append(
{
"document": existing_document,
"is_new": False,
"markdown_content": markdown_content,
"content_hash": content_hash,
"page_id": page_id,
"page_title": page_title,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -410,14 +414,16 @@ async def index_notion_pages(
session.add(document)
new_documents_created = True
pages_to_process.append({
'document': document,
'is_new': True,
'markdown_content': markdown_content,
'content_hash': content_hash,
'page_id': page_id,
'page_title': page_title,
})
pages_to_process.append(
{
"document": document,
"is_new": True,
"markdown_content": markdown_content,
"content_hash": content_hash,
"page_id": page_id,
"page_title": page_title,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True)
@ -426,7 +432,9 @@ async def index_notion_pages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -443,7 +451,7 @@ async def index_notion_pages(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
@ -456,13 +464,18 @@ async def index_notion_pages(
if user_llm:
document_metadata_for_summary = {
"page_title": item['page_title'],
"page_id": item['page_id'],
"page_title": item["page_title"],
"page_id": item["page_id"],
"document_type": "Notion Page",
"connector_type": "Notion",
}
summary_content, summary_embedding = await generate_document_summary(
item['markdown_content'], user_llm, document_metadata_for_summary
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["markdown_content"],
user_llm,
document_metadata_for_summary,
)
else:
# Fallback to simple summary if no LLM configured
@ -471,16 +484,16 @@ async def index_notion_pages(
summary_content
)
chunks = await create_document_chunks(item['markdown_content'])
chunks = await create_document_chunks(item["markdown_content"])
# Update document to READY with actual content
document.title = item['page_title']
document.title = item["page_title"]
document.content = summary_content
document.content_hash = item['content_hash']
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = {
"page_title": item['page_title'],
"page_id": item['page_id'],
"page_title": item["page_title"],
"page_id": item["page_id"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -504,7 +517,9 @@ async def index_notion_pages(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
skipped_pages.append(f"{item['page_title']} (processing error)")
documents_failed += 1
continue

View file

@ -382,27 +382,31 @@ async def index_obsidian_vault(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.debug(f"Note {title} unchanged, skipping")
skipped_count += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
files_to_process.append({
'document': existing_document,
'is_new': False,
'file_info': file_info,
'content': content,
'body_content': body_content,
'frontmatter': frontmatter,
'wiki_links': wiki_links,
'tags': tags,
'title': title,
'relative_path': relative_path,
'content_hash': content_hash,
'unique_identifier_hash': unique_identifier_hash,
})
files_to_process.append(
{
"document": existing_document,
"is_new": False,
"file_info": file_info,
"content": content,
"body_content": body_content,
"frontmatter": frontmatter,
"wiki_links": wiki_links,
"tags": tags,
"title": title,
"relative_path": relative_path,
"content_hash": content_hash,
"unique_identifier_hash": unique_identifier_hash,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -445,20 +449,22 @@ async def index_obsidian_vault(
session.add(document)
new_documents_created = True
files_to_process.append({
'document': document,
'is_new': True,
'file_info': file_info,
'content': content,
'body_content': body_content,
'frontmatter': frontmatter,
'wiki_links': wiki_links,
'tags': tags,
'title': title,
'relative_path': relative_path,
'content_hash': content_hash,
'unique_identifier_hash': unique_identifier_hash,
})
files_to_process.append(
{
"document": document,
"is_new": True,
"file_info": file_info,
"content": content,
"body_content": body_content,
"frontmatter": frontmatter,
"wiki_links": wiki_links,
"tags": tags,
"title": title,
"relative_path": relative_path,
"content_hash": content_hash,
"unique_identifier_hash": unique_identifier_hash,
}
)
except Exception as e:
logger.exception(
@ -469,7 +475,9 @@ async def index_obsidian_vault(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -491,22 +499,22 @@ async def index_obsidian_vault(
await on_heartbeat_callback(indexed_count)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Extract data from item
title = item['title']
relative_path = item['relative_path']
content = item['content']
body_content = item['body_content']
frontmatter = item['frontmatter']
wiki_links = item['wiki_links']
tags = item['tags']
content_hash = item['content_hash']
file_info = item['file_info']
title = item["title"]
relative_path = item["relative_path"]
content = item["content"]
body_content = item["body_content"]
frontmatter = item["frontmatter"]
wiki_links = item["wiki_links"]
tags = item["tags"]
content_hash = item["content_hash"]
file_info = item["file_info"]
# Build metadata
document_metadata = {
@ -584,7 +592,9 @@ async def index_obsidian_vault(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
failed_count += 1
continue
@ -592,9 +602,7 @@ async def index_obsidian_vault(
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(
f"Final commit: Total {indexed_count} Obsidian notes processed"
)
logger.info(f"Final commit: Total {indexed_count} Obsidian notes processed")
try:
await session.commit()
logger.info(

View file

@ -314,7 +314,9 @@ async def index_slack_messages(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping."
@ -323,18 +325,20 @@ async def index_slack_messages(
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append({
'document': existing_document,
'is_new': False,
'combined_document_string': combined_document_string,
'content_hash': content_hash,
'channel_name': channel_name,
'channel_id': channel_id,
'msg_ts': msg_ts,
'start_date': start_date_str,
'end_date': end_date_str,
'message_count': len(formatted_messages),
})
messages_to_process.append(
{
"document": existing_document,
"is_new": False,
"combined_document_string": combined_document_string,
"content_hash": content_hash,
"channel_name": channel_name,
"channel_id": channel_id,
"msg_ts": msg_ts,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -377,18 +381,20 @@ async def index_slack_messages(
session.add(document)
new_documents_created = True
messages_to_process.append({
'document': document,
'is_new': True,
'combined_document_string': combined_document_string,
'content_hash': content_hash,
'channel_name': channel_name,
'channel_id': channel_id,
'msg_ts': msg_ts,
'start_date': start_date_str,
'end_date': end_date_str,
'message_count': len(formatted_messages),
})
messages_to_process.append(
{
"document": document,
"is_new": True,
"combined_document_string": combined_document_string,
"content_hash": content_hash,
"channel_name": channel_name,
"channel_id": channel_id,
"msg_ts": msg_ts,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
}
)
logger.info(
f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}"
@ -409,7 +415,9 @@ async def index_slack_messages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -426,29 +434,29 @@ async def index_slack_messages(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (embeddings, chunks)
chunks = await create_document_chunks(item['combined_document_string'])
chunks = await create_document_chunks(item["combined_document_string"])
doc_embedding = config.embedding_model_instance.embed(
item['combined_document_string']
item["combined_document_string"]
)
# Update document to READY with actual content
document.title = item['channel_name']
document.content = item['combined_document_string']
document.content_hash = item['content_hash']
document.title = item["channel_name"]
document.content = item["combined_document_string"]
document.content_hash = item["content_hash"]
document.embedding = doc_embedding
document.document_metadata = {
"channel_name": item['channel_name'],
"channel_id": item['channel_id'],
"start_date": item['start_date'],
"end_date": item['end_date'],
"message_count": item['message_count'],
"channel_name": item["channel_name"],
"channel_id": item["channel_id"],
"start_date": item["start_date"],
"end_date": item["end_date"],
"message_count": item["message_count"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -475,7 +483,9 @@ async def index_slack_messages(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue

View file

@ -332,25 +332,31 @@ async def index_teams_messages(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = (
DocumentStatus.ready()
)
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append({
'document': existing_document,
'is_new': False,
'combined_document_string': combined_document_string,
'content_hash': content_hash,
'team_name': team_name,
'team_id': team_id,
'channel_name': channel_name,
'channel_id': channel_id,
'message_id': message_id,
'start_date': start_date_str,
'end_date': end_date_str,
})
messages_to_process.append(
{
"document": existing_document,
"is_new": False,
"combined_document_string": combined_document_string,
"content_hash": content_hash,
"team_name": team_name,
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": message_id,
"start_date": start_date_str,
"end_date": end_date_str,
}
)
continue
# Document doesn't exist by unique_identifier_hash
@ -400,19 +406,21 @@ async def index_teams_messages(
session.add(document)
new_documents_created = True
messages_to_process.append({
'document': document,
'is_new': True,
'combined_document_string': combined_document_string,
'content_hash': content_hash,
'team_name': team_name,
'team_id': team_id,
'channel_name': channel_name,
'channel_id': channel_id,
'message_id': message_id,
'start_date': start_date_str,
'end_date': end_date_str,
})
messages_to_process.append(
{
"document": document,
"is_new": True,
"combined_document_string": combined_document_string,
"content_hash": content_hash,
"team_name": team_name,
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": message_id,
"start_date": start_date_str,
"end_date": end_date_str,
}
)
except Exception as e:
logger.error(
@ -432,7 +440,9 @@ async def index_teams_messages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -449,30 +459,30 @@ async def index_teams_messages(
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
document = item["document"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (embeddings, chunks)
chunks = await create_document_chunks(item['combined_document_string'])
chunks = await create_document_chunks(item["combined_document_string"])
doc_embedding = config.embedding_model_instance.embed(
item['combined_document_string']
item["combined_document_string"]
)
# Update document to READY with actual content
document.title = f"{item['team_name']} - {item['channel_name']}"
document.content = item['combined_document_string']
document.content_hash = item['content_hash']
document.content = item["combined_document_string"]
document.content_hash = item["content_hash"]
document.embedding = doc_embedding
document.document_metadata = {
"team_name": item['team_name'],
"team_id": item['team_id'],
"channel_name": item['channel_name'],
"channel_id": item['channel_id'],
"start_date": item['start_date'],
"end_date": item['end_date'],
"team_name": item["team_name"],
"team_id": item["team_id"],
"channel_name": item["channel_name"],
"channel_id": item["channel_id"],
"start_date": item["start_date"],
"end_date": item["end_date"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -497,7 +507,9 @@ async def index_teams_messages(
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue
@ -510,9 +522,7 @@ async def index_teams_messages(
)
try:
await session.commit()
logger.info(
"Successfully committed all Teams document changes to database"
)
logger.info("Successfully committed all Teams document changes to database")
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (

View file

@ -184,22 +184,28 @@ async def index_crawled_urls(
if existing_document:
# Document exists - check if it's already being processed
if DocumentStatus.is_state(existing_document.status, DocumentStatus.PENDING):
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PENDING
):
logger.info(f"URL {url} already pending. Skipping.")
documents_skipped += 1
continue
if DocumentStatus.is_state(existing_document.status, DocumentStatus.PROCESSING):
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PROCESSING
):
logger.info(f"URL {url} already processing. Skipping.")
documents_skipped += 1
continue
# Queue existing document for potential update check
urls_to_process.append({
'document': existing_document,
'is_new': False,
'url': url,
'unique_identifier_hash': unique_identifier_hash,
})
urls_to_process.append(
{
"document": existing_document,
"is_new": False,
"url": url,
"unique_identifier_hash": unique_identifier_hash,
}
)
continue
# Create new document with PENDING status (visible in UI immediately)
@ -224,12 +230,14 @@ async def index_crawled_urls(
session.add(document)
new_documents_created = True
urls_to_process.append({
'document': document,
'is_new': True,
'url': url,
'unique_identifier_hash': unique_identifier_hash,
})
urls_to_process.append(
{
"document": document,
"is_new": True,
"url": url,
"unique_identifier_hash": unique_identifier_hash,
}
)
except Exception as e:
logger.error(f"Error in Phase 1 for URL {url}: {e!s}", exc_info=True)
@ -238,7 +246,9 @@ async def index_crawled_urls(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([u for u in urls_to_process if u['is_new']])} pending documents")
logger.info(
f"Phase 1: Committing {len([u for u in urls_to_process if u['is_new']])} pending documents"
)
await session.commit()
# =======================================================================
@ -255,9 +265,9 @@ async def index_crawled_urls(
await on_heartbeat_callback(documents_indexed + documents_updated)
last_heartbeat_time = current_time
document = item['document']
url = item['url']
is_new = item['is_new']
document = item["document"]
url = item["url"]
is_new = item["is_new"]
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
@ -298,7 +308,9 @@ async def index_crawled_urls(
continue
# Format content as structured document for summary generation
structured_document = crawler.format_to_structured_document(crawl_result)
structured_document = crawler.format_to_structured_document(
crawl_result
)
# Generate content hash using a version WITHOUT metadata
structured_document_for_hash = crawler.format_to_structured_document(
@ -339,7 +351,9 @@ async def index_crawled_urls(
f"(existing document ID: {duplicate_by_content.id}). "
f"Marking as failed."
)
document.status = DocumentStatus.failed("Duplicate content exists")
document.status = DocumentStatus.failed(
"Duplicate content exists"
)
document.updated_at = get_current_timestamp()
await session.commit()
duplicate_content_count += 1
@ -360,7 +374,10 @@ async def index_crawled_urls(
"document_type": "Crawled URL",
"crawler_type": crawler_type,
}
summary_content, summary_embedding = await generate_document_summary(
(
summary_content,
summary_embedding,
) = await generate_document_summary(
structured_document, user_llm, document_metadata_for_summary
)
else:
@ -423,7 +440,9 @@ async def index_crawled_urls(
document.updated_at = get_current_timestamp()
await session.commit()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
documents_failed += 1
continue
@ -438,7 +457,9 @@ async def index_crawled_urls(
)
try:
await session.commit()
logger.info("Successfully committed all webcrawler document changes to database")
logger.info(
"Successfully committed all webcrawler document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully
if "duplicate key value violates unique constraint" in str(e).lower():

View file

@ -17,29 +17,30 @@ md = MarkdownifyTransformer()
def safe_set_chunks(document: Document, chunks: list) -> None:
"""
Safely assign chunks to a document without triggering lazy loading.
ALWAYS use this instead of `document.chunks = chunks` to avoid
SQLAlchemy async errors (MissingGreenlet / greenlet_spawn).
Why this is needed:
- Direct assignment `document.chunks = chunks` triggers SQLAlchemy to
load the OLD chunks first (for comparison/orphan detection)
- This lazy loading fails in async context with asyncpg driver
- set_committed_value bypasses this by setting the value directly
This function is safe regardless of how the document was loaded
(with or without selectinload).
Args:
document: The Document object to update
chunks: List of Chunk objects to assign
Example:
# Instead of: document.chunks = chunks (DANGEROUS!)
safe_set_chunks(document, chunks) # Always safe
"""
from sqlalchemy.orm.attributes import set_committed_value
set_committed_value(document, 'chunks', chunks)
set_committed_value(document, "chunks", chunks)
def get_current_timestamp() -> datetime:

View file

@ -91,7 +91,9 @@ async def add_circleback_meeting_document(
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
await session.commit()
logger.info(f"Circleback meeting {meeting_id} unchanged. Skipping.")
@ -110,7 +112,7 @@ async def add_circleback_meeting_document(
# PHASE 1: Create document with PENDING status
# This makes the document visible in the UI immediately
# =======================================================================
# Fetch the user who set up the Circleback connector (preferred)
# or fall back to search space owner if no connector found
created_by_user_id = None
@ -173,7 +175,7 @@ async def add_circleback_meeting_document(
# =======================================================================
# PHASE 3: Process the document content
# =======================================================================
# Get LLM for generating summary
llm = await get_document_summary_llm(session, search_space_id)
if not llm:
@ -243,7 +245,7 @@ async def add_circleback_meeting_document(
await session.commit()
await session.refresh(document)
if existing_document:
logger.info(
f"Updated Circleback meeting document {meeting_id} in search space {search_space_id}"
@ -267,7 +269,9 @@ async def add_circleback_meeting_document(
document.updated_at = get_current_timestamp()
await session.commit()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
raise db_error
except Exception as e:
await session.rollback()
@ -279,5 +283,7 @@ async def add_circleback_meeting_document(
document.updated_at = get_current_timestamp()
await session.commit()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
logger.error(
f"Failed to update document status to failed: {status_error}"
)
raise RuntimeError(f"Failed to process Circleback meeting: {e!s}") from e

View file

@ -1629,16 +1629,16 @@ async def process_file_in_background_with_document(
) -> Document | None:
"""
Process file and update existing pending document (2-phase pattern).
This function is Phase 2 of the real-time document status updates:
- Phase 1 (API): Created document with pending status
- Phase 2 (this): Process file and update document to ready/failed
The document already exists with pending status. This function:
1. Parses the file content (markdown, audio, or ETL services)
2. Updates the document with content, embeddings, and chunks
3. Sets status to 'ready' on success
Args:
document: Existing document with pending status
file_path: Path to the uploaded file
@ -1650,7 +1650,7 @@ async def process_file_in_background_with_document(
log_entry: Log entry for this task
connector: Optional connector info for Google Drive files
notification: Optional notification for progress updates
Returns:
Updated Document object if successful, None if duplicate content detected
"""
@ -1665,13 +1665,18 @@ async def process_file_in_background_with_document(
etl_service = None
# ===== STEP 1: Parse file content based on type =====
# Check if the file is a markdown or text file
if filename.lower().endswith((".md", ".markdown", ".txt")):
# Update notification: parsing stage
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Reading file"
await (
NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Reading file",
)
)
await task_logger.log_task_progress(
@ -1695,8 +1700,13 @@ async def process_file_in_background_with_document(
):
# Update notification: parsing stage (transcription)
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Transcribing audio"
await (
NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Transcribing audio",
)
)
await task_logger.log_task_progress(
@ -1708,7 +1718,8 @@ async def process_file_in_background_with_document(
# Transcribe audio
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
if app_config.STT_SERVICE
and app_config.STT_SERVICE.startswith("local/")
else "external"
)
@ -1719,7 +1730,9 @@ async def process_file_in_background_with_document(
transcribed_text = result.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
markdown_content = f"# Transcription of {filename}\n\n{transcribed_text}"
markdown_content = (
f"# Transcription of {filename}\n\n{transcribed_text}"
)
else:
with open(file_path, "rb") as audio_file:
transcription_kwargs = {
@ -1728,12 +1741,18 @@ async def process_file_in_background_with_document(
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
transcription_kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
transcription_response = await atranscription(**transcription_kwargs)
transcription_kwargs["api_base"] = (
app_config.STT_SERVICE_API_BASE
)
transcription_response = await atranscription(
**transcription_kwargs
)
transcribed_text = transcription_response.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
markdown_content = f"# Transcription of {filename}\n\n{transcribed_text}"
markdown_content = (
f"# Transcription of {filename}\n\n{transcribed_text}"
)
etl_service = "AUDIO_TRANSCRIPTION"
# Clean up temp file
@ -1742,13 +1761,18 @@ async def process_file_in_background_with_document(
else:
# Document files - use ETL service
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
from app.services.page_limit_service import (
PageLimitExceededError,
PageLimitService,
)
page_limit_service = PageLimitService(session)
# Estimate page count
try:
estimated_pages = page_limit_service.estimate_pages_before_processing(file_path)
estimated_pages = page_limit_service.estimate_pages_before_processing(
file_path
)
except Exception:
file_size = os.path.getsize(file_path)
estimated_pages = max(1, file_size // (80 * 1024))
@ -1759,14 +1783,22 @@ async def process_file_in_background_with_document(
if app_config.ETL_SERVICE == "UNSTRUCTURED":
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Extracting content"
session,
notification,
stage="parsing",
stage_message="Extracting content",
)
from langchain_unstructured import UnstructuredLoader
loader = UnstructuredLoader(
file_path, mode="elements", post_processors=[], languages=["eng"],
include_orig_elements=False, include_metadata=False, strategy="auto"
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
markdown_content = await convert_document_to_markdown(docs)
@ -1775,37 +1807,55 @@ async def process_file_in_background_with_document(
etl_service = "UNSTRUCTURED"
# Update page usage
await page_limit_service.update_page_usage(user_id, final_page_count, allow_exceed=True)
await page_limit_service.update_page_usage(
user_id, final_page_count, allow_exceed=True
)
elif app_config.ETL_SERVICE == "LLAMACLOUD":
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Extracting content"
session,
notification,
stage="parsing",
stage_message="Extracting content",
)
result = await parse_with_llamacloud_retry(
file_path=file_path, estimated_pages=estimated_pages,
task_logger=task_logger, log_entry=log_entry
file_path=file_path,
estimated_pages=estimated_pages,
task_logger=task_logger,
log_entry=log_entry,
)
markdown_documents = await result.aget_markdown_documents(
split_by_page=False
)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
if not markdown_documents:
raise RuntimeError(f"LlamaCloud parsing returned no documents: {filename}")
raise RuntimeError(
f"LlamaCloud parsing returned no documents: {filename}"
)
markdown_content = markdown_documents[0].text
etl_service = "LLAMACLOUD"
# Update page usage
await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True)
await page_limit_service.update_page_usage(
user_id, estimated_pages, allow_exceed=True
)
elif app_config.ETL_SERVICE == "DOCLING":
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Extracting content"
session,
notification,
stage="parsing",
stage_message="Extracting content",
)
# Suppress logging during Docling import
getLogger("docling.pipeline.base_pipeline").setLevel(ERROR)
getLogger("docling.document_converter").setLevel(ERROR)
getLogger("docling_core.transforms.chunker.hierarchical_chunker").setLevel(ERROR)
getLogger(
"docling_core.transforms.chunker.hierarchical_chunker"
).setLevel(ERROR)
from docling.document_converter import DocumentConverter
@ -1815,7 +1865,9 @@ async def process_file_in_background_with_document(
etl_service = "DOCLING"
# Update page usage
await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True)
await page_limit_service.update_page_usage(
user_id, estimated_pages, allow_exceed=True
)
else:
raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
@ -1829,7 +1881,7 @@ async def process_file_in_background_with_document(
# ===== STEP 2: Check for duplicate content =====
content_hash = generate_content_hash(markdown_content, search_space_id)
existing_by_content = await check_duplicate_document(session, content_hash)
if existing_by_content and existing_by_content.id != document.id:
# Duplicate content found - mark this document as failed
@ -1846,7 +1898,7 @@ async def process_file_in_background_with_document(
)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if user_llm:
document_metadata = {
"file_name": filename,
@ -1881,10 +1933,10 @@ async def process_file_in_background_with_document(
**(document.document_metadata or {}),
}
flag_modified(document, "document_metadata")
# Use safe_set_chunks to avoid async issues
safe_set_chunks(document, chunks)
document.blocknote_document = blocknote_json
document.content_needs_reindexing = False
document.updated_at = get_current_timestamp()
@ -1922,7 +1974,11 @@ async def process_file_in_background_with_document(
log_entry,
error_message,
str(e),
{"error_type": type(e).__name__, "filename": filename, "document_id": document.id},
{
"error_type": type(e).__name__,
"filename": filename,
"document_id": document.id,
},
)
logging.error(f"Error processing file with document: {error_message}")
raise

View file

@ -136,11 +136,19 @@ async def add_youtube_video_document(
document = existing_document
is_new_document = False
# Check if already being processed
if DocumentStatus.is_state(existing_document.status, DocumentStatus.PENDING):
logging.info(f"YouTube video {video_id} already pending. Returning existing.")
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PENDING
):
logging.info(
f"YouTube video {video_id} already pending. Returning existing."
)
return existing_document
if DocumentStatus.is_state(existing_document.status, DocumentStatus.PROCESSING):
logging.info(f"YouTube video {video_id} already processing. Returning existing.")
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PROCESSING
):
logging.info(
f"YouTube video {video_id} already processing. Returning existing."
)
return existing_document
else:
# Create new document with PENDING status (visible in UI immediately)
@ -300,7 +308,9 @@ async def add_youtube_video_document(
"video_id": video_id,
},
)
logging.info(f"Document for YouTube video {video_id} unchanged. Marking as ready.")
logging.info(
f"Document for YouTube video {video_id} unchanged. Marking as ready."
)
document.status = DocumentStatus.ready()
await session.commit()
return document
@ -408,7 +418,9 @@ async def add_youtube_video_document(
# Mark document as failed if it exists
if document:
try:
document.status = DocumentStatus.failed(f"Database error: {str(db_error)[:150]}")
document.status = DocumentStatus.failed(
f"Database error: {str(db_error)[:150]}"
)
document.updated_at = get_current_timestamp()
await session.commit()
except Exception: