diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py
index 81e3fdc72..be2f1c8c7 100644
--- a/surfsense_backend/app/routes/documents_routes.py
+++ b/surfsense_backend/app/routes/documents_routes.py
@@ -2,17 +2,14 @@ import asyncio
 
 from fastapi import APIRouter, Depends, Form, HTTPException, UploadFile
-from litellm import atranscription
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.future import select
 from sqlalchemy.orm import selectinload
 
-from app.config import config as app_config
 from app.db import (
     Chunk,
     Document,
     DocumentType,
-    Log,
     SearchSpace,
     User,
     get_async_session,
@@ -24,16 +21,6 @@ from app.schemas import (
     DocumentWithChunksRead,
     PaginatedResponse,
 )
-from app.services.task_logging_service import TaskLoggingService
-from app.tasks.document_processors import (
-    add_crawled_url_document,
-    add_extension_received_document,
-    add_received_file_document_using_docling,
-    add_received_file_document_using_llamacloud,
-    add_received_file_document_using_unstructured,
-    add_received_markdown_file_document,
-    add_youtube_video_document,
-)
 from app.users import current_active_user
 from app.utils.check_ownership import check_ownership
 
@@ -166,6 +153,7 @@ async def read_documents(
     page: int | None = None,
     page_size: int = 50,
     search_space_id: int | None = None,
+    document_types: str | None = None,
     session: AsyncSession = Depends(get_async_session),
     user: User = Depends(current_active_user),
 ):
@@ -177,6 +165,7 @@ async def read_documents(
         page: Zero-based page index used when 'skip' is not provided.
         page_size: Number of items per page (default: 50). Use -1 to return all remaining items after the offset.
         search_space_id: If provided, restrict results to a specific search space.
+        document_types: Comma-separated list of document types to filter by (e.g., "EXTENSION,FILE,SLACK_CONNECTOR").
         session: Database session (injected).
         user: Current authenticated user (injected).
 
@@ -198,6 +187,12 @@ async def read_documents(
     if search_space_id is not None:
         query = query.filter(Document.search_space_id == search_space_id)
 
+    # Filter by document_types if provided
+    if document_types is not None and document_types.strip():
+        type_list = [t.strip() for t in document_types.split(",") if t.strip()]
+        if type_list:
+            query = query.filter(Document.document_type.in_(type_list))
+
     # Get total count
     count_query = (
         select(func.count())
@@ -209,6 +204,10 @@ async def read_documents(
         count_query = count_query.filter(
             Document.search_space_id == search_space_id
         )
+    if document_types is not None and document_types.strip():
+        type_list = [t.strip() for t in document_types.split(",") if t.strip()]
+        if type_list:
+            count_query = count_query.filter(Document.document_type.in_(type_list))
 
     total_result = await session.execute(count_query)
     total = total_result.scalar() or 0
@@ -256,11 +255,12 @@ async def search_documents(
     page: int | None = None,
     page_size: int = 50,
     search_space_id: int | None = None,
+    document_types: str | None = None,
     session: AsyncSession = Depends(get_async_session),
     user: User = Depends(current_active_user),
 ):
     """
-    Search documents by title substring, optionally filtered by search_space_id.
+    Search documents by title substring, optionally filtered by search_space_id and document_types.
 
     Args:
         title: Case-insensitive substring to match against document titles. Required.
@@ -268,6 +268,7 @@ async def search_documents(
         page: Zero-based page index used when 'skip' is not provided. Default: None.
         page_size: Number of items per page. Use -1 to return all remaining items after the offset. Default: 50.
         search_space_id: Filter results to a specific search space. Default: None.
+        document_types: Comma-separated list of document types to filter by (e.g., "EXTENSION,FILE,SLACK_CONNECTOR").
         session: Database session (injected).
         user: Current authenticated user (injected).
 
@@ -290,6 +291,12 @@ async def search_documents(
     # Only search by title (case-insensitive)
     query = query.filter(Document.title.ilike(f"%{title}%"))
 
+    # Filter by document_types if provided
+    if document_types is not None and document_types.strip():
+        type_list = [t.strip() for t in document_types.split(",") if t.strip()]
+        if type_list:
+            query = query.filter(Document.document_type.in_(type_list))
+
     # Get total count
     count_query = (
         select(func.count())
@@ -302,6 +309,10 @@ async def search_documents(
             Document.search_space_id == search_space_id
         )
     count_query = count_query.filter(Document.title.ilike(f"%{title}%"))
+    if document_types is not None and document_types.strip():
+        type_list = [t.strip() for t in document_types.split(",") if t.strip()]
+        if type_list:
+            count_query = count_query.filter(Document.document_type.in_(type_list))
 
     total_result = await session.execute(count_query)
     total = total_result.scalar() or 0
@@ -455,6 +466,46 @@ async def delete_document(
     ) from e
 
 
+@router.get("/documents/type-counts/")
+async def get_document_type_counts(
+    search_space_id: int | None = None,
+    session: AsyncSession = Depends(get_async_session),
+    user: User = Depends(current_active_user),
+):
+    """
+    Get counts of documents by type for the current user.
+
+    Args:
+        search_space_id: If provided, restrict counts to a specific search space.
+        session: Database session (injected).
+        user: Current authenticated user (injected).
+
+    Returns:
+        Dict mapping document types to their counts.
+    """
+    try:
+        from sqlalchemy import func
+
+        query = (
+            select(Document.document_type, func.count(Document.id))
+            .join(SearchSpace)
+            .filter(SearchSpace.user_id == user.id)
+            .group_by(Document.document_type)
+        )
+
+        if search_space_id is not None:
+            query = query.filter(Document.search_space_id == search_space_id)
+
+        result = await session.execute(query)
+        type_counts = dict(result.all())
+
+        return type_counts
+    except Exception as e:
+        raise HTTPException(
+            status_code=500, detail=f"Failed to fetch document type counts: {e!s}"
+        ) from e
+
+
 @router.get("/documents/by-chunk/{chunk_id}", response_model=DocumentWithChunksRead)
 async def get_document_by_chunk_id(
     chunk_id: int,
@@ -510,623 +561,3 @@ async def get_document_by_chunk_id(
     raise HTTPException(
         status_code=500, detail=f"Failed to retrieve document: {e!s}"
     ) from e
-
-
-async def process_extension_document_with_new_session(
-    individual_document, search_space_id: int, user_id: str
-):
-    """Create a new session and process extension document."""
-    from app.db import async_session_maker
-    from app.services.task_logging_service import TaskLoggingService
-
-    async with async_session_maker() as session:
-        # Initialize task logging service
-        task_logger = TaskLoggingService(session, search_space_id)
-
-        # Log task start
-        log_entry = await task_logger.log_task_start(
-            task_name="process_extension_document",
-            source="document_processor",
-            message=f"Starting processing of extension document from {individual_document.metadata.VisitedWebPageTitle}",
-            metadata={
-                "document_type": "EXTENSION",
-                "url": individual_document.metadata.VisitedWebPageURL,
-                "title": individual_document.metadata.VisitedWebPageTitle,
-                "user_id": user_id,
-            },
-        )
-
-        try:
-            result = await add_extension_received_document(
-                session, individual_document, search_space_id, user_id
-            )
-
-            if result:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
-                    {"document_id": result.id, "content_hash": result.content_hash},
-                )
-            else:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
-                    {"duplicate_detected": True},
-                )
-        except Exception as e:
-            await task_logger.log_task_failure(
-                log_entry,
-                f"Failed to process extension document: {individual_document.metadata.VisitedWebPageTitle}",
-                str(e),
-                {"error_type": type(e).__name__},
-            )
-            import logging
-
-            logging.error(f"Error processing extension document: {e!s}")
-
-
-async def process_crawled_url_with_new_session(
-    url: str, search_space_id: int, user_id: str
-):
-    """Create a new session and process crawled URL."""
-    from app.db import async_session_maker
-    from app.services.task_logging_service import TaskLoggingService
-
-    async with async_session_maker() as session:
-        # Initialize task logging service
-        task_logger = TaskLoggingService(session, search_space_id)
-
-        # Log task start
-        log_entry = await task_logger.log_task_start(
-            task_name="process_crawled_url",
-            source="document_processor",
-            message=f"Starting URL crawling and processing for: {url}",
-            metadata={"document_type": "CRAWLED_URL", "url": url, "user_id": user_id},
-        )
-
-        try:
-            result = await add_crawled_url_document(
-                session, url, search_space_id, user_id
-            )
-
-            if result:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Successfully crawled and processed URL: {url}",
-                    {
-                        "document_id": result.id,
-                        "title": result.title,
-                        "content_hash": result.content_hash,
-                    },
-                )
-            else:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"URL document already exists (duplicate): {url}",
-                    {"duplicate_detected": True},
-                )
-        except Exception as e:
-            await task_logger.log_task_failure(
-                log_entry,
-                f"Failed to crawl URL: {url}",
-                str(e),
-                {"error_type": type(e).__name__},
-            )
-            import logging
-
-            logging.error(f"Error processing crawled URL: {e!s}")
-
-
-async def process_file_in_background_with_new_session(
-    file_path: str, filename: str, search_space_id: int, user_id: str
-):
-    """Create a new session and process file."""
-    from app.db import async_session_maker
-    from app.services.task_logging_service import TaskLoggingService
-
-    async with async_session_maker() as session:
-        # Initialize task logging service
-        task_logger = TaskLoggingService(session, search_space_id)
-
-        # Log task start
-        log_entry = await task_logger.log_task_start(
-            task_name="process_file_upload",
-            source="document_processor",
-            message=f"Starting file processing for: {filename}",
-            metadata={
-                "document_type": "FILE",
-                "filename": filename,
-                "file_path": file_path,
-                "user_id": user_id,
-            },
-        )
-
-        try:
-            await process_file_in_background(
-                file_path,
-                filename,
-                search_space_id,
-                user_id,
-                session,
-                task_logger,
-                log_entry,
-            )
-
-            # Note: success/failure logging is handled within process_file_in_background
-        except Exception as e:
-            await task_logger.log_task_failure(
-                log_entry,
-                f"Failed to process file: {filename}",
-                str(e),
-                {"error_type": type(e).__name__},
-            )
-            import logging
-
-            logging.error(f"Error processing file: {e!s}")
-
-
-async def process_youtube_video_with_new_session(
-    url: str, search_space_id: int, user_id: str
-):
-    """Create a new session and process YouTube video."""
-    from app.db import async_session_maker
-    from app.services.task_logging_service import TaskLoggingService
-
-    async with async_session_maker() as session:
-        # Initialize task logging service
-        task_logger = TaskLoggingService(session, search_space_id)
-
-        # Log task start
-        log_entry = await task_logger.log_task_start(
-            task_name="process_youtube_video",
-            source="document_processor",
-            message=f"Starting YouTube video processing for: {url}",
-            metadata={"document_type": "YOUTUBE_VIDEO", "url": url, "user_id": user_id},
-        )
-
-        try:
-            result = await add_youtube_video_document(
-                session, url, search_space_id, user_id
-            )
-
-            if result:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Successfully processed YouTube video: {result.title}",
-                    {
-                        "document_id": result.id,
-                        "video_id": result.document_metadata.get("video_id"),
-                        "content_hash": result.content_hash,
-                    },
-                )
-            else:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"YouTube video document already exists (duplicate): {url}",
-                    {"duplicate_detected": True},
-                )
-        except Exception as e:
-            await task_logger.log_task_failure(
-                log_entry,
-                f"Failed to process YouTube video: {url}",
-                str(e),
-                {"error_type": type(e).__name__},
-            )
-            import logging
-
-            logging.error(f"Error processing YouTube video: {e!s}")
-
-
-async def process_file_in_background(
-    file_path: str,
-    filename: str,
-    search_space_id: int,
-    user_id: str,
-    session: AsyncSession,
-    task_logger: TaskLoggingService,
-    log_entry: Log,
-):
-    try:
-        # Check if the file is a markdown or text file
-        if filename.lower().endswith((".md", ".markdown", ".txt")):
-            await task_logger.log_task_progress(
-                log_entry,
-                f"Processing markdown/text file: {filename}",
-                {"file_type": "markdown", "processing_stage": "reading_file"},
-            )
-
-            # For markdown files, read the content directly
-            with open(file_path, encoding="utf-8") as f:
-                markdown_content = f.read()
-
-            # Clean up the temp file
-            import os
-
-            try:
-                os.unlink(file_path)
-            except Exception as e:
-                print("Error deleting temp file", e)
-                pass
-
-            await task_logger.log_task_progress(
-                log_entry,
-                f"Creating document from markdown content: {filename}",
-                {
-                    "processing_stage": "creating_document",
-                    "content_length": len(markdown_content),
-                },
-            )
-
-            # Process markdown directly through specialized function
-            result = await add_received_markdown_file_document(
-                session, filename, markdown_content, search_space_id, user_id
-            )
-
-            if result:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Successfully processed markdown file: {filename}",
-                    {
-                        "document_id": result.id,
-                        "content_hash": result.content_hash,
-                        "file_type": "markdown",
-                    },
-                )
-            else:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Markdown file already exists (duplicate): {filename}",
-                    {"duplicate_detected": True, "file_type": "markdown"},
-                )
-
-        # Check if the file is an audio file
-        elif filename.lower().endswith(
-            (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
-        ):
-            await task_logger.log_task_progress(
-                log_entry,
-                f"Processing audio file for transcription: {filename}",
-                {"file_type": "audio", "processing_stage": "starting_transcription"},
-            )
-
-            # Determine STT service type
-            stt_service_type = (
-                "local"
-                if app_config.STT_SERVICE
-                and app_config.STT_SERVICE.startswith("local/")
-                else "external"
-            )
-
-            # Check if using local STT service
-            if stt_service_type == "local":
-                # Use local Faster-Whisper for transcription
-                from app.services.stt_service import stt_service
-
-                try:
-                    result = stt_service.transcribe_file(file_path)
-                    transcribed_text = result.get("text", "")
-
-                    if not transcribed_text:
-                        raise ValueError("Transcription returned empty text")
-
-                    # Add metadata about the transcription
-                    transcribed_text = (
-                        f"# Transcription of {filename}\n\n{transcribed_text}"
-                    )
-                except Exception as e:
-                    raise HTTPException(
-                        status_code=422,
-                        detail=f"Failed to transcribe audio file {filename}: {e!s}",
-                    ) from e
-
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"Local STT transcription completed: {filename}",
-                    {
-                        "processing_stage": "local_transcription_complete",
-                        "language": result.get("language"),
-                        "confidence": result.get("language_probability"),
-                        "duration": result.get("duration"),
-                    },
-                )
-            else:
-                # Use LiteLLM for audio transcription
-                with open(file_path, "rb") as audio_file:
-                    transcription_kwargs = {
-                        "model": app_config.STT_SERVICE,
-                        "file": audio_file,
-                        "api_key": app_config.STT_SERVICE_API_KEY,
-                    }
-                    if app_config.STT_SERVICE_API_BASE:
-                        transcription_kwargs["api_base"] = (
-                            app_config.STT_SERVICE_API_BASE
-                        )
-
-                    transcription_response = await atranscription(
-                        **transcription_kwargs
-                    )
-
-                    # Extract the transcribed text
-                    transcribed_text = transcription_response.get("text", "")
-
-                    if not transcribed_text:
-                        raise ValueError("Transcription returned empty text")
-
-                    # Add metadata about the transcription
-                    transcribed_text = (
-                        f"# Transcription of {filename}\n\n{transcribed_text}"
-                    )
-
-            await task_logger.log_task_progress(
-                log_entry,
-                f"Transcription completed, creating document: {filename}",
-                {
-                    "processing_stage": "transcription_complete",
-                    "transcript_length": len(transcribed_text),
-                },
-            )
-
-            # Clean up the temp file
-            try:
-                os.unlink(file_path)
-            except Exception as e:
-                print("Error deleting temp file", e)
-                pass
-
-            # Process transcription as markdown document
-            result = await add_received_markdown_file_document(
-                session, filename, transcribed_text, search_space_id, user_id
-            )
-
-            if result:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Successfully transcribed and processed audio file: {filename}",
-                    {
-                        "document_id": result.id,
-                        "content_hash": result.content_hash,
-                        "file_type": "audio",
-                        "transcript_length": len(transcribed_text),
-                        "stt_service": stt_service_type,
-                    },
-                )
-            else:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Audio file transcript already exists (duplicate): {filename}",
-                    {"duplicate_detected": True, "file_type": "audio"},
-                )
-
-        else:
-            if app_config.ETL_SERVICE == "UNSTRUCTURED":
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"Processing file with Unstructured ETL: {filename}",
-                    {
-                        "file_type": "document",
-                        "etl_service": "UNSTRUCTURED",
-                        "processing_stage": "loading",
-                    },
-                )
-
-                from langchain_unstructured import UnstructuredLoader
-
-                # Process the file
-                loader = UnstructuredLoader(
-                    file_path,
-                    mode="elements",
-                    post_processors=[],
-                    languages=["eng"],
-                    include_orig_elements=False,
-                    include_metadata=False,
-                    strategy="auto",
-                )
-
-                docs = await loader.aload()
-
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"Unstructured ETL completed, creating document: {filename}",
-                    {"processing_stage": "etl_complete", "elements_count": len(docs)},
-                )
-
-                # Clean up the temp file
-                import os
-
-                try:
-                    os.unlink(file_path)
-                except Exception as e:
-                    print("Error deleting temp file", e)
-                    pass
-
-                # Pass the documents to the existing background task
-                result = await add_received_file_document_using_unstructured(
-                    session, filename, docs, search_space_id, user_id
-                )
-
-                if result:
-                    await task_logger.log_task_success(
-                        log_entry,
-                        f"Successfully processed file with Unstructured: {filename}",
-                        {
-                            "document_id": result.id,
-                            "content_hash": result.content_hash,
-                            "file_type": "document",
-                            "etl_service": "UNSTRUCTURED",
-                        },
-                    )
-                else:
-                    await task_logger.log_task_success(
-                        log_entry,
-                        f"Document already exists (duplicate): {filename}",
-                        {
-                            "duplicate_detected": True,
-                            "file_type": "document",
-                            "etl_service": "UNSTRUCTURED",
-                        },
-                    )
-
-            elif app_config.ETL_SERVICE == "LLAMACLOUD":
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"Processing file with LlamaCloud ETL: {filename}",
-                    {
-                        "file_type": "document",
-                        "etl_service": "LLAMACLOUD",
-                        "processing_stage": "parsing",
-                    },
-                )
-
-                from llama_cloud_services import LlamaParse
-                from llama_cloud_services.parse.utils import ResultType
-
-                # Create LlamaParse parser instance
-                parser = LlamaParse(
-                    api_key=app_config.LLAMA_CLOUD_API_KEY,
-                    num_workers=1,  # Use single worker for file processing
-                    verbose=True,
-                    language="en",
-                    result_type=ResultType.MD,
-                )
-
-                # Parse the file asynchronously
-                result = await parser.aparse(file_path)
-
-                # Clean up the temp file
-                import os
-
-                try:
-                    os.unlink(file_path)
-                except Exception as e:
-                    print("Error deleting temp file", e)
-                    pass
-
-                # Get markdown documents from the result
-                markdown_documents = await result.aget_markdown_documents(
-                    split_by_page=False
-                )
-
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"LlamaCloud parsing completed, creating documents: {filename}",
-                    {
-                        "processing_stage": "parsing_complete",
-                        "documents_count": len(markdown_documents),
-                    },
-                )
-
-                for doc in markdown_documents:
-                    # Extract text content from the markdown documents
-                    markdown_content = doc.text
-
-                    # Process the documents using our LlamaCloud background task
-                    doc_result = await add_received_file_document_using_llamacloud(
-                        session,
-                        filename,
-                        llamacloud_markdown_document=markdown_content,
-                        search_space_id=search_space_id,
-                        user_id=user_id,
-                    )
-
-                    if doc_result:
-                        await task_logger.log_task_success(
-                            log_entry,
-                            f"Successfully processed file with LlamaCloud: {filename}",
-                            {
-                                "document_id": doc_result.id,
-                                "content_hash": doc_result.content_hash,
-                                "file_type": "document",
-                                "etl_service": "LLAMACLOUD",
-                            },
-                        )
-                    else:
-                        await task_logger.log_task_success(
-                            log_entry,
-                            f"Document already exists (duplicate): {filename}",
-                            {
-                                "duplicate_detected": True,
-                                "file_type": "document",
-                                "etl_service": "LLAMACLOUD",
-                            },
-                        )
-
-            elif app_config.ETL_SERVICE == "DOCLING":
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"Processing file with Docling ETL: {filename}",
-                    {
-                        "file_type": "document",
-                        "etl_service": "DOCLING",
-                        "processing_stage": "parsing",
-                    },
-                )
-
-                # Use Docling service for document processing
-                from app.services.docling_service import create_docling_service
-
-                # Create Docling service
-                docling_service = create_docling_service()
-
-                # Process the document
-                result = await docling_service.process_document(file_path, filename)
-
-                # Clean up the temp file
-                import os
-
-                try:
-                    os.unlink(file_path)
-                except Exception as e:
-                    print("Error deleting temp file", e)
-                    pass
-
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"Docling parsing completed, creating document: {filename}",
-                    {
-                        "processing_stage": "parsing_complete",
-                        "content_length": len(result["content"]),
-                    },
-                )
-
-                # Process the document using our Docling background task
-                doc_result = await add_received_file_document_using_docling(
-                    session,
-                    filename,
-                    docling_markdown_document=result["content"],
-                    search_space_id=search_space_id,
-                    user_id=user_id,
-                )
-
-                if doc_result:
-                    await task_logger.log_task_success(
-                        log_entry,
-                        f"Successfully processed file with Docling: {filename}",
-                        {
-                            "document_id": doc_result.id,
-                            "content_hash": doc_result.content_hash,
-                            "file_type": "document",
-                            "etl_service": "DOCLING",
-                        },
-                    )
-                else:
-                    await task_logger.log_task_success(
-                        log_entry,
-                        f"Document already exists (duplicate): {filename}",
-                        {
-                            "duplicate_detected": True,
-                            "file_type": "document",
-                            "etl_service": "DOCLING",
-                        },
-                    )
-    except Exception as e:
-        await session.rollback()
-        await task_logger.log_task_failure(
-            log_entry,
-            f"Failed to process file: {filename}",
-            str(e),
-            {"error_type": type(e).__name__, "filename": filename},
-        )
-        import logging
-
-        logging.error(f"Error processing file in background: {e!s}")
-        raise  # Re-raise so the wrapper can also handle it
diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
index d09467be0..8ccdde4f1 100644
--- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
+++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
@@ -280,7 +280,7 @@ async def _process_file_upload(
     file_path: str, filename: str, search_space_id: int, user_id: str
 ):
     """Process file upload with new session."""
-    from app.routes.documents_routes import process_file_in_background
+    from app.tasks.document_processors.file_processors import process_file_in_background
 
     async with get_celery_session_maker()() as session:
         task_logger = TaskLoggingService(session, search_space_id)
diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py
index f509e700b..2ae6b4bba 100644
--- a/surfsense_backend/app/tasks/document_processors/file_processors.py
+++ b/surfsense_backend/app/tasks/document_processors/file_processors.py
@@ -4,12 +4,16 @@ File document processors for different ETL services (Unstructured, LlamaCloud, D
 
 import logging
 
+from fastapi import HTTPException
 from langchain_core.documents import Document as LangChainDocument
+from litellm import atranscription
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
 
-from app.db import Document, DocumentType
+from app.config import config as app_config
+from app.db import Document, DocumentType, Log
 from app.services.llm_service import get_user_long_context_llm
+from app.services.task_logging_service import TaskLoggingService
 from app.utils.document_converters import (
     convert_document_to_markdown,
     create_document_chunks,
@@ -21,6 +25,7 @@ from app.utils.document_converters import (
 from .base import (
     check_document_by_unique_identifier,
 )
+from .markdown_processor import add_received_markdown_file_document
 
 
 async def add_received_file_document_using_unstructured(
@@ -391,3 +396,418 @@ async def add_received_file_document_using_docling(
         raise RuntimeError(
             f"Failed to process file document using Docling: {e!s}"
         ) from e
+
+
+async def process_file_in_background(
+    file_path: str,
+    filename: str,
+    search_space_id: int,
+    user_id: str,
+    session: AsyncSession,
+    task_logger: TaskLoggingService,
+    log_entry: Log,
+):
+    try:
+        # Check if the file is a markdown or text file
+        if filename.lower().endswith((".md", ".markdown", ".txt")):
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Processing markdown/text file: {filename}",
+                {"file_type": "markdown", "processing_stage": "reading_file"},
+            )
+
+            # For markdown files, read the content directly
+            with open(file_path, encoding="utf-8") as f:
+                markdown_content = f.read()
+
+            # Clean up the temp file
+            import os
+
+            try:
+                os.unlink(file_path)
+            except Exception as e:
+                print("Error deleting temp file", e)
+                pass
+
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Creating document from markdown content: {filename}",
+                {
+                    "processing_stage": "creating_document",
+                    "content_length": len(markdown_content),
+                },
+            )
+
+            # Process markdown directly through specialized function
+            result = await add_received_markdown_file_document(
+                session, filename, markdown_content, search_space_id, user_id
+            )
+
+            if result:
+                await task_logger.log_task_success(
+                    log_entry,
+                    f"Successfully processed markdown file: {filename}",
+                    {
+                        "document_id": result.id,
+                        "content_hash": result.content_hash,
+                        "file_type": "markdown",
+                    },
+                )
+            else:
+                await task_logger.log_task_success(
+                    log_entry,
+                    f"Markdown file already exists (duplicate): {filename}",
+                    {"duplicate_detected": True, "file_type": "markdown"},
+                )
+
+        # Check if the file is an audio file
+        elif filename.lower().endswith(
+            (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
+        ):
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Processing audio file for transcription: {filename}",
+                {"file_type": "audio", "processing_stage": "starting_transcription"},
+            )
+
+            # Determine STT service type
+            stt_service_type = (
+                "local"
+                if app_config.STT_SERVICE
+                and app_config.STT_SERVICE.startswith("local/")
+                else "external"
+            )
+
+            # Check if using local STT service
+            if stt_service_type == "local":
+                # Use local Faster-Whisper for transcription
+                from app.services.stt_service import stt_service
+
+                try:
+                    result = stt_service.transcribe_file(file_path)
+                    transcribed_text = result.get("text", "")
+
+                    if not transcribed_text:
+                        raise ValueError("Transcription returned empty text")
+
+                    # Add metadata about the transcription
+                    transcribed_text = (
+                        f"# Transcription of {filename}\n\n{transcribed_text}"
+                    )
+                except Exception as e:
+                    raise HTTPException(
+                        status_code=422,
+                        detail=f"Failed to transcribe audio file {filename}: {e!s}",
+                    ) from e
+
+                await task_logger.log_task_progress(
+                    log_entry,
+                    f"Local STT transcription completed: {filename}",
+                    {
+                        "processing_stage": "local_transcription_complete",
+                        "language": result.get("language"),
+                        "confidence": result.get("language_probability"),
+                        "duration": result.get("duration"),
+                    },
+                )
+            else:
+                # Use LiteLLM for audio transcription
+                with open(file_path, "rb") as audio_file:
+                    transcription_kwargs = {
+                        "model": app_config.STT_SERVICE,
+                        "file": audio_file,
+                        "api_key": app_config.STT_SERVICE_API_KEY,
+                    }
+                    if app_config.STT_SERVICE_API_BASE:
+                        transcription_kwargs["api_base"] = (
+                            app_config.STT_SERVICE_API_BASE
+                        )
+
+                    transcription_response = await atranscription(
+                        **transcription_kwargs
+                    )
+
+                    # Extract the transcribed text
+                    transcribed_text = transcription_response.get("text", "")
+
+                    if not transcribed_text:
+                        raise ValueError("Transcription returned empty text")
+
+                    # Add metadata about the transcription
+                    transcribed_text = (
+                        f"# Transcription of {filename}\n\n{transcribed_text}"
+                    )
+
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Transcription completed, creating document: {filename}",
+                {
+                    "processing_stage": "transcription_complete",
+                    "transcript_length": len(transcribed_text),
+                },
+            )
+
+            # Clean up the temp file
+            try:
+                os.unlink(file_path)
+            except Exception as e:
+                print("Error deleting temp file", e)
+                pass
+
+            # Process transcription as markdown document
+            result = await add_received_markdown_file_document(
+                session, filename, transcribed_text, search_space_id, user_id
+            )
+
+            if result:
+                await task_logger.log_task_success(
+                    log_entry,
+                    f"Successfully transcribed and processed audio file: {filename}",
+                    {
+                        "document_id": result.id,
+                        "content_hash": result.content_hash,
+                        "file_type": "audio",
+                        "transcript_length": len(transcribed_text),
+                        "stt_service": stt_service_type,
+                    },
+                )
+            else:
+                await task_logger.log_task_success(
+                    log_entry,
+                    f"Audio file transcript already exists (duplicate): {filename}",
+                    {"duplicate_detected": True, "file_type": "audio"},
+                )
+
+        else:
+            if app_config.ETL_SERVICE == "UNSTRUCTURED":
+                await task_logger.log_task_progress(
+                    log_entry,
+                    f"Processing file with Unstructured ETL: {filename}",
+                    {
+                        "file_type": "document",
+                        "etl_service": "UNSTRUCTURED",
+                        "processing_stage": "loading",
+                    },
+                )
+
+                from langchain_unstructured import UnstructuredLoader
+
+                # Process the file
+                loader = UnstructuredLoader(
+                    file_path,
+                    mode="elements",
+                    post_processors=[],
+                    languages=["eng"],
+                    include_orig_elements=False,
+                    include_metadata=False,
+                    strategy="auto",
+                )
+
+                docs = await loader.aload()
+
+                await task_logger.log_task_progress(
+                    log_entry,
+                    f"Unstructured ETL completed, creating document: {filename}",
+                    {"processing_stage": "etl_complete", "elements_count": len(docs)},
+                )
+
+                # Clean up the temp file
+                import os
+
+                try:
+                    os.unlink(file_path)
+                except Exception as e:
+                    print("Error deleting temp file", e)
+                    pass
+
+                # Pass the documents to the existing background task
+                result = await add_received_file_document_using_unstructured(
+                    session, filename, docs, search_space_id, user_id
+                )
+
+                if result:
+                    await task_logger.log_task_success(
+                        log_entry,
+                        f"Successfully processed file with Unstructured: {filename}",
+                        {
+                            "document_id": result.id,
+                            "content_hash": result.content_hash,
+                            "file_type": "document",
+                            "etl_service": "UNSTRUCTURED",
+                        },
+                    )
+                else:
+                    await task_logger.log_task_success(
+                        log_entry,
+                        f"Document already exists (duplicate): {filename}",
+                        {
+                            "duplicate_detected": True,
+                            "file_type": "document",
+                            "etl_service": "UNSTRUCTURED",
+                        },
+                    )
+
+            elif app_config.ETL_SERVICE == "LLAMACLOUD":
+                await task_logger.log_task_progress(
+                    log_entry,
+                    f"Processing file with LlamaCloud ETL: {filename}",
+                    {
+                        "file_type": "document",
+                        "etl_service": "LLAMACLOUD",
+                        "processing_stage": "parsing",
+                    },
+                )
+
+                from llama_cloud_services import LlamaParse
+                from llama_cloud_services.parse.utils import ResultType
+
+                # Create LlamaParse parser instance
+                parser = LlamaParse(
+                    api_key=app_config.LLAMA_CLOUD_API_KEY,
+                    num_workers=1,  # Use single worker for file processing
+                    verbose=True,
+                    language="en",
+                    result_type=ResultType.MD,
+                )
+
+                # Parse the file asynchronously
+                result = await parser.aparse(file_path)
+
+                # Clean up the temp file
+                import os
+
+                try:
+                    os.unlink(file_path)
+                except Exception as e:
+                    print("Error deleting temp file", e)
+                    pass
+
+                # Get markdown documents from the result
+                markdown_documents = await result.aget_markdown_documents(
+                    split_by_page=False
+                )
+
+                await task_logger.log_task_progress(
+                    log_entry,
+                    f"LlamaCloud parsing completed, creating documents: {filename}",
+                    {
+                        "processing_stage": "parsing_complete",
+                        "documents_count": len(markdown_documents),
+                    },
+                )
+
+                for doc in markdown_documents:
+                    # Extract text content from the markdown documents
+                    markdown_content = doc.text
+
+                    # Process the documents using our LlamaCloud background task
+                    doc_result = await add_received_file_document_using_llamacloud(
+                        session,
+                        filename,
+                        llamacloud_markdown_document=markdown_content,
+                        search_space_id=search_space_id,
+                        user_id=user_id,
+                    )
+
+                    if doc_result:
+                        await task_logger.log_task_success(
+                            log_entry,
+                            f"Successfully processed file with LlamaCloud: {filename}",
+                            {
+                                "document_id": doc_result.id,
+                                "content_hash": doc_result.content_hash,
+                                "file_type": "document",
+                                "etl_service": "LLAMACLOUD",
+                            },
+                        )
+                    else:
+                        await task_logger.log_task_success(
+                            log_entry,
+                            f"Document already exists (duplicate): {filename}",
+                            {
+                                "duplicate_detected": True,
+                                "file_type": "document",
+                                "etl_service": "LLAMACLOUD",
+                            },
+                        )
+
+            elif app_config.ETL_SERVICE == "DOCLING":
+                await task_logger.log_task_progress(
+                    log_entry,
+                    f"Processing file with Docling ETL: {filename}",
+                    {
+                        "file_type": "document",
+                        "etl_service": "DOCLING",
+                        "processing_stage": "parsing",
+                    },
+                )
+
+                # Use Docling service for document processing
+                from app.services.docling_service import create_docling_service
+
+                # Create Docling service
+                docling_service = create_docling_service()
+
+                # Process the document
+                result = await docling_service.process_document(file_path, filename)
+
+                # Clean up the temp file
+                import os
+
+                try:
+                    os.unlink(file_path)
+                except Exception as e:
+                    print("Error deleting temp file", e)
+                    pass
+
+                await task_logger.log_task_progress(
+                    log_entry,
+                    f"Docling parsing completed, creating document: {filename}",
+                    {
+                        "processing_stage": "parsing_complete",
+                        "content_length": len(result["content"]),
+                    },
+                )
+
+                # Process the document using our Docling background task
+                doc_result = await add_received_file_document_using_docling(
+                    session,
+                    filename,
+                    docling_markdown_document=result["content"],
+                    search_space_id=search_space_id,
+                    user_id=user_id,
+                )
+
+                if doc_result:
+                    await task_logger.log_task_success(
+                        log_entry,
+                        f"Successfully processed file with Docling: {filename}",
+                        {
+                            "document_id": doc_result.id,
+                            "content_hash": doc_result.content_hash,
+                            "file_type": "document",
+                            "etl_service": "DOCLING",
+                        },
+                    )
+                else:
+                    await task_logger.log_task_success(
+                        log_entry,
+                        f"Document already exists (duplicate): {filename}",
+                        {
+                            "duplicate_detected": True,
+                            "file_type": "document",
+                            "etl_service": "DOCLING",
+                        },
+                    )
+    except Exception as e:
+        await session.rollback()
+        await task_logger.log_task_failure(
+            log_entry,
+            f"Failed to process file: {filename}",
+            str(e),
+            {"error_type": type(e).__name__, "filename": filename},
+        )
+        import logging
+
+        logging.error(f"Error processing file in background: {e!s}")
+        raise  # Re-raise so the wrapper can also handle it
diff --git a/surfsense_web/app/(home)/login/LocalLoginForm.tsx b/surfsense_web/app/(home)/login/LocalLoginForm.tsx
index 7d79a0c35..21737714c 100644
--- a/surfsense_web/app/(home)/login/LocalLoginForm.tsx
+++ b/surfsense_web/app/(home)/login/LocalLoginForm.tsx
@@ -1,10 +1,10 @@
 "use client";
 
+import { Eye, EyeOff } from "lucide-react";
 import { AnimatePresence, motion } from "motion/react";
 import Link from "next/link";
 import { useRouter } from "next/navigation";
 import { useEffect, useState } from "react";
 import { toast } from "sonner";
-import { Eye, EyeOff } from "lucide-react";
 import { getAuthErrorDetails, isNetworkError, shouldRetry } from "@/lib/auth-errors";
 
 export function LocalLoginForm() {
@@ -191,36 +191,36 @@
 							/>
-						</div>
+						</div>
-						<div className="relative">
-							<input
-								id="password"
-								type={showPassword ? "text" : "password"}
-								value={password}
-								onChange={(e) => setPassword(e.target.value)}
-								className={`mt-1 block w-full rounded-md border pr-10 px-3 py-2 shadow-sm focus:outline-none focus:ring-2 focus:ring-offset-2 dark:bg-gray-800 dark:text-white transition-colors ${
-									error
-										? "border-red-300 focus:border-red-500 focus:ring-red-500 dark:border-red-700"
-										: "border-gray-300 focus:border-blue-500 focus:ring-blue-500 dark:border-gray-700"
-								}`}
-								disabled={isLoading}
-							/>
-						</div>
-					</div>
+						<div className="relative">
+							<input
+								id="password"
+								type={showPassword ? "text" : "password"}
+								value={password}
+								onChange={(e) => setPassword(e.target.value)}
+								className={`mt-1 block w-full rounded-md border pr-10 px-3 py-2 shadow-sm focus:outline-none focus:ring-2 focus:ring-offset-2 dark:bg-gray-800 dark:text-white transition-colors ${
+									error
+										? "border-red-300 focus:border-red-500 focus:ring-red-500 dark:border-red-700"
+										: "border-gray-300 focus:border-blue-500 focus:ring-blue-500 dark:border-gray-700"
+								}`}
+								disabled={isLoading}
+							/>
+						</div>
+					</div>