diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py
index b905ebf91..00c80dcb5 100644
--- a/surfsense_backend/app/routes/documents_routes.py
+++ b/surfsense_backend/app/routes/documents_routes.py
@@ -113,9 +113,23 @@ async def create_documents_file_upload(
     user: User = Depends(current_active_user),
 ):
     """
-    Upload files as documents.
+    Upload files as documents with real-time status tracking.
+
+    Implements 2-phase document status updates for real-time UI feedback:
+    - Phase 1: Create all documents with 'pending' status (visible in UI immediately via ElectricSQL)
+    - Phase 2: Celery processes each file: pending → processing → ready/failed
+
     Requires DOCUMENTS_CREATE permission.
     """
+    from datetime import datetime
+
+    from app.db import DocumentStatus
+    from app.tasks.document_processors.base import (
+        check_document_by_unique_identifier,
+        get_current_timestamp,
+    )
+    from app.utils.document_converters import generate_unique_identifier_hash
+
     try:
         # Check permission
         await check_permission(
@@ -129,38 +143,101 @@ async def create_documents_file_upload(
         if not files:
             raise HTTPException(status_code=400, detail="No files provided")
 
+        created_documents: list[Document] = []
+        files_to_process: list[tuple[Document, str, str]] = []  # (document, temp_path, filename)
+        skipped_duplicates = 0
+
+        # ===== PHASE 1: Create pending documents for all files =====
+        # This makes ALL documents visible in the UI immediately with pending status
         for file in files:
             try:
-                # Save file to a temporary location to avoid stream issues
                 import os
                 import tempfile
 
-                # Create temp file
+                # Save file to temp location
                 with tempfile.NamedTemporaryFile(
-                    delete=False, suffix=os.path.splitext(file.filename)[1]
+                    delete=False, suffix=os.path.splitext(file.filename or "")[1]
                 ) as temp_file:
                     temp_path = temp_file.name
 
-                # Write uploaded file to temp file
                 content = await file.read()
                 with open(temp_path, "wb") as f:
                     f.write(content)
 
-                from app.tasks.celery_tasks.document_tasks import (
-                    process_file_upload_task,
+                file_size = len(content)
+
+                # Generate unique identifier for deduplication check
+                unique_identifier_hash = generate_unique_identifier_hash(
+                    DocumentType.FILE, file.filename or "unknown", search_space_id
                 )
 
-                process_file_upload_task.delay(
-                    temp_path, file.filename, search_space_id, str(user.id)
+                # Check if document already exists (by unique identifier)
+                existing = await check_document_by_unique_identifier(
+                    session, unique_identifier_hash
                 )
+                if existing:
+                    # Clean up temp file for duplicates
+                    os.unlink(temp_path)
+                    skipped_duplicates += 1
+                    continue
+
+                # Create pending document (visible immediately in UI via ElectricSQL)
+                document = Document(
+                    search_space_id=search_space_id,
+                    title=file.filename or "Uploaded File",
+                    document_type=DocumentType.FILE,
+                    document_metadata={
+                        "FILE_NAME": file.filename,
+                        "file_size": file_size,
+                        "upload_time": datetime.now().isoformat(),
+                    },
+                    content="Processing...",  # Placeholder until processed
+                    content_hash=unique_identifier_hash,  # Temporary, updated when ready
+                    unique_identifier_hash=unique_identifier_hash,
+                    embedding=None,
+                    status=DocumentStatus.pending(),  # Shows "pending" in UI
+                    updated_at=get_current_timestamp(),
+                    created_by_id=str(user.id),
+                )
+                session.add(document)
+                created_documents.append(document)
+                files_to_process.append((document, temp_path, file.filename or "unknown"))
+
             except Exception as e:
                 raise HTTPException(
                     status_code=422,
                     detail=f"Failed to process file {file.filename}: {e!s}",
                 ) from e
 
-        await session.commit()
-        return {"message": "Files uploaded for processing"}
+        # Commit all pending documents - they appear in UI immediately via ElectricSQL
+        if created_documents:
+            await session.commit()
+            # Refresh to get generated IDs
+            for doc in created_documents:
+                await session.refresh(doc)
+
+        # ===== PHASE 2: Dispatch Celery tasks for each file =====
+        # Each task will update document status: pending → processing → ready/failed
+        from app.tasks.celery_tasks.document_tasks import (
+            process_file_upload_with_document_task,
+        )
+
+        for document, temp_path, filename in files_to_process:
+            process_file_upload_with_document_task.delay(
+                document_id=document.id,
+                temp_path=temp_path,
+                filename=filename,
+                search_space_id=search_space_id,
+                user_id=str(user.id),
+            )
+
+        return {
+            "message": "Files uploaded for processing",
+            "document_ids": [doc.id for doc in created_documents],
+            "total_files": len(files),
+            "pending_files": len(files_to_process),
+            "skipped_duplicates": skipped_duplicates,
+        }
     except HTTPException:
         raise
     except Exception as e:
diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
index f310bb03e..cd5537927 100644
--- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
+++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
@@ -537,6 +537,298 @@ async def _process_file_upload(
         raise
 
 
+@celery_app.task(name="process_file_upload_with_document", bind=True)
+def process_file_upload_with_document_task(
+    self,
+    document_id: int,
+    temp_path: str,
+    filename: str,
+    search_space_id: int,
+    user_id: str,
+):
+    """
+    Celery task to process an uploaded file with an existing pending document.
+
+    This task is used by the 2-phase document upload flow:
+    - Phase 1 (API): Creates pending document (visible in UI immediately)
+    - Phase 2 (this task): Updates document status: pending → processing → ready/failed
+
+    Args:
+        document_id: ID of the pending document created in Phase 1
+        temp_path: Path to the uploaded file
+        filename: Original filename
+        search_space_id: ID of the search space
+        user_id: ID of the user
+    """
+    import asyncio
+    import os
+    import traceback
+
+    logger.info(
+        f"[process_file_upload_with_document] Task started - document_id: {document_id}, "
+        f"file: {filename}, search_space_id: {search_space_id}"
+    )
+
+    # Check if file exists and is accessible
+    if not os.path.exists(temp_path):
+        logger.error(
+            f"[process_file_upload_with_document] File does not exist: {temp_path}. "
+            "The temp file may have been cleaned up before the task ran."
+        )
+        # Mark document as failed since file is missing
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        try:
+            loop.run_until_complete(
+                _mark_document_failed(
+                    document_id,
+                    "File not found - temp file may have been cleaned up",
+                )
+            )
+        finally:
+            loop.close()
+        return
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+
+    try:
+        loop.run_until_complete(
+            _process_file_with_document(
+                document_id, temp_path, filename, search_space_id, user_id
+            )
+        )
+        logger.info(
+            f"[process_file_upload_with_document] Task completed successfully for: {filename}"
+        )
+    except Exception as e:
+        logger.error(
+            f"[process_file_upload_with_document] Task failed for {filename}: {e}\n"
+            f"Traceback:\n{traceback.format_exc()}"
+        )
+        raise
+    finally:
+        loop.close()
+
+
+async def _mark_document_failed(document_id: int, reason: str):
+    """Mark a document as failed when a task cannot proceed."""
+    from app.db import Document, DocumentStatus
+    from app.tasks.document_processors.base import get_current_timestamp
+
+    async with get_celery_session_maker()() as session:
+        document = await session.get(Document, document_id)
+        if document:
+            document.status = DocumentStatus.failed(reason)
+            document.updated_at = get_current_timestamp()
+            await session.commit()
+            logger.info(f"Marked document {document_id} as failed: {reason}")
+
+
+async def _process_file_with_document(
+    document_id: int,
+    temp_path: str,
+    filename: str,
+    search_space_id: int,
+    user_id: str,
+):
+    """
+    Process a file and update the existing pending document's status.
+
+    This function implements Phase 2 of the 2-phase document upload:
+    - Sets document status to 'processing' (shows spinner in UI)
+    - Processes the file (parsing, embedding, chunking)
+    - Updates document to 'ready' on success or 'failed' on error
+    """
+    import os
+
+    from app.db import Document, DocumentStatus
+    from app.tasks.document_processors.base import get_current_timestamp
+    from app.tasks.document_processors.file_processors import (
+        process_file_in_background_with_document,
+    )
+
+    logger.info(
+        f"[_process_file_with_document] Starting async processing for: {filename}"
+    )
+
+    async with get_celery_session_maker()() as session:
+        logger.info(
+            f"[_process_file_with_document] Database session created for: {filename}"
+        )
+        task_logger = TaskLoggingService(session, search_space_id)
+
+        # Get the document
+        document = await session.get(Document, document_id)
+        if not document:
+            logger.error(f"Document {document_id} not found")
+            return
+
+        # Get file size for notification metadata
+        try:
+            file_size = os.path.getsize(temp_path)
+            logger.info(f"[_process_file_with_document] File size: {file_size} bytes")
+        except Exception as e:
+            logger.warning(f"[_process_file_with_document] Could not get file size: {e}")
+            file_size = None
+
+        # Create notification for document processing
+        logger.info(f"[_process_file_with_document] Creating notification for: {filename}")
+        notification = (
+            await NotificationService.document_processing.notify_processing_started(
+                session=session,
+                user_id=UUID(user_id),
+                document_type="FILE",
+                document_name=filename,
+                search_space_id=search_space_id,
+                file_size=file_size,
+            )
+        )
+
+        log_entry = await task_logger.log_task_start(
+            task_name="process_file_upload_with_document",
+            source="document_processor",
+            message=f"Starting file processing for: {filename} (document_id: {document_id})",
+            metadata={
+                "document_type": "FILE",
+                "document_id": document_id,
+                "filename": filename,
+                "file_path": temp_path,
+                "user_id": user_id,
+            },
+        )
+
+        try:
+            # Set status to PROCESSING (shows spinner in UI via ElectricSQL)
+            document.status = DocumentStatus.processing()
+            await session.commit()
+            logger.info(
+                f"[_process_file_with_document] Document {document_id} status set to 'processing'"
+            )
+
+            # Process the file and update the document
+            result = await process_file_in_background_with_document(
+                document=document,
+                file_path=temp_path,
+                filename=filename,
+                search_space_id=search_space_id,
+                user_id=user_id,
+                session=session,
+                task_logger=task_logger,
+                log_entry=log_entry,
+                notification=notification,
+            )
+
+            # Update notification on success
+            if result:
+                await (
+                    NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        document_id=result.id,
+                        chunks_count=None,
+                    )
+                )
+                logger.info(
+                    f"[_process_file_with_document] Successfully processed document {document_id}"
+                )
+            else:
+                # Duplicate detected - mark as failed
+                document.status = DocumentStatus.failed("Duplicate content detected")
+                document.updated_at = get_current_timestamp()
+                await session.commit()
+                await (
+                    NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        error_message="Document already exists (duplicate)",
+                    )
+                )
+
+        except Exception as e:
+            # Import here to avoid circular dependencies
+            from fastapi import HTTPException
+
+            from app.services.page_limit_service import PageLimitExceededError
+
+            # Check if this is a page limit error
+            page_limit_error: PageLimitExceededError | None = None
+            if isinstance(e, PageLimitExceededError):
+                page_limit_error = e
+            elif (
+                isinstance(e, HTTPException)
+                and e.__cause__
+                and isinstance(e.__cause__, PageLimitExceededError)
+            ):
+                page_limit_error = e.__cause__
+
+            # Mark document as failed (shows error in UI via ElectricSQL)
+            error_message = str(e)[:500]
+            document.status = DocumentStatus.failed(error_message)
+            document.updated_at = get_current_timestamp()
+            await session.commit()
+            logger.info(
+                f"[_process_file_with_document] Document {document_id} marked as failed: {error_message[:100]}"
+            )
+
+            # Handle page limit errors with a dedicated notification
+            if page_limit_error is not None:
+                try:
+                    await session.refresh(notification)
+                    await NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        error_message="Page limit exceeded",
+                    )
+                    await NotificationService.page_limit.notify_page_limit_exceeded(
+                        session=session,
+                        user_id=UUID(user_id),
+                        document_name=filename,
+                        document_type="FILE",
+                        search_space_id=search_space_id,
+                        pages_used=page_limit_error.pages_used,
+                        pages_limit=page_limit_error.pages_limit,
+                        pages_to_add=page_limit_error.pages_to_add,
+                    )
+                except Exception as notif_error:
+                    logger.error(
+                        f"Failed to create page limit notification: {notif_error!s}"
+                    )
+            else:
+                # Update notification on failure
+                try:
+                    await session.refresh(notification)
+                    await NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        error_message=str(e)[:100],
+                    )
+                except Exception as notif_error:
+                    logger.error(
+                        f"Failed to update notification on failure: {notif_error!s}"
+                    )
+
+            await task_logger.log_task_failure(
+                log_entry,
+                error_message[:100],
+                str(e),
+                {"error_type": type(e).__name__, "document_id": document_id},
+            )
+            logger.error(f"Error processing file {filename}: {e!s}")
+            raise
+
+        finally:
+            # Clean up temp file
+            if os.path.exists(temp_path):
+                try:
+                    os.unlink(temp_path)
+                    logger.info(f"[_process_file_with_document] Cleaned up temp file: {temp_path}")
+                except Exception as cleanup_error:
+                    logger.warning(
+                        f"[_process_file_with_document] Failed to clean up temp file: {cleanup_error}"
+                    )
+
+
 @celery_app.task(name="process_circleback_meeting", bind=True)
 def process_circleback_meeting_task(
     self,
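Note the sync-to-async bridge in the task above: Celery task bodies are synchronous, so each invocation creates a private event loop, runs the async helper, and closes the loop in `finally` rather than reusing a global loop inside the long-lived worker process. A standalone sketch of that pattern, where the `_work` coroutine is a stand-in and not code from this repo:

```python
# Minimal sketch of the loop-per-invocation pattern used by
# process_file_upload_with_document_task (assumed names, runnable as-is).
import asyncio


async def _work() -> str:
    # Stand-in for the real async DB/processing coroutine
    await asyncio.sleep(0)
    return "done"


def run_in_fresh_loop() -> str:
    # Create, use, and close a dedicated loop per call, mirroring the task body
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_work())
    finally:
        loop.close()


if __name__ == "__main__":
    print(run_in_fresh_loop())  # -> done
```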
diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py
index 4433cb11e..e14dc3f42 100644
--- a/surfsense_backend/app/tasks/document_processors/file_processors.py
+++ b/surfsense_backend/app/tasks/document_processors/file_processors.py
@@ -33,6 +33,7 @@ from .base import (
     check_document_by_unique_identifier,
     check_duplicate_document,
     get_current_timestamp,
+    safe_set_chunks,
 )
 
 from .markdown_processor import add_received_markdown_file_document
@@ -1612,3 +1613,316 @@ async def process_file_in_background(
         logging.error(f"Error processing file in background: {error_message}")
         raise  # Re-raise so the wrapper can also handle it
+
+
+async def process_file_in_background_with_document(
+    document: Document,
+    file_path: str,
+    filename: str,
+    search_space_id: int,
+    user_id: str,
+    session: AsyncSession,
+    task_logger: TaskLoggingService,
+    log_entry: Log,
+    connector: dict | None = None,
+    notification: Notification | None = None,
+) -> Document | None:
+    """
+    Process a file and update an existing pending document (2-phase pattern).
+
+    This function is Phase 2 of the real-time document status updates:
+    - Phase 1 (API): Created the document with pending status
+    - Phase 2 (this): Process the file and update the document to ready/failed
+
+    The document already exists with pending status. This function:
+    1. Parses the file content (markdown, audio, or ETL services)
+    2. Updates the document with content, embeddings, and chunks
+    3. Sets status to 'ready' on success
+
+    Args:
+        document: Existing document with pending status
+        file_path: Path to the uploaded file
+        filename: Original filename
+        search_space_id: ID of the search space
+        user_id: ID of the user
+        session: Database session
+        task_logger: Task logging service
+        log_entry: Log entry for this task
+        connector: Optional connector info for Google Drive files
+        notification: Optional notification for progress updates
+
+    Returns:
+        Updated Document object if successful, None if duplicate content detected
+    """
+    import os
+
+    from app.config import config as app_config
+    from app.services.llm_service import get_user_long_context_llm
+    from app.utils.blocknote_converter import convert_markdown_to_blocknote
+
+    try:
+        markdown_content = None
+        etl_service = None
+
+        # ===== STEP 1: Parse file content based on type =====
+
+        # Check if the file is a markdown or text file
+        if filename.lower().endswith((".md", ".markdown", ".txt")):
+            # Update notification: parsing stage
+            if notification:
+                await NotificationService.document_processing.notify_processing_progress(
+                    session, notification, stage="parsing", stage_message="Reading file"
+                )
+
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Processing markdown/text file: {filename}",
+                {"file_type": "markdown", "processing_stage": "reading_file"},
+            )
+
+            # Read markdown content directly
+            with open(file_path, encoding="utf-8") as f:
+                markdown_content = f.read()
+            etl_service = "MARKDOWN"
+
+            # Clean up temp file
+            with contextlib.suppress(Exception):
+                os.unlink(file_path)
+
+        # Check if the file is an audio file
+        elif filename.lower().endswith(
+            (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
+        ):
+            # Update notification: parsing stage (transcription)
+            if notification:
+                await NotificationService.document_processing.notify_processing_progress(
+                    session, notification, stage="parsing", stage_message="Transcribing audio"
+                )
+
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Processing audio file for transcription: {filename}",
+                {"file_type": "audio", "processing_stage": "starting_transcription"},
+            )
+
+            # Transcribe audio
+            stt_service_type = (
+                "local"
+                if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
+                else "external"
+            )
+
+            if stt_service_type == "local":
+                from app.services.stt_service import stt_service
+
+                result = stt_service.transcribe_file(file_path)
+                transcribed_text = result.get("text", "")
+                if not transcribed_text:
+                    raise ValueError("Transcription returned empty text")
+                markdown_content = f"# Transcription of {filename}\n\n{transcribed_text}"
+            else:
+                with open(file_path, "rb") as audio_file:
+                    transcription_kwargs = {
+                        "model": app_config.STT_SERVICE,
+                        "file": audio_file,
+                        "api_key": app_config.STT_SERVICE_API_KEY,
+                    }
+                    if app_config.STT_SERVICE_API_BASE:
+                        transcription_kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
+                    transcription_response = await atranscription(**transcription_kwargs)
+                transcribed_text = transcription_response.get("text", "")
+                if not transcribed_text:
+                    raise ValueError("Transcription returned empty text")
+                markdown_content = f"# Transcription of {filename}\n\n{transcribed_text}"
+
+            etl_service = "AUDIO_TRANSCRIPTION"
+            # Clean up temp file
+            with contextlib.suppress(Exception):
+                os.unlink(file_path)
+
+        else:
+            # Document files - use ETL service
+            from app.services.page_limit_service import PageLimitExceededError, PageLimitService
+
+            page_limit_service = PageLimitService(session)
+
+            # Estimate page count
+            try:
+                estimated_pages = page_limit_service.estimate_pages_before_processing(file_path)
+            except Exception:
+                file_size = os.path.getsize(file_path)
+                estimated_pages = max(1, file_size // (80 * 1024))
+
+            # Check page limit
+            await page_limit_service.check_page_limit(user_id, estimated_pages)
+
+            if app_config.ETL_SERVICE == "UNSTRUCTURED":
+                if notification:
+                    await NotificationService.document_processing.notify_processing_progress(
+                        session, notification, stage="parsing", stage_message="Extracting content"
+                    )
+
+                from langchain_unstructured import UnstructuredLoader
+
+                loader = UnstructuredLoader(
+                    file_path,
+                    mode="elements",
+                    post_processors=[],
+                    languages=["eng"],
+                    include_orig_elements=False,
+                    include_metadata=False,
+                    strategy="auto",
+                )
+                docs = await loader.aload()
+                markdown_content = await convert_document_to_markdown(docs)
+                actual_pages = page_limit_service.estimate_pages_from_elements(docs)
+                final_page_count = max(estimated_pages, actual_pages)
+                etl_service = "UNSTRUCTURED"
+
+                # Update page usage
+                await page_limit_service.update_page_usage(user_id, final_page_count, allow_exceed=True)
+
+            elif app_config.ETL_SERVICE == "LLAMACLOUD":
+                if notification:
+                    await NotificationService.document_processing.notify_processing_progress(
+                        session, notification, stage="parsing", stage_message="Extracting content"
+                    )
+
+                result = await parse_with_llamacloud_retry(
+                    file_path=file_path,
+                    estimated_pages=estimated_pages,
+                    task_logger=task_logger,
+                    log_entry=log_entry,
+                )
+                markdown_documents = await result.aget_markdown_documents(split_by_page=False)
+                if not markdown_documents:
+                    raise RuntimeError(f"LlamaCloud parsing returned no documents: {filename}")
+                markdown_content = markdown_documents[0].text
+                etl_service = "LLAMACLOUD"
+
+                # Update page usage
+                await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True)
+
+            elif app_config.ETL_SERVICE == "DOCLING":
+                if notification:
+                    await NotificationService.document_processing.notify_processing_progress(
+                        session, notification, stage="parsing", stage_message="Extracting content"
+                    )
+
+                # Suppress logging during Docling import
+                getLogger("docling.pipeline.base_pipeline").setLevel(ERROR)
+                getLogger("docling.document_converter").setLevel(ERROR)
+                getLogger("docling_core.transforms.chunker.hierarchical_chunker").setLevel(ERROR)
+
+                from docling.document_converter import DocumentConverter
+
+                converter = DocumentConverter()
+                result = converter.convert(file_path)
+                markdown_content = result.document.export_to_markdown()
+                etl_service = "DOCLING"
+
+                # Update page usage
+                await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True)
+
+            else:
+                raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
+
+            # Clean up temp file
+            with contextlib.suppress(Exception):
+                os.unlink(file_path)
+
+        if not markdown_content:
+            raise RuntimeError(f"Failed to extract content from file: {filename}")
+
+        # ===== STEP 2: Check for duplicate content =====
+        content_hash = generate_content_hash(markdown_content, search_space_id)
+
+        existing_by_content = await check_duplicate_document(session, content_hash)
+        if existing_by_content and existing_by_content.id != document.id:
+            # Duplicate content found - mark this document as failed
+            logging.info(
+                f"Duplicate content detected for {filename}, "
+                f"matches document {existing_by_content.id}"
+            )
+            return None
+
+        # ===== STEP 3: Generate embeddings and chunks =====
+        if notification:
+            await NotificationService.document_processing.notify_processing_progress(
+                session, notification, stage="chunking"
+            )
+
+        user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
+
+        if user_llm:
+            document_metadata = {
+                "file_name": filename,
+                "etl_service": etl_service,
+                "document_type": "File Document",
+            }
+            summary_content, summary_embedding = await generate_document_summary(
+                markdown_content, user_llm, document_metadata
+            )
+        else:
+            # Fallback: use truncated content as summary
+            summary_content = markdown_content[:4000]
+            from app.config import config
+
+            summary_embedding = config.embedding_model_instance.embed(summary_content)
+
+        chunks = await create_document_chunks(markdown_content)
+
+        # Convert to BlockNote for editing
+        blocknote_json = await convert_markdown_to_blocknote(markdown_content)
+
+        # ===== STEP 4: Update document to READY =====
+        from sqlalchemy.orm.attributes import flag_modified
+
+        document.title = filename
+        document.content = summary_content
+        document.content_hash = content_hash
+        document.embedding = summary_embedding
+        document.document_metadata = {
+            "FILE_NAME": filename,
+            "ETL_SERVICE": etl_service or "UNKNOWN",
+            **(document.document_metadata or {}),
+        }
+        flag_modified(document, "document_metadata")
+
+        # Use safe_set_chunks to avoid async issues
+        safe_set_chunks(document, chunks)
+
+        document.blocknote_document = blocknote_json
+        document.content_needs_reindexing = False
+        document.updated_at = get_current_timestamp()
+        document.status = DocumentStatus.ready()  # Shows checkmark in UI
+
+        await session.commit()
+        await session.refresh(document)
+
+        await task_logger.log_task_success(
+            log_entry,
+            f"Successfully processed file: {filename}",
+            {
+                "document_id": document.id,
+                "content_hash": content_hash,
+                "file_type": etl_service,
+                "chunks_count": len(chunks),
+            },
+        )
+
+        return document
+
+    except Exception as e:
+        await session.rollback()
+
+        from app.services.page_limit_service import PageLimitExceededError
+
+        if isinstance(e, PageLimitExceededError):
+            error_message = str(e)
+        elif isinstance(e, HTTPException) and "page limit" in str(e.detail).lower():
+            error_message = str(e.detail)
+        else:
+            error_message = f"Failed to process file: {filename}"
+
+        await task_logger.log_task_failure(
+            log_entry,
+            error_message,
+            str(e),
+            {"error_type": type(e).__name__, "filename": filename, "document_id": document.id},
+        )
+        logging.error(f"Error processing file with document: {error_message}")
+        raise
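From a client's perspective, the 2-phase flow looks roughly like the sketch below. The route path, auth header, and how `search_space_id` travels are assumptions; only the response fields (`document_ids`, `total_files`, `pending_files`, `skipped_duplicates`) come from the diff above.

```python
# Hedged client-side sketch of the 2-phase upload flow (requires httpx).
from pathlib import Path

import httpx


def upload_files(paths: list[str], search_space_id: int, token: str) -> list[int]:
    files = [("files", (Path(p).name, Path(p).read_bytes())) for p in paths]
    resp = httpx.post(
        "http://localhost:8000/documents/fileupload",  # hypothetical path
        params={"search_space_id": search_space_id},   # hypothetical transport
        files=files,
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()
    body = resp.json()
    # Returned documents are already visible as 'pending'; Celery then drives
    # each one through processing -> ready/failed, which the UI observes via
    # ElectricSQL without further requests.
    print(f"queued={body['pending_files']} duplicates_skipped={body['skipped_duplicates']}")
    return body["document_ids"]
```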