feat: enhance document upload process with two-phase indexing and real-time status updates

This commit is contained in:
Anish Sarkar 2026-02-06 05:15:47 +05:30
parent f56f5a281e
commit ed2fc5c636
3 changed files with 694 additions and 11 deletions

View file

@ -113,9 +113,23 @@ async def create_documents_file_upload(
user: User = Depends(current_active_user),
):
"""
Upload files as documents.
Upload files as documents with real-time status tracking.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately via ElectricSQL)
- Phase 2: Celery processes each file: pending processing ready/failed
Requires DOCUMENTS_CREATE permission.
"""
from datetime import datetime
from app.db import DocumentStatus
from app.tasks.document_processors.base import (
check_document_by_unique_identifier,
get_current_timestamp,
)
from app.utils.document_converters import generate_unique_identifier_hash
try:
# Check permission
await check_permission(
@ -129,38 +143,101 @@ async def create_documents_file_upload(
if not files:
raise HTTPException(status_code=400, detail="No files provided")
created_documents: list[Document] = []
files_to_process: list[tuple[Document, str, str]] = [] # (document, temp_path, filename)
skipped_duplicates = 0
# ===== PHASE 1: Create pending documents for all files =====
# This makes ALL documents visible in the UI immediately with pending status
for file in files:
try:
# Save file to a temporary location to avoid stream issues
import os
import tempfile
# Create temp file
# Save file to temp location
with tempfile.NamedTemporaryFile(
delete=False, suffix=os.path.splitext(file.filename)[1]
delete=False, suffix=os.path.splitext(file.filename or "")[1]
) as temp_file:
temp_path = temp_file.name
# Write uploaded file to temp file
content = await file.read()
with open(temp_path, "wb") as f:
f.write(content)
from app.tasks.celery_tasks.document_tasks import (
process_file_upload_task,
file_size = len(content)
# Generate unique identifier for deduplication check
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file.filename or "unknown", search_space_id
)
process_file_upload_task.delay(
temp_path, file.filename, search_space_id, str(user.id)
# Check if document already exists (by unique identifier)
existing = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing:
# Clean up temp file for duplicates
os.unlink(temp_path)
skipped_duplicates += 1
continue
# Create pending document (visible immediately in UI via ElectricSQL)
document = Document(
search_space_id=search_space_id,
title=file.filename or "Uploaded File",
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file.filename,
"file_size": file_size,
"upload_time": datetime.now().isoformat(),
},
content="Processing...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary, updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
status=DocumentStatus.pending(), # Shows "pending" in UI
updated_at=get_current_timestamp(),
created_by_id=str(user.id),
)
session.add(document)
created_documents.append(document)
files_to_process.append((document, temp_path, file.filename or "unknown"))
except Exception as e:
raise HTTPException(
status_code=422,
detail=f"Failed to process file {file.filename}: {e!s}",
) from e
await session.commit()
return {"message": "Files uploaded for processing"}
# Commit all pending documents - they appear in UI immediately via ElectricSQL
if created_documents:
await session.commit()
# Refresh to get generated IDs
for doc in created_documents:
await session.refresh(doc)
# ===== PHASE 2: Dispatch Celery tasks for each file =====
# Each task will update document status: pending → processing → ready/failed
from app.tasks.celery_tasks.document_tasks import (
process_file_upload_with_document_task,
)
for document, temp_path, filename in files_to_process:
process_file_upload_with_document_task.delay(
document_id=document.id,
temp_path=temp_path,
filename=filename,
search_space_id=search_space_id,
user_id=str(user.id),
)
return {
"message": "Files uploaded for processing",
"document_ids": [doc.id for doc in created_documents],
"total_files": len(files),
"pending_files": len(files_to_process),
"skipped_duplicates": skipped_duplicates,
}
except HTTPException:
raise
except Exception as e:

View file

@ -537,6 +537,298 @@ async def _process_file_upload(
raise
@celery_app.task(name="process_file_upload_with_document", bind=True)
def process_file_upload_with_document_task(
self,
document_id: int,
temp_path: str,
filename: str,
search_space_id: int,
user_id: str,
):
"""
Celery task to process uploaded file with existing pending document.
This task is used by the 2-phase document upload flow:
- Phase 1 (API): Creates pending document (visible in UI immediately)
- Phase 2 (this task): Updates document status: pending processing ready/failed
Args:
document_id: ID of the pending document created in Phase 1
temp_path: Path to the uploaded file
filename: Original filename
search_space_id: ID of the search space
user_id: ID of the user
"""
import asyncio
import os
import traceback
logger.info(
f"[process_file_upload_with_document] Task started - document_id: {document_id}, "
f"file: {filename}, search_space_id: {search_space_id}"
)
# Check if file exists and is accessible
if not os.path.exists(temp_path):
logger.error(
f"[process_file_upload_with_document] File does not exist: {temp_path}. "
"The temp file may have been cleaned up before the task ran."
)
# Mark document as failed since file is missing
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_mark_document_failed(
document_id,
"File not found - temp file may have been cleaned up",
)
)
finally:
loop.close()
return
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_process_file_with_document(
document_id, temp_path, filename, search_space_id, user_id
)
)
logger.info(
f"[process_file_upload_with_document] Task completed successfully for: {filename}"
)
except Exception as e:
logger.error(
f"[process_file_upload_with_document] Task failed for {filename}: {e}\n"
f"Traceback:\n{traceback.format_exc()}"
)
raise
finally:
loop.close()
async def _mark_document_failed(document_id: int, reason: str):
"""Mark a document as failed when task cannot proceed."""
from app.db import Document, DocumentStatus
from app.tasks.document_processors.base import get_current_timestamp
async with get_celery_session_maker()() as session:
document = await session.get(Document, document_id)
if document:
document.status = DocumentStatus.failed(reason)
document.updated_at = get_current_timestamp()
await session.commit()
logger.info(f"Marked document {document_id} as failed: {reason}")
async def _process_file_with_document(
document_id: int,
temp_path: str,
filename: str,
search_space_id: int,
user_id: str,
):
"""
Process file and update existing pending document status.
This function implements Phase 2 of the 2-phase document upload:
- Sets document status to 'processing' (shows spinner in UI)
- Processes the file (parsing, embedding, chunking)
- Updates document to 'ready' on success or 'failed' on error
"""
import os
from app.db import Document, DocumentStatus
from app.tasks.document_processors.base import get_current_timestamp
from app.tasks.document_processors.file_processors import (
process_file_in_background_with_document,
)
logger.info(
f"[_process_file_with_document] Starting async processing for: {filename}"
)
async with get_celery_session_maker()() as session:
logger.info(
f"[_process_file_with_document] Database session created for: {filename}"
)
task_logger = TaskLoggingService(session, search_space_id)
# Get the document
document = await session.get(Document, document_id)
if not document:
logger.error(f"Document {document_id} not found")
return
# Get file size for notification metadata
try:
file_size = os.path.getsize(temp_path)
logger.info(f"[_process_file_with_document] File size: {file_size} bytes")
except Exception as e:
logger.warning(f"[_process_file_with_document] Could not get file size: {e}")
file_size = None
# Create notification for document processing
logger.info(f"[_process_file_with_document] Creating notification for: {filename}")
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="FILE",
document_name=filename,
search_space_id=search_space_id,
file_size=file_size,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_file_upload_with_document",
source="document_processor",
message=f"Starting file processing for: {filename} (document_id: {document_id})",
metadata={
"document_type": "FILE",
"document_id": document_id,
"filename": filename,
"file_path": temp_path,
"user_id": user_id,
},
)
try:
# Set status to PROCESSING (shows spinner in UI via ElectricSQL)
document.status = DocumentStatus.processing()
await session.commit()
logger.info(
f"[_process_file_with_document] Document {document_id} status set to 'processing'"
)
# Process the file and update document
result = await process_file_in_background_with_document(
document=document,
file_path=temp_path,
filename=filename,
search_space_id=search_space_id,
user_id=user_id,
session=session,
task_logger=task_logger,
log_entry=log_entry,
notification=notification,
)
# Update notification on success
if result:
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
logger.info(
f"[_process_file_with_document] Successfully processed document {document_id}"
)
else:
# Duplicate detected - mark as failed
document.status = DocumentStatus.failed("Duplicate content detected")
document.updated_at = get_current_timestamp()
await session.commit()
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Document already exists (duplicate)",
)
)
except Exception as e:
# Import here to avoid circular dependencies
from fastapi import HTTPException
from app.services.page_limit_service import PageLimitExceededError
# Check if this is a page limit error
page_limit_error: PageLimitExceededError | None = None
if isinstance(e, PageLimitExceededError):
page_limit_error = e
elif (
isinstance(e, HTTPException)
and e.__cause__
and isinstance(e.__cause__, PageLimitExceededError)
):
page_limit_error = e.__cause__
# Mark document as failed (shows error in UI via ElectricSQL)
error_message = str(e)[:500]
document.status = DocumentStatus.failed(error_message)
document.updated_at = get_current_timestamp()
await session.commit()
logger.info(
f"[_process_file_with_document] Document {document_id} marked as failed: {error_message[:100]}"
)
# Handle page limit errors with dedicated notification
if page_limit_error is not None:
try:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Page limit exceeded",
)
await NotificationService.page_limit.notify_page_limit_exceeded(
session=session,
user_id=UUID(user_id),
document_name=filename,
document_type="FILE",
search_space_id=search_space_id,
pages_used=page_limit_error.pages_used,
pages_limit=page_limit_error.pages_limit,
pages_to_add=page_limit_error.pages_to_add,
)
except Exception as notif_error:
logger.error(
f"Failed to create page limit notification: {notif_error!s}"
)
else:
# Update notification on failure
try:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
await task_logger.log_task_failure(
log_entry,
error_message[:100],
str(e),
{"error_type": type(e).__name__, "document_id": document_id},
)
logger.error(f"Error processing file {filename}: {e!s}")
raise
finally:
# Clean up temp file
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
logger.info(f"[_process_file_with_document] Cleaned up temp file: {temp_path}")
except Exception as cleanup_error:
logger.warning(
f"[_process_file_with_document] Failed to clean up temp file: {cleanup_error}"
)
@celery_app.task(name="process_circleback_meeting", bind=True)
def process_circleback_meeting_task(
self,

View file

@ -33,6 +33,7 @@ from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
get_current_timestamp,
safe_set_chunks,
)
from .markdown_processor import add_received_markdown_file_document
@ -1612,3 +1613,316 @@ async def process_file_in_background(
logging.error(f"Error processing file in background: {error_message}")
raise # Re-raise so the wrapper can also handle it
async def process_file_in_background_with_document(
document: Document,
file_path: str,
filename: str,
search_space_id: int,
user_id: str,
session: AsyncSession,
task_logger: TaskLoggingService,
log_entry: Log,
connector: dict | None = None,
notification: Notification | None = None,
) -> Document | None:
"""
Process file and update existing pending document (2-phase pattern).
This function is Phase 2 of the real-time document status updates:
- Phase 1 (API): Created document with pending status
- Phase 2 (this): Process file and update document to ready/failed
The document already exists with pending status. This function:
1. Parses the file content (markdown, audio, or ETL services)
2. Updates the document with content, embeddings, and chunks
3. Sets status to 'ready' on success
Args:
document: Existing document with pending status
file_path: Path to the uploaded file
filename: Original filename
search_space_id: ID of the search space
user_id: ID of the user
session: Database session
task_logger: Task logging service
log_entry: Log entry for this task
connector: Optional connector info for Google Drive files
notification: Optional notification for progress updates
Returns:
Updated Document object if successful, None if duplicate content detected
"""
import os
from app.config import config as app_config
from app.services.llm_service import get_user_long_context_llm
from app.utils.blocknote_converter import convert_markdown_to_blocknote
try:
markdown_content = None
etl_service = None
# ===== STEP 1: Parse file content based on type =====
# Check if the file is a markdown or text file
if filename.lower().endswith((".md", ".markdown", ".txt")):
# Update notification: parsing stage
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Reading file"
)
await task_logger.log_task_progress(
log_entry,
f"Processing markdown/text file: {filename}",
{"file_type": "markdown", "processing_stage": "reading_file"},
)
# Read markdown content directly
with open(file_path, encoding="utf-8") as f:
markdown_content = f.read()
etl_service = "MARKDOWN"
# Clean up temp file
with contextlib.suppress(Exception):
os.unlink(file_path)
# Check if the file is an audio file
elif filename.lower().endswith(
(".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
):
# Update notification: parsing stage (transcription)
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Transcribing audio"
)
await task_logger.log_task_progress(
log_entry,
f"Processing audio file for transcription: {filename}",
{"file_type": "audio", "processing_stage": "starting_transcription"},
)
# Transcribe audio
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
result = stt_service.transcribe_file(file_path)
transcribed_text = result.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
markdown_content = f"# Transcription of {filename}\n\n{transcribed_text}"
else:
with open(file_path, "rb") as audio_file:
transcription_kwargs = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
transcription_kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
transcription_response = await atranscription(**transcription_kwargs)
transcribed_text = transcription_response.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
markdown_content = f"# Transcription of {filename}\n\n{transcribed_text}"
etl_service = "AUDIO_TRANSCRIPTION"
# Clean up temp file
with contextlib.suppress(Exception):
os.unlink(file_path)
else:
# Document files - use ETL service
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
page_limit_service = PageLimitService(session)
# Estimate page count
try:
estimated_pages = page_limit_service.estimate_pages_before_processing(file_path)
except Exception:
file_size = os.path.getsize(file_path)
estimated_pages = max(1, file_size // (80 * 1024))
# Check page limit
await page_limit_service.check_page_limit(user_id, estimated_pages)
if app_config.ETL_SERVICE == "UNSTRUCTURED":
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Extracting content"
)
from langchain_unstructured import UnstructuredLoader
loader = UnstructuredLoader(
file_path, mode="elements", post_processors=[], languages=["eng"],
include_orig_elements=False, include_metadata=False, strategy="auto"
)
docs = await loader.aload()
markdown_content = await convert_document_to_markdown(docs)
actual_pages = page_limit_service.estimate_pages_from_elements(docs)
final_page_count = max(estimated_pages, actual_pages)
etl_service = "UNSTRUCTURED"
# Update page usage
await page_limit_service.update_page_usage(user_id, final_page_count, allow_exceed=True)
elif app_config.ETL_SERVICE == "LLAMACLOUD":
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Extracting content"
)
result = await parse_with_llamacloud_retry(
file_path=file_path, estimated_pages=estimated_pages,
task_logger=task_logger, log_entry=log_entry
)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
if not markdown_documents:
raise RuntimeError(f"LlamaCloud parsing returned no documents: {filename}")
markdown_content = markdown_documents[0].text
etl_service = "LLAMACLOUD"
# Update page usage
await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True)
elif app_config.ETL_SERVICE == "DOCLING":
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="parsing", stage_message="Extracting content"
)
# Suppress logging during Docling import
getLogger("docling.pipeline.base_pipeline").setLevel(ERROR)
getLogger("docling.document_converter").setLevel(ERROR)
getLogger("docling_core.transforms.chunker.hierarchical_chunker").setLevel(ERROR)
from docling.document_converter import DocumentConverter
converter = DocumentConverter()
result = converter.convert(file_path)
markdown_content = result.document.export_to_markdown()
etl_service = "DOCLING"
# Update page usage
await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True)
else:
raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
# Clean up temp file
with contextlib.suppress(Exception):
os.unlink(file_path)
if not markdown_content:
raise RuntimeError(f"Failed to extract content from file: {filename}")
# ===== STEP 2: Check for duplicate content =====
content_hash = generate_content_hash(markdown_content, search_space_id)
existing_by_content = await check_duplicate_document(session, content_hash)
if existing_by_content and existing_by_content.id != document.id:
# Duplicate content found - mark this document as failed
logging.info(
f"Duplicate content detected for {filename}, "
f"matches document {existing_by_content.id}"
)
return None
# ===== STEP 3: Generate embeddings and chunks =====
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="chunking"
)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if user_llm:
document_metadata = {
"file_name": filename,
"etl_service": etl_service,
"document_type": "File Document",
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
# Fallback: use truncated content as summary
summary_content = markdown_content[:4000]
from app.config import config
summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
# Convert to BlockNote for editing
blocknote_json = await convert_markdown_to_blocknote(markdown_content)
# ===== STEP 4: Update document to READY =====
from sqlalchemy.orm.attributes import flag_modified
document.title = filename
document.content = summary_content
document.content_hash = content_hash
document.embedding = summary_embedding
document.document_metadata = {
"FILE_NAME": filename,
"ETL_SERVICE": etl_service or "UNKNOWN",
**(document.document_metadata or {}),
}
flag_modified(document, "document_metadata")
# Use safe_set_chunks to avoid async issues
safe_set_chunks(document, chunks)
document.blocknote_document = blocknote_json
document.content_needs_reindexing = False
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready() # Shows checkmark in UI
await session.commit()
await session.refresh(document)
await task_logger.log_task_success(
log_entry,
f"Successfully processed file: {filename}",
{
"document_id": document.id,
"content_hash": content_hash,
"file_type": etl_service,
"chunks_count": len(chunks),
},
)
return document
except Exception as e:
await session.rollback()
from app.services.page_limit_service import PageLimitExceededError
if isinstance(e, PageLimitExceededError):
error_message = str(e)
elif isinstance(e, HTTPException) and "page limit" in str(e.detail).lower():
error_message = str(e.detail)
else:
error_message = f"Failed to process file: {filename}"
await task_logger.log_task_failure(
log_entry,
error_message,
str(e),
{"error_type": type(e).__name__, "filename": filename, "document_id": document.id},
)
logging.error(f"Error processing file with document: {error_message}")
raise