feat: implement two-phase document indexing for webcrawler and YouTube video processors with real-time status updates

This commit is contained in:
Anish Sarkar 2026-02-06 04:54:50 +05:30
parent 5d2da0847e
commit cc1e796c12
2 changed files with 375 additions and 286 deletions

View file

@ -1,5 +1,9 @@
"""
Webcrawler connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
import time
@ -11,7 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.webcrawler_connector import WebCrawlerConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -28,6 +32,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -49,7 +54,11 @@ async def index_crawled_urls(
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]:
"""
Index web page URLs.
Index web page URLs with real-time document status updates.
Implements 2-phase approach for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
Args:
session: Database session
@ -138,9 +147,9 @@ async def index_crawled_urls(
await task_logger.log_task_progress(
log_entry,
f"Starting to crawl {len(urls)} URLs",
f"Starting to process {len(urls)} URLs",
{
"stage": "crawling",
"stage": "processing",
"total_urls": len(urls),
},
)
@ -148,28 +157,118 @@ async def index_crawled_urls(
documents_indexed = 0
documents_updated = 0
documents_skipped = 0
failed_urls = []
documents_failed = 0
duplicate_content_count = 0
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
for idx, url in enumerate(urls, 1):
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all URLs, create pending documents for new ones
# This makes ALL new documents visible in the UI immediately with pending status
# =======================================================================
urls_to_process = [] # List of dicts with document and URL data
new_documents_created = False
for url in urls:
try:
logger.info(f"Processing URL {idx}/{len(urls)}: {url}")
# Generate unique identifier hash for this URL
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CRAWLED_URL, url, search_space_id
)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if it's already being processed
if DocumentStatus.is_state(existing_document.status, DocumentStatus.PENDING):
logger.info(f"URL {url} already pending. Skipping.")
documents_skipped += 1
continue
if DocumentStatus.is_state(existing_document.status, DocumentStatus.PROCESSING):
logger.info(f"URL {url} already processing. Skipping.")
documents_skipped += 1
continue
# Queue existing document for potential update check
urls_to_process.append({
'document': existing_document,
'is_new': False,
'url': url,
'unique_identifier_hash': unique_identifier_hash,
})
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=url[:100], # Placeholder - URL as title (truncated)
document_type=DocumentType.CRAWLED_URL,
document_metadata={
"url": url,
"connector_id": connector_id,
},
content="Pending crawl...", # Placeholder content
content_hash=unique_identifier_hash, # Temporary unique value
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # PENDING status - visible in UI
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
urls_to_process.append({
'document': document,
'is_new': True,
'url': url,
'unique_identifier_hash': unique_identifier_hash,
})
except Exception as e:
logger.error(f"Error in Phase 1 for URL {url}: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([u for u in urls_to_process if u['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each URL one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(urls_to_process)} URLs")
for item in urls_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed + documents_updated)
last_heartbeat_time = current_time
document = item['document']
url = item['url']
is_new = item['is_new']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
await task_logger.log_task_progress(
log_entry,
f"Crawling URL {idx}/{len(urls)}: {url}",
f"Crawling URL: {url}",
{
"stage": "crawling_url",
"url_index": idx,
"url": url,
},
)
@ -179,7 +278,10 @@ async def index_crawled_urls(
if error or not crawl_result:
logger.warning(f"Failed to crawl URL {url}: {error}")
failed_urls.append((url, error or "Unknown error"))
document.status = DocumentStatus.failed(error or "Crawl failed")
document.updated_at = get_current_timestamp()
await session.commit()
documents_failed += 1
continue
# Extract content and metadata
@ -189,23 +291,16 @@ async def index_crawled_urls(
if not content.strip():
logger.warning(f"Skipping URL with no content: {url}")
failed_urls.append((url, "No content extracted"))
documents_skipped += 1
document.status = DocumentStatus.failed("No content extracted")
document.updated_at = get_current_timestamp()
await session.commit()
documents_failed += 1
continue
# Format content as structured document for summary generation (includes all metadata)
structured_document = crawler.format_to_structured_document(
crawl_result
)
# Generate unique identifier hash for this URL
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CRAWLED_URL, url, search_space_id
)
# Format content as structured document for summary generation
structured_document = crawler.format_to_structured_document(crawl_result)
# Generate content hash using a version WITHOUT metadata
# This ensures the hash only changes when actual content changes,
# not when metadata (which contains dynamic fields like timestamps, IDs, etc.) changes
structured_document_for_hash = crawler.format_to_structured_document(
crawl_result, exclude_metadata=True
)
@ -213,114 +308,51 @@ async def index_crawled_urls(
structured_document_for_hash, search_space_id
)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Extract useful metadata
title = metadata.get("title", url)
description = metadata.get("description", "")
language = metadata.get("language", "")
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(f"Document for URL {url} unchanged. Skipping.")
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for URL {url}. Updating document."
)
# Update title immediately for better UX
document.title = title
await session.commit()
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"url": url,
"title": title,
"description": description,
"language": language,
"document_type": "Crawled URL",
"crawler_type": crawler_type,
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
structured_document, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Crawled URL: {title}\n\n"
summary_content += f"URL: {url}\n"
if description:
summary_content += f"Description: {description}\n"
if language:
summary_content += f"Language: {language}\n"
summary_content += f"Crawler: {crawler_type}\n\n"
# Add content preview
content_preview = content[:1000]
if len(content) > 1000:
content_preview += "..."
summary_content += f"Content Preview:\n{content_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(content)
# Update existing document
existing_document.title = title
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
**metadata,
"crawler_type": crawler_type,
"last_crawled_at": datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
),
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_updated += 1
logger.info(f"Successfully updated URL {url}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"URL {url} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
# For existing documents, check if content has changed
if not is_new and document.content_hash == content_hash:
logger.info(f"Document for URL {url} unchanged. Marking as ready.")
# Ensure status is ready (might have been stuck)
document.status = DocumentStatus.ready()
await session.commit()
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
# For new documents, check if duplicate content exists elsewhere
if is_new:
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"URL {url} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}). "
f"Marking as failed."
)
document.status = DocumentStatus.failed("Duplicate content exists")
document.updated_at = get_current_timestamp()
await session.commit()
duplicate_content_count += 1
documents_skipped += 1
continue
# Generate summary with LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
document_metadata_for_summary = {
"url": url,
"title": title,
"description": description,
@ -328,11 +360,8 @@ async def index_crawled_urls(
"document_type": "Crawled URL",
"crawler_type": crawler_type,
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
structured_document, user_llm, document_metadata
summary_content, summary_embedding = await generate_document_summary(
structured_document, user_llm, document_metadata_for_summary
)
else:
# Fallback to simple summary if no LLM configured
@ -354,32 +383,32 @@ async def index_crawled_urls(
summary_content
)
# Process chunks
chunks = await create_document_chunks(content)
document = Document(
search_space_id=search_space_id,
title=title,
document_type=DocumentType.CRAWLED_URL,
document_metadata={
**metadata,
"crawler_type": crawler_type,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
# Update document to READY with actual content
document.title = title
document.content = summary_content
document.content_hash = content_hash
document.embedding = summary_embedding
document.document_metadata = {
**metadata,
"crawler_type": crawler_type,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.status = DocumentStatus.ready() # READY status
document.updated_at = get_current_timestamp()
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new URL {url}")
if is_new:
documents_indexed += 1
else:
documents_updated += 1
# Batch commit every 10 documents
logger.info(f"Successfully processed URL {url}")
# Batch commit every 10 documents (for ready status updates)
if (documents_indexed + documents_updated) % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed + documents_updated} URLs processed so far"
@ -387,32 +416,47 @@ async def index_crawled_urls(
await session.commit()
except Exception as e:
logger.error(
f"Error processing URL {url}: {e!s}",
exc_info=True,
)
failed_urls.append((url, str(e)))
logger.error(f"Error processing URL {url}: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e)[:200])
document.updated_at = get_current_timestamp()
await session.commit()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
total_processed = documents_indexed + documents_updated
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(
f"Final commit: Total {documents_indexed} new, {documents_updated} updated URLs processed"
)
await session.commit()
try:
await session.commit()
logger.info("Successfully committed all webcrawler document changes to database")
except Exception as e:
# Handle any remaining integrity errors gracefully
if "duplicate key value violates unique constraint" in str(e).lower():
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
else:
raise
# Log failed URLs if any (for debugging purposes)
if failed_urls:
failed_summary = "; ".join(
[f"{url}: {error}" for url, error in failed_urls[:5]]
)
if len(failed_urls) > 5:
failed_summary += f" (and {len(failed_urls) - 5} more)"
logger.warning(f"Some URLs failed to index: {failed_summary}")
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
await task_logger.log_task_success(
log_entry,
@ -422,19 +466,21 @@ async def index_crawled_urls(
"documents_indexed": documents_indexed,
"documents_updated": documents_updated,
"documents_skipped": documents_skipped,
"failed_urls_count": len(failed_urls),
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"Web page indexing completed: {documents_indexed} new, "
f"{documents_updated} updated, {documents_skipped} skipped, "
f"{len(failed_urls)} failed"
f"{documents_failed} failed"
)
return (
total_processed,
None,
) # Return None on success (result_message is for logging only)
if warning_message:
return total_processed, f"Completed with issues: {warning_message}"
return total_processed, None
except SQLAlchemyError as db_error:
await session.rollback()
@ -482,9 +528,7 @@ async def get_crawled_url_documents(
)
if connector_id:
# Filter by connector if needed - you might need to add a connector_id field to Document
# or filter by some other means depending on your schema
pass
query = query.filter(Document.connector_id == connector_id)
result = await session.execute(query)
documents = result.scalars().all()

View file

@ -1,5 +1,9 @@
"""
YouTube video document processor.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create document with 'pending' status (visible in UI immediately)
- Phase 2: Process document: pending processing ready/failed
"""
import logging
@ -10,7 +14,7 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from youtube_transcript_api import YouTubeTranscriptApi
from app.db import Document, DocumentType
from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -23,6 +27,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
get_current_timestamp,
safe_set_chunks,
)
@ -58,6 +63,10 @@ async def add_youtube_video_document(
"""
Process a YouTube video URL, extract transcripts, and store as a document.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create document with 'pending' status (visible in UI immediately)
- Phase 2: Process document: pending processing ready/failed
Args:
session: Database session for storing the document
url: YouTube video URL (supports standard, shortened, and embed formats)
@ -82,15 +91,18 @@ async def add_youtube_video_document(
metadata={"url": url, "user_id": str(user_id)},
)
document = None
video_id = None
is_new_document = False
try:
# Extract video ID from URL
# Extract video ID from URL (lightweight operation)
await task_logger.log_task_progress(
log_entry,
f"Extracting video ID from URL: {url}",
{"stage": "video_id_extraction"},
)
# Get video ID
video_id = get_youtube_video_id(url)
if not video_id:
raise ValueError(f"Could not extract video ID from URL: {url}")
@ -101,13 +113,79 @@ async def add_youtube_video_document(
{"stage": "video_id_extracted", "video_id": video_id},
)
# Get video metadata
# Generate unique identifier hash for this YouTube video
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.YOUTUBE_VIDEO, video_id, search_space_id
)
# Check if document with this unique identifier already exists
await task_logger.log_task_progress(
log_entry,
f"Checking for existing video: {video_id}",
{"stage": "duplicate_check", "video_id": video_id},
)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# =======================================================================
# PHASE 1: Create pending document or prepare existing for update
# =======================================================================
if existing_document:
document = existing_document
is_new_document = False
# Check if already being processed
if DocumentStatus.is_state(existing_document.status, DocumentStatus.PENDING):
logging.info(f"YouTube video {video_id} already pending. Returning existing.")
return existing_document
if DocumentStatus.is_state(existing_document.status, DocumentStatus.PROCESSING):
logging.info(f"YouTube video {video_id} already processing. Returning existing.")
return existing_document
else:
# Create new document with PENDING status (visible in UI immediately)
await task_logger.log_task_progress(
log_entry,
f"Creating pending document for video: {video_id}",
{"stage": "pending_document_creation"},
)
document = Document(
title=f"YouTube Video: {video_id}", # Placeholder title
document_type=DocumentType.YOUTUBE_VIDEO,
document_metadata={
"url": url,
"video_id": video_id,
},
content="Processing video...", # Placeholder content
content_hash=unique_identifier_hash, # Temporary unique value
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation
status=DocumentStatus.pending(), # PENDING status - visible in UI
search_space_id=search_space_id,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)
await session.commit() # Document visible in UI now with pending status!
is_new_document = True
logging.info(f"Created pending document for YouTube video {video_id}")
# =======================================================================
# PHASE 2: Set to PROCESSING and do heavy work
# =======================================================================
document.status = DocumentStatus.processing()
await session.commit() # UI shows "processing" status
await task_logger.log_task_progress(
log_entry,
f"Fetching video metadata for: {video_id}",
{"stage": "metadata_fetch"},
)
# Fetch video metadata
params = {
"format": "json",
"url": f"https://www.youtube.com/watch?v={video_id}",
@ -120,6 +198,10 @@ async def add_youtube_video_document(
):
video_data = await response.json()
# Update title immediately for better UX (user sees actual title sooner)
document.title = video_data.get("title", f"YouTube Video: {video_id}")
await session.commit()
await task_logger.log_task_progress(
log_entry,
f"Video metadata fetched: {video_data.get('title', 'Unknown')}",
@ -204,53 +286,26 @@ async def add_youtube_video_document(
document_parts.append("</DOCUMENT>")
combined_document_string = "\n".join(document_parts)
# Generate unique identifier hash for this YouTube video
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.YOUTUBE_VIDEO, video_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check if document with this unique identifier already exists
await task_logger.log_task_progress(
log_entry,
f"Checking for existing video: {video_id}",
{"stage": "duplicate_check", "video_id": video_id},
)
# For existing documents, check if content has changed
if not is_new_document and existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"YouTube video document unchanged: {video_data.get('title', 'YouTube Video')}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
"video_id": video_id,
},
)
logging.info(f"Document for YouTube video {video_id} unchanged. Marking as ready.")
document.status = DocumentStatus.ready()
await session.commit()
return document
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"YouTube video document unchanged: {video_data.get('title', 'YouTube Video')}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
"video_id": video_id,
},
)
logging.info(
f"Document for YouTube video {video_id} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for YouTube video {video_id}. Updating document."
)
await task_logger.log_task_progress(
log_entry,
f"Updating YouTube video document: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_update", "video_id": video_id},
)
# Get LLM for summary generation (needed for both create and update)
# Get LLM for summary generation
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}",
@ -272,7 +327,7 @@ async def add_youtube_video_document(
)
# Generate summary with metadata
document_metadata = {
document_metadata_for_summary = {
"url": url,
"video_id": video_id,
"title": video_data.get("title", "YouTube Video"),
@ -282,7 +337,7 @@ async def add_youtube_video_document(
"has_transcript": "No captions available" not in transcript_text,
}
summary_content, summary_embedding = await generate_document_summary(
combined_document_string, user_llm, document_metadata
combined_document_string, user_llm, document_metadata_for_summary
)
# Process chunks
@ -304,65 +359,33 @@ async def add_youtube_video_document(
chunks = await create_document_chunks(combined_document_string)
# Update or create document
if existing_document:
# Update existing document
await task_logger.log_task_progress(
log_entry,
f"Updating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_update", "chunks_count": len(chunks)},
)
# =======================================================================
# PHASE 3: Update document to READY with all content
# =======================================================================
await task_logger.log_task_progress(
log_entry,
f"Finalizing document: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_finalization", "chunks_count": len(chunks)},
)
existing_document.title = video_data.get("title", "YouTube Video")
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"url": url,
"video_id": video_id,
"video_title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
}
existing_document.chunks = chunks
existing_document.blocknote_document = blocknote_json
existing_document.updated_at = get_current_timestamp()
document.title = video_data.get("title", "YouTube Video")
document.content = summary_content
document.content_hash = content_hash
document.embedding = summary_embedding
document.document_metadata = {
"url": url,
"video_id": video_id,
"video_title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
}
safe_set_chunks(document, chunks)
document.blocknote_document = blocknote_json
document.status = DocumentStatus.ready() # READY status - fully processed
document.updated_at = get_current_timestamp()
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
await task_logger.log_task_progress(
log_entry,
f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
document = Document(
title=video_data.get("title", "YouTube Video"),
document_type=DocumentType.YOUTUBE_VIDEO,
document_metadata={
"url": url,
"video_id": video_id,
"video_title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
search_space_id=search_space_id,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(
@ -380,27 +403,49 @@ async def add_youtube_video_document(
)
return document
except SQLAlchemyError as db_error:
await session.rollback()
# Mark document as failed if it exists
if document:
try:
document.status = DocumentStatus.failed(f"Database error: {str(db_error)[:150]}")
document.updated_at = get_current_timestamp()
await session.commit()
except Exception:
await session.rollback()
else:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error while processing YouTube video: {url}",
str(db_error),
{
"error_type": "SQLAlchemyError",
"video_id": video_id if "video_id" in locals() else None,
"video_id": video_id,
},
)
raise db_error
except Exception as e:
await session.rollback()
# Mark document as failed if it exists
if document:
try:
document.status = DocumentStatus.failed(str(e)[:200])
document.updated_at = get_current_timestamp()
await session.commit()
except Exception:
await session.rollback()
else:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to process YouTube video: {url}",
str(e),
{
"error_type": type(e).__name__,
"video_id": video_id if "video_id" in locals() else None,
"video_id": video_id,
},
)
logging.error(f"Failed to process YouTube video: {e!s}")