diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index cb11a6ec2..5d25b4623 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -1,5 +1,9 @@ """ Webcrawler connector indexer. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create all documents with 'pending' status (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import time @@ -11,7 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.webcrawler_connector import WebCrawlerConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -28,6 +32,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -49,7 +54,11 @@ async def index_crawled_urls( on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ - Index web page URLs. + Index web page URLs with real-time document status updates. + + Implements 2-phase approach for real-time UI feedback: + - Phase 1: Create all documents with 'pending' status (visible in UI immediately) + - Phase 2: Process each document: pending → processing → ready/failed Args: session: Database session @@ -138,9 +147,9 @@ async def index_crawled_urls( await task_logger.log_task_progress( log_entry, - f"Starting to crawl {len(urls)} URLs", + f"Starting to process {len(urls)} URLs", { - "stage": "crawling", + "stage": "processing", "total_urls": len(urls), }, ) @@ -148,28 +157,118 @@ async def index_crawled_urls( documents_indexed = 0 documents_updated = 0 documents_skipped = 0 - failed_urls = [] + documents_failed = 0 + duplicate_content_count = 0 # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() - for idx, url in enumerate(urls, 1): - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() + # ======================================================================= + # PHASE 1: Analyze all URLs, create pending documents for new ones + # This makes ALL new documents visible in the UI immediately with pending status + # ======================================================================= + urls_to_process = [] # List of dicts with document and URL data + new_documents_created = False + + for url in urls: try: - logger.info(f"Processing URL {idx}/{len(urls)}: {url}") + # Generate unique identifier hash for this URL + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.CRAWLED_URL, url, search_space_id + ) + + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + # Document exists - check if it's already being processed + if DocumentStatus.is_state(existing_document.status, 
DocumentStatus.PENDING): + logger.info(f"URL {url} already pending. Skipping.") + documents_skipped += 1 + continue + if DocumentStatus.is_state(existing_document.status, DocumentStatus.PROCESSING): + logger.info(f"URL {url} already processing. Skipping.") + documents_skipped += 1 + continue + + # Queue existing document for potential update check + urls_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'url': url, + 'unique_identifier_hash': unique_identifier_hash, + }) + continue + + # Create new document with PENDING status (visible in UI immediately) + document = Document( + search_space_id=search_space_id, + title=url[:100], # Placeholder - URL as title (truncated) + document_type=DocumentType.CRAWLED_URL, + document_metadata={ + "url": url, + "connector_id": connector_id, + }, + content="Pending crawl...", # Placeholder content + content_hash=unique_identifier_hash, # Temporary unique value + unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # PENDING status - visible in UI + updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, + ) + session.add(document) + new_documents_created = True + + urls_to_process.append({ + 'document': document, + 'is_new': True, + 'url': url, + 'unique_identifier_hash': unique_identifier_hash, + }) + + except Exception as e: + logger.error(f"Error in Phase 1 for URL {url}: {e!s}", exc_info=True) + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([u for u in urls_to_process if u['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each URL one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(urls_to_process)} URLs") + + for item in urls_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed + documents_updated) + last_heartbeat_time = current_time + + document = item['document'] + url = item['url'] + is_new = item['is_new'] + + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() await task_logger.log_task_progress( log_entry, - f"Crawling URL {idx}/{len(urls)}: {url}", + f"Crawling URL: {url}", { "stage": "crawling_url", - "url_index": idx, "url": url, }, ) @@ -179,7 +278,10 @@ async def index_crawled_urls( if error or not crawl_result: logger.warning(f"Failed to crawl URL {url}: {error}") - failed_urls.append((url, error or "Unknown error")) + document.status = DocumentStatus.failed(error or "Crawl failed") + document.updated_at = get_current_timestamp() + await session.commit() + documents_failed += 1 continue # Extract content and metadata @@ -189,23 +291,16 @@ async def index_crawled_urls( if not content.strip(): logger.warning(f"Skipping URL with no content: {url}") - failed_urls.append((url, "No content extracted")) - documents_skipped += 1 + document.status = DocumentStatus.failed("No content extracted") + document.updated_at = get_current_timestamp() + await 
session.commit() + documents_failed += 1 continue - # Format content as structured document for summary generation (includes all metadata) - structured_document = crawler.format_to_structured_document( - crawl_result - ) - - # Generate unique identifier hash for this URL - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.CRAWLED_URL, url, search_space_id - ) + # Format content as structured document for summary generation + structured_document = crawler.format_to_structured_document(crawl_result) # Generate content hash using a version WITHOUT metadata - # This ensures the hash only changes when actual content changes, - # not when metadata (which contains dynamic fields like timestamps, IDs, etc.) changes structured_document_for_hash = crawler.format_to_structured_document( crawl_result, exclude_metadata=True ) @@ -213,114 +308,51 @@ async def index_crawled_urls( structured_document_for_hash, search_space_id ) - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - # Extract useful metadata title = metadata.get("title", url) description = metadata.get("description", "") language = metadata.get("language", "") - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - logger.info(f"Document for URL {url} unchanged. Skipping.") - documents_skipped += 1 - continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for URL {url}. Updating document." - ) + # Update title immediately for better UX + document.title = title + await session.commit() - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "url": url, - "title": title, - "description": description, - "language": language, - "document_type": "Crawled URL", - "crawler_type": crawler_type, - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - structured_document, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - summary_content = f"Crawled URL: {title}\n\n" - summary_content += f"URL: {url}\n" - if description: - summary_content += f"Description: {description}\n" - if language: - summary_content += f"Language: {language}\n" - summary_content += f"Crawler: {crawler_type}\n\n" - - # Add content preview - content_preview = content[:1000] - if len(content) > 1000: - content_preview += "..." 
- summary_content += f"Content Preview:\n{content_preview}\n" - - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - # Process chunks - chunks = await create_document_chunks(content) - - # Update existing document - existing_document.title = title - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - **metadata, - "crawler_type": crawler_type, - "last_crawled_at": datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ), - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_updated += 1 - logger.info(f"Successfully updated URL {url}") - continue - - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) - - if duplicate_by_content: - logger.info( - f"URL {url} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." - ) + # For existing documents, check if content has changed + if not is_new and document.content_hash == content_hash: + logger.info(f"Document for URL {url} unchanged. Marking as ready.") + # Ensure status is ready (might have been stuck) + document.status = DocumentStatus.ready() + await session.commit() documents_skipped += 1 continue - # Document doesn't exist - create new one - # Generate summary with metadata + # For new documents, check if duplicate content exists elsewhere + if is_new: + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"URL {url} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}). " + f"Marking as failed." 
+ ) + document.status = DocumentStatus.failed("Duplicate content exists") + document.updated_at = get_current_timestamp() + await session.commit() + duplicate_content_count += 1 + documents_skipped += 1 + continue + + # Generate summary with LLM user_llm = await get_user_long_context_llm( session, user_id, search_space_id ) if user_llm: - document_metadata = { + document_metadata_for_summary = { "url": url, "title": title, "description": description, @@ -328,11 +360,8 @@ async def index_crawled_urls( "document_type": "Crawled URL", "crawler_type": crawler_type, } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - structured_document, user_llm, document_metadata + summary_content, summary_embedding = await generate_document_summary( + structured_document, user_llm, document_metadata_for_summary ) else: # Fallback to simple summary if no LLM configured @@ -354,32 +383,32 @@ async def index_crawled_urls( summary_content ) + # Process chunks chunks = await create_document_chunks(content) - document = Document( - search_space_id=search_space_id, - title=title, - document_type=DocumentType.CRAWLED_URL, - document_metadata={ - **metadata, - "crawler_type": crawler_type, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - }, - content=summary_content, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) + # Update document to READY with actual content + document.title = title + document.content = summary_content + document.content_hash = content_hash + document.embedding = summary_embedding + document.document_metadata = { + **metadata, + "crawler_type": crawler_type, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.status = DocumentStatus.ready() # READY status + document.updated_at = get_current_timestamp() - session.add(document) - documents_indexed += 1 - logger.info(f"Successfully indexed new URL {url}") + if is_new: + documents_indexed += 1 + else: + documents_updated += 1 - # Batch commit every 10 documents + logger.info(f"Successfully processed URL {url}") + + # Batch commit every 10 documents (for ready status updates) if (documents_indexed + documents_updated) % 10 == 0: logger.info( f"Committing batch: {documents_indexed + documents_updated} URLs processed so far" @@ -387,32 +416,47 @@ async def index_crawled_urls( await session.commit() except Exception as e: - logger.error( - f"Error processing URL {url}: {e!s}", - exc_info=True, - ) - failed_urls.append((url, str(e))) + logger.error(f"Error processing URL {url}: {e!s}", exc_info=True) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)[:200]) + document.updated_at = get_current_timestamp() + await session.commit() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 continue total_processed = documents_indexed + documents_updated - if total_processed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches logger.info( 
f"Final commit: Total {documents_indexed} new, {documents_updated} updated URLs processed" ) - await session.commit() + try: + await session.commit() + logger.info("Successfully committed all webcrawler document changes to database") + except Exception as e: + # Handle any remaining integrity errors gracefully + if "duplicate key value violates unique constraint" in str(e).lower(): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + else: + raise - # Log failed URLs if any (for debugging purposes) - if failed_urls: - failed_summary = "; ".join( - [f"{url}: {error}" for url, error in failed_urls[:5]] - ) - if len(failed_urls) > 5: - failed_summary += f" (and {len(failed_urls) - 5} more)" - logger.warning(f"Some URLs failed to index: {failed_summary}") + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None await task_logger.log_task_success( log_entry, @@ -422,19 +466,21 @@ async def index_crawled_urls( "documents_indexed": documents_indexed, "documents_updated": documents_updated, "documents_skipped": documents_skipped, - "failed_urls_count": len(failed_urls), + "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, }, ) logger.info( f"Web page indexing completed: {documents_indexed} new, " f"{documents_updated} updated, {documents_skipped} skipped, " - f"{len(failed_urls)} failed" + f"{documents_failed} failed" ) - return ( - total_processed, - None, - ) # Return None on success (result_message is for logging only) + + if warning_message: + return total_processed, f"Completed with issues: {warning_message}" + + return total_processed, None except SQLAlchemyError as db_error: await session.rollback() @@ -482,9 +528,7 @@ async def get_crawled_url_documents( ) if connector_id: - # Filter by connector if needed - you might need to add a connector_id field to Document - # or filter by some other means depending on your schema - pass + query = query.filter(Document.connector_id == connector_id) result = await session.execute(query) documents = result.scalars().all() diff --git a/surfsense_backend/app/tasks/document_processors/youtube_processor.py b/surfsense_backend/app/tasks/document_processors/youtube_processor.py index 7251fb22f..19092b592 100644 --- a/surfsense_backend/app/tasks/document_processors/youtube_processor.py +++ b/surfsense_backend/app/tasks/document_processors/youtube_processor.py @@ -1,5 +1,9 @@ """ YouTube video document processor. 
+ +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create document with 'pending' status (visible in UI immediately) +- Phase 2: Process document: pending → processing → ready/failed """ import logging @@ -10,7 +14,7 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from youtube_transcript_api import YouTubeTranscriptApi -from app.db import Document, DocumentType +from app.db import Document, DocumentStatus, DocumentType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -23,6 +27,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, get_current_timestamp, + safe_set_chunks, ) @@ -58,6 +63,10 @@ async def add_youtube_video_document( """ Process a YouTube video URL, extract transcripts, and store as a document. + Implements 2-phase document status updates for real-time UI feedback: + - Phase 1: Create document with 'pending' status (visible in UI immediately) + - Phase 2: Process document: pending → processing → ready/failed + Args: session: Database session for storing the document url: YouTube video URL (supports standard, shortened, and embed formats) @@ -82,15 +91,18 @@ async def add_youtube_video_document( metadata={"url": url, "user_id": str(user_id)}, ) + document = None + video_id = None + is_new_document = False + try: - # Extract video ID from URL + # Extract video ID from URL (lightweight operation) await task_logger.log_task_progress( log_entry, f"Extracting video ID from URL: {url}", {"stage": "video_id_extraction"}, ) - # Get video ID video_id = get_youtube_video_id(url) if not video_id: raise ValueError(f"Could not extract video ID from URL: {url}") @@ -101,13 +113,79 @@ async def add_youtube_video_document( {"stage": "video_id_extracted", "video_id": video_id}, ) - # Get video metadata + # Generate unique identifier hash for this YouTube video + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.YOUTUBE_VIDEO, video_id, search_space_id + ) + + # Check if document with this unique identifier already exists + await task_logger.log_task_progress( + log_entry, + f"Checking for existing video: {video_id}", + {"stage": "duplicate_check", "video_id": video_id}, + ) + + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + # ======================================================================= + # PHASE 1: Create pending document or prepare existing for update + # ======================================================================= + if existing_document: + document = existing_document + is_new_document = False + # Check if already being processed + if DocumentStatus.is_state(existing_document.status, DocumentStatus.PENDING): + logging.info(f"YouTube video {video_id} already pending. Returning existing.") + return existing_document + if DocumentStatus.is_state(existing_document.status, DocumentStatus.PROCESSING): + logging.info(f"YouTube video {video_id} already processing. 
Returning existing.") + return existing_document + else: + # Create new document with PENDING status (visible in UI immediately) + await task_logger.log_task_progress( + log_entry, + f"Creating pending document for video: {video_id}", + {"stage": "pending_document_creation"}, + ) + + document = Document( + title=f"YouTube Video: {video_id}", # Placeholder title + document_type=DocumentType.YOUTUBE_VIDEO, + document_metadata={ + "url": url, + "video_id": video_id, + }, + content="Processing video...", # Placeholder content + content_hash=unique_identifier_hash, # Temporary unique value + unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation + status=DocumentStatus.pending(), # PENDING status - visible in UI + search_space_id=search_space_id, + updated_at=get_current_timestamp(), + created_by_id=user_id, + ) + session.add(document) + await session.commit() # Document visible in UI now with pending status! + is_new_document = True + + logging.info(f"Created pending document for YouTube video {video_id}") + + # ======================================================================= + # PHASE 2: Set to PROCESSING and do heavy work + # ======================================================================= + document.status = DocumentStatus.processing() + await session.commit() # UI shows "processing" status + await task_logger.log_task_progress( log_entry, f"Fetching video metadata for: {video_id}", {"stage": "metadata_fetch"}, ) + # Fetch video metadata params = { "format": "json", "url": f"https://www.youtube.com/watch?v={video_id}", @@ -120,6 +198,10 @@ async def add_youtube_video_document( ): video_data = await response.json() + # Update title immediately for better UX (user sees actual title sooner) + document.title = video_data.get("title", f"YouTube Video: {video_id}") + await session.commit() + await task_logger.log_task_progress( log_entry, f"Video metadata fetched: {video_data.get('title', 'Unknown')}", @@ -204,53 +286,26 @@ async def add_youtube_video_document( document_parts.append("") combined_document_string = "\n".join(document_parts) - # Generate unique identifier hash for this YouTube video - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.YOUTUBE_VIDEO, video_id, search_space_id - ) - # Generate content hash content_hash = generate_content_hash(combined_document_string, search_space_id) - # Check if document with this unique identifier already exists - await task_logger.log_task_progress( - log_entry, - f"Checking for existing video: {video_id}", - {"stage": "duplicate_check", "video_id": video_id}, - ) + # For existing documents, check if content has changed + if not is_new_document and existing_document.content_hash == content_hash: + await task_logger.log_task_success( + log_entry, + f"YouTube video document unchanged: {video_data.get('title', 'YouTube Video')}", + { + "duplicate_detected": True, + "existing_document_id": existing_document.id, + "video_id": video_id, + }, + ) + logging.info(f"Document for YouTube video {video_id} unchanged. 
Marking as ready.") + document.status = DocumentStatus.ready() + await session.commit() + return document - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - await task_logger.log_task_success( - log_entry, - f"YouTube video document unchanged: {video_data.get('title', 'YouTube Video')}", - { - "duplicate_detected": True, - "existing_document_id": existing_document.id, - "video_id": video_id, - }, - ) - logging.info( - f"Document for YouTube video {video_id} unchanged. Skipping." - ) - return existing_document - else: - # Content has changed - update the existing document - logging.info( - f"Content changed for YouTube video {video_id}. Updating document." - ) - await task_logger.log_task_progress( - log_entry, - f"Updating YouTube video document: {video_data.get('title', 'YouTube Video')}", - {"stage": "document_update", "video_id": video_id}, - ) - - # Get LLM for summary generation (needed for both create and update) + # Get LLM for summary generation await task_logger.log_task_progress( log_entry, f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}", @@ -272,7 +327,7 @@ async def add_youtube_video_document( ) # Generate summary with metadata - document_metadata = { + document_metadata_for_summary = { "url": url, "video_id": video_id, "title": video_data.get("title", "YouTube Video"), @@ -282,7 +337,7 @@ async def add_youtube_video_document( "has_transcript": "No captions available" not in transcript_text, } summary_content, summary_embedding = await generate_document_summary( - combined_document_string, user_llm, document_metadata + combined_document_string, user_llm, document_metadata_for_summary ) # Process chunks @@ -304,65 +359,33 @@ async def add_youtube_video_document( chunks = await create_document_chunks(combined_document_string) - # Update or create document - if existing_document: - # Update existing document - await task_logger.log_task_progress( - log_entry, - f"Updating YouTube video document in database: {video_data.get('title', 'YouTube Video')}", - {"stage": "document_update", "chunks_count": len(chunks)}, - ) + # ======================================================================= + # PHASE 3: Update document to READY with all content + # ======================================================================= + await task_logger.log_task_progress( + log_entry, + f"Finalizing document: {video_data.get('title', 'YouTube Video')}", + {"stage": "document_finalization", "chunks_count": len(chunks)}, + ) - existing_document.title = video_data.get("title", "YouTube Video") - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "url": url, - "video_id": video_id, - "video_title": video_data.get("title", "YouTube Video"), - "author": video_data.get("author_name", "Unknown"), - "thumbnail": video_data.get("thumbnail_url", ""), - } - existing_document.chunks = chunks - existing_document.blocknote_document = blocknote_json - existing_document.updated_at = get_current_timestamp() + document.title = video_data.get("title", "YouTube Video") + document.content = summary_content + document.content_hash = content_hash + document.embedding = summary_embedding + document.document_metadata = { + "url": url, + "video_id": video_id, + "video_title": 
video_data.get("title", "YouTube Video"), + "author": video_data.get("author_name", "Unknown"), + "thumbnail": video_data.get("thumbnail_url", ""), + } + safe_set_chunks(document, chunks) + document.blocknote_document = blocknote_json + document.status = DocumentStatus.ready() # READY status - fully processed + document.updated_at = get_current_timestamp() - await session.commit() - await session.refresh(existing_document) - document = existing_document - else: - # Create new document - await task_logger.log_task_progress( - log_entry, - f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}", - {"stage": "document_creation", "chunks_count": len(chunks)}, - ) - - document = Document( - title=video_data.get("title", "YouTube Video"), - document_type=DocumentType.YOUTUBE_VIDEO, - document_metadata={ - "url": url, - "video_id": video_id, - "video_title": video_data.get("title", "YouTube Video"), - "author": video_data.get("author_name", "Unknown"), - "thumbnail": video_data.get("thumbnail_url", ""), - }, - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - search_space_id=search_space_id, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - blocknote_document=blocknote_json, - updated_at=get_current_timestamp(), - created_by_id=user_id, - ) - - session.add(document) - await session.commit() - await session.refresh(document) + await session.commit() + await session.refresh(document) # Log success await task_logger.log_task_success( @@ -380,27 +403,49 @@ async def add_youtube_video_document( ) return document + except SQLAlchemyError as db_error: - await session.rollback() + # Mark document as failed if it exists + if document: + try: + document.status = DocumentStatus.failed(f"Database error: {str(db_error)[:150]}") + document.updated_at = get_current_timestamp() + await session.commit() + except Exception: + await session.rollback() + else: + await session.rollback() + await task_logger.log_task_failure( log_entry, f"Database error while processing YouTube video: {url}", str(db_error), { "error_type": "SQLAlchemyError", - "video_id": video_id if "video_id" in locals() else None, + "video_id": video_id, }, ) raise db_error + except Exception as e: - await session.rollback() + # Mark document as failed if it exists + if document: + try: + document.status = DocumentStatus.failed(str(e)[:200]) + document.updated_at = get_current_timestamp() + await session.commit() + except Exception: + await session.rollback() + else: + await session.rollback() + await task_logger.log_task_failure( log_entry, f"Failed to process YouTube video: {url}", str(e), { "error_type": type(e).__name__, - "video_id": video_id if "video_id" in locals() else None, + "video_id": video_id, }, ) logging.error(f"Failed to process YouTube video: {e!s}")