mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-25 08:46:22 +02:00
1105 lines
41 KiB
Python
1105 lines
41 KiB
Python
"""Celery tasks for document processing."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
|
from sqlalchemy.pool import NullPool
|
|
|
|
from app.celery_app import celery_app
|
|
from app.config import config
|
|
from app.services.notification_service import NotificationService
|
|
from app.services.task_logging_service import TaskLoggingService
|
|
from app.tasks.document_processors import (
|
|
add_extension_received_document,
|
|
add_youtube_video_document,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ===== Redis heartbeat for document processing tasks =====
|
|
# Same mechanism as connector indexing heartbeats (search_source_connectors_routes.py).
|
|
# A background coroutine refreshes a Redis key every 60s with a 2-min TTL.
|
|
# If the Celery worker crashes, the coroutine dies, the key expires, and the
|
|
# stale_notification_cleanup_task detects the missing key and marks the
|
|
# notification + document as failed.
|
|
_doc_heartbeat_redis = None
|
|
HEARTBEAT_TTL_SECONDS = 120 # 2 minutes — same as connector indexing
|
|
HEARTBEAT_REFRESH_INTERVAL = 60 # Refresh every 60 seconds
|
|
|
|
|
|
def _get_doc_heartbeat_redis():
|
|
"""Get Redis client for document processing heartbeat."""
|
|
import redis
|
|
|
|
global _doc_heartbeat_redis
|
|
if _doc_heartbeat_redis is None:
|
|
_doc_heartbeat_redis = redis.from_url(
|
|
config.REDIS_APP_URL, decode_responses=True
|
|
)
|
|
return _doc_heartbeat_redis
|
|
|
|
|
|
def _get_heartbeat_key(notification_id: int) -> str:
|
|
"""Generate Redis key for document processing heartbeat.
|
|
|
|
Uses same key pattern as connector indexing: indexing:heartbeat:{notification_id}
|
|
"""
|
|
return f"indexing:heartbeat:{notification_id}"
|
|
|
|
|
|
def _start_heartbeat(notification_id: int) -> None:
|
|
"""Set initial Redis heartbeat key for a document processing task."""
|
|
try:
|
|
key = _get_heartbeat_key(notification_id)
|
|
_get_doc_heartbeat_redis().setex(key, HEARTBEAT_TTL_SECONDS, "started")
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to set initial heartbeat for notification {notification_id}: {e}"
|
|
)
|
|
|
|
|
|
def _stop_heartbeat(notification_id: int) -> None:
|
|
"""Delete Redis heartbeat key when task completes (success or failure)."""
|
|
try:
|
|
key = _get_heartbeat_key(notification_id)
|
|
_get_doc_heartbeat_redis().delete(key)
|
|
except Exception:
|
|
pass # Key will expire on its own
|
|
|
|
|
|
async def _run_heartbeat_loop(notification_id: int):
|
|
"""Background coroutine that refreshes Redis heartbeat every 60 seconds.
|
|
|
|
This keeps the heartbeat alive while the task is running.
|
|
When the task finishes, this coroutine is cancelled via heartbeat_task.cancel().
|
|
When the worker crashes, this coroutine dies with it and the key expires.
|
|
"""
|
|
key = _get_heartbeat_key(notification_id)
|
|
try:
|
|
while True:
|
|
await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL)
|
|
try:
|
|
_get_doc_heartbeat_redis().setex(key, HEARTBEAT_TTL_SECONDS, "alive")
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to refresh heartbeat for notification {notification_id}: {e}"
|
|
)
|
|
except asyncio.CancelledError:
|
|
pass # Normal cancellation when task completes
|
|
|
|
|
|
def get_celery_session_maker():
|
|
"""
|
|
Create a new async session maker for Celery tasks.
|
|
This is necessary because Celery tasks run in a new event loop,
|
|
and the default session maker is bound to the main app's event loop.
|
|
"""
|
|
engine = create_async_engine(
|
|
config.DATABASE_URL,
|
|
poolclass=NullPool, # Don't use connection pooling for Celery tasks
|
|
echo=False,
|
|
)
|
|
return async_sessionmaker(engine, expire_on_commit=False)
|
|
|
|
|
|
@celery_app.task(name="process_extension_document", bind=True)
|
|
def process_extension_document_task(
|
|
self, individual_document_dict, search_space_id: int, user_id: str
|
|
):
|
|
"""
|
|
Celery task to process extension document.
|
|
|
|
Args:
|
|
individual_document_dict: Document data as dictionary
|
|
search_space_id: ID of the search space
|
|
user_id: ID of the user
|
|
"""
|
|
# Create a new event loop for this task
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
try:
|
|
loop.run_until_complete(
|
|
_process_extension_document(
|
|
individual_document_dict, search_space_id, user_id
|
|
)
|
|
)
|
|
finally:
|
|
loop.close()
|
|
|
|
|
|
async def _process_extension_document(
|
|
individual_document_dict, search_space_id: int, user_id: str
|
|
):
|
|
"""Process extension document with new session."""
|
|
from pydantic import BaseModel, ConfigDict, Field
|
|
|
|
# Reconstruct the document object from dict
|
|
# You'll need to define the proper model for this
|
|
class DocumentMetadata(BaseModel):
|
|
VisitedWebPageTitle: str
|
|
VisitedWebPageURL: str
|
|
BrowsingSessionId: str
|
|
VisitedWebPageDateWithTimeInISOString: str
|
|
VisitedWebPageReffererURL: str
|
|
VisitedWebPageVisitDurationInMilliseconds: str
|
|
|
|
class IndividualDocument(BaseModel):
|
|
model_config = ConfigDict(populate_by_name=True)
|
|
metadata: DocumentMetadata
|
|
page_content: str = Field(alias="pageContent")
|
|
|
|
individual_document = IndividualDocument(**individual_document_dict)
|
|
|
|
async with get_celery_session_maker()() as session:
|
|
task_logger = TaskLoggingService(session, search_space_id)
|
|
|
|
# Truncate title for notification display
|
|
page_title = individual_document.metadata.VisitedWebPageTitle[:50]
|
|
if len(individual_document.metadata.VisitedWebPageTitle) > 50:
|
|
page_title += "..."
|
|
|
|
# Create notification for document processing
|
|
notification = (
|
|
await NotificationService.document_processing.notify_processing_started(
|
|
session=session,
|
|
user_id=UUID(user_id),
|
|
document_type="EXTENSION",
|
|
document_name=page_title,
|
|
search_space_id=search_space_id,
|
|
)
|
|
)
|
|
|
|
log_entry = await task_logger.log_task_start(
|
|
task_name="process_extension_document",
|
|
source="document_processor",
|
|
message=f"Starting processing of extension document from {individual_document.metadata.VisitedWebPageTitle}",
|
|
metadata={
|
|
"document_type": "EXTENSION",
|
|
"url": individual_document.metadata.VisitedWebPageURL,
|
|
"title": individual_document.metadata.VisitedWebPageTitle,
|
|
"user_id": user_id,
|
|
},
|
|
)
|
|
|
|
try:
|
|
# Update notification: parsing stage
|
|
await NotificationService.document_processing.notify_processing_progress(
|
|
session,
|
|
notification,
|
|
stage="parsing",
|
|
stage_message="Reading page content",
|
|
)
|
|
|
|
result = await add_extension_received_document(
|
|
session, individual_document, search_space_id, user_id
|
|
)
|
|
|
|
if result:
|
|
await task_logger.log_task_success(
|
|
log_entry,
|
|
f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
|
|
{"document_id": result.id, "content_hash": result.content_hash},
|
|
)
|
|
|
|
# Update notification on success
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
document_id=result.id,
|
|
chunks_count=None,
|
|
)
|
|
)
|
|
else:
|
|
await task_logger.log_task_success(
|
|
log_entry,
|
|
f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
|
|
{"duplicate_detected": True},
|
|
)
|
|
|
|
# Update notification for duplicate
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message="Page already saved (duplicate)",
|
|
)
|
|
)
|
|
except Exception as e:
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Failed to process extension document: {individual_document.metadata.VisitedWebPageTitle}",
|
|
str(e),
|
|
{"error_type": type(e).__name__},
|
|
)
|
|
|
|
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
|
|
try:
|
|
# Refresh notification to ensure it's not stale after any rollback
|
|
await session.refresh(notification)
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message=str(e)[:100],
|
|
)
|
|
)
|
|
except Exception as notif_error:
|
|
logger.error(
|
|
f"Failed to update notification on failure: {notif_error!s}"
|
|
)
|
|
|
|
logger.error(f"Error processing extension document: {e!s}")
|
|
raise
|
|
|
|
|
|
@celery_app.task(name="process_youtube_video", bind=True)
|
|
def process_youtube_video_task(self, url: str, search_space_id: int, user_id: str):
|
|
"""
|
|
Celery task to process YouTube video.
|
|
|
|
Args:
|
|
url: YouTube video URL
|
|
search_space_id: ID of the search space
|
|
user_id: ID of the user
|
|
"""
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
try:
|
|
loop.run_until_complete(_process_youtube_video(url, search_space_id, user_id))
|
|
finally:
|
|
loop.close()
|
|
|
|
|
|
async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
|
|
"""Process YouTube video with new session."""
|
|
async with get_celery_session_maker()() as session:
|
|
task_logger = TaskLoggingService(session, search_space_id)
|
|
|
|
# Extract video title from URL for notification (will be updated later)
|
|
video_name = url.split("v=")[-1][:11] if "v=" in url else url
|
|
|
|
# Create notification for document processing
|
|
notification = (
|
|
await NotificationService.document_processing.notify_processing_started(
|
|
session=session,
|
|
user_id=UUID(user_id),
|
|
document_type="YOUTUBE_VIDEO",
|
|
document_name=f"YouTube: {video_name}",
|
|
search_space_id=search_space_id,
|
|
)
|
|
)
|
|
|
|
# Start Redis heartbeat for stale task detection
|
|
_start_heartbeat(notification.id)
|
|
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id))
|
|
|
|
log_entry = await task_logger.log_task_start(
|
|
task_name="process_youtube_video",
|
|
source="document_processor",
|
|
message=f"Starting YouTube video processing for: {url}",
|
|
metadata={"document_type": "YOUTUBE_VIDEO", "url": url, "user_id": user_id},
|
|
)
|
|
|
|
try:
|
|
# Update notification: parsing (fetching transcript)
|
|
await NotificationService.document_processing.notify_processing_progress(
|
|
session,
|
|
notification,
|
|
stage="parsing",
|
|
stage_message="Fetching video transcript",
|
|
)
|
|
|
|
result = await add_youtube_video_document(
|
|
session, url, search_space_id, user_id, notification=notification
|
|
)
|
|
|
|
if result:
|
|
await task_logger.log_task_success(
|
|
log_entry,
|
|
f"Successfully processed YouTube video: {result.title}",
|
|
{
|
|
"document_id": result.id,
|
|
"video_id": result.document_metadata.get("video_id"),
|
|
"content_hash": result.content_hash,
|
|
},
|
|
)
|
|
|
|
# Update notification on success
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
document_id=result.id,
|
|
chunks_count=None,
|
|
)
|
|
)
|
|
else:
|
|
await task_logger.log_task_success(
|
|
log_entry,
|
|
f"YouTube video document already exists (duplicate): {url}",
|
|
{"duplicate_detected": True},
|
|
)
|
|
|
|
# Update notification for duplicate
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message="Video already exists (duplicate)",
|
|
)
|
|
)
|
|
except Exception as e:
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Failed to process YouTube video: {url}",
|
|
str(e),
|
|
{"error_type": type(e).__name__},
|
|
)
|
|
|
|
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
|
|
try:
|
|
# Refresh notification to ensure it's not stale after any rollback
|
|
await session.refresh(notification)
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message=str(e)[:100],
|
|
)
|
|
)
|
|
except Exception as notif_error:
|
|
logger.error(
|
|
f"Failed to update notification on failure: {notif_error!s}"
|
|
)
|
|
|
|
logger.error(f"Error processing YouTube video: {e!s}")
|
|
raise
|
|
finally:
|
|
# Stop heartbeat — key deleted on success, expires on crash
|
|
heartbeat_task.cancel()
|
|
_stop_heartbeat(notification.id)
|
|
|
|
|
|
@celery_app.task(name="process_file_upload", bind=True)
|
|
def process_file_upload_task(
|
|
self, file_path: str, filename: str, search_space_id: int, user_id: str
|
|
):
|
|
"""
|
|
Celery task to process uploaded file.
|
|
|
|
Args:
|
|
file_path: Path to the uploaded file
|
|
filename: Original filename
|
|
search_space_id: ID of the search space
|
|
user_id: ID of the user
|
|
"""
|
|
import traceback
|
|
|
|
logger.info(
|
|
f"[process_file_upload] Task started - file: {filename}, "
|
|
f"search_space_id: {search_space_id}, user_id: {user_id}"
|
|
)
|
|
logger.info(f"[process_file_upload] File path: {file_path}")
|
|
|
|
# Check if file exists and is accessible
|
|
if not os.path.exists(file_path):
|
|
logger.error(
|
|
f"[process_file_upload] File does not exist: {file_path}. "
|
|
"File may have been removed before syncing could start."
|
|
)
|
|
return
|
|
|
|
try:
|
|
file_size = os.path.getsize(file_path)
|
|
logger.info(f"[process_file_upload] File size: {file_size} bytes")
|
|
except Exception as e:
|
|
logger.warning(f"[process_file_upload] Could not get file size: {e}")
|
|
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
try:
|
|
loop.run_until_complete(
|
|
_process_file_upload(file_path, filename, search_space_id, user_id)
|
|
)
|
|
logger.info(
|
|
f"[process_file_upload] Task completed successfully for: {filename}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"[process_file_upload] Task failed for {filename}: {e}\n"
|
|
f"Traceback:\n{traceback.format_exc()}"
|
|
)
|
|
raise
|
|
finally:
|
|
loop.close()
|
|
|
|
|
|
async def _process_file_upload(
|
|
file_path: str, filename: str, search_space_id: int, user_id: str
|
|
):
|
|
"""Process file upload with new session."""
|
|
from app.tasks.document_processors.file_processors import process_file_in_background
|
|
|
|
logger.info(f"[_process_file_upload] Starting async processing for: {filename}")
|
|
|
|
async with get_celery_session_maker()() as session:
|
|
logger.info(f"[_process_file_upload] Database session created for: {filename}")
|
|
task_logger = TaskLoggingService(session, search_space_id)
|
|
|
|
# Get file size for notification metadata
|
|
try:
|
|
file_size = os.path.getsize(file_path)
|
|
logger.info(f"[_process_file_upload] File size: {file_size} bytes")
|
|
except Exception as e:
|
|
logger.warning(f"[_process_file_upload] Could not get file size: {e}")
|
|
file_size = None
|
|
|
|
# Create notification for document processing
|
|
logger.info(f"[_process_file_upload] Creating notification for: {filename}")
|
|
notification = (
|
|
await NotificationService.document_processing.notify_processing_started(
|
|
session=session,
|
|
user_id=UUID(user_id),
|
|
document_type="FILE",
|
|
document_name=filename,
|
|
search_space_id=search_space_id,
|
|
file_size=file_size,
|
|
)
|
|
)
|
|
logger.info(
|
|
f"[_process_file_upload] Notification created with ID: {notification.id if notification else 'None'}"
|
|
)
|
|
|
|
# Start Redis heartbeat for stale task detection
|
|
_start_heartbeat(notification.id)
|
|
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id))
|
|
|
|
log_entry = await task_logger.log_task_start(
|
|
task_name="process_file_upload",
|
|
source="document_processor",
|
|
message=f"Starting file processing for: {filename}",
|
|
metadata={
|
|
"document_type": "FILE",
|
|
"filename": filename,
|
|
"file_path": file_path,
|
|
"user_id": user_id,
|
|
},
|
|
)
|
|
|
|
try:
|
|
result = await process_file_in_background(
|
|
file_path,
|
|
filename,
|
|
search_space_id,
|
|
user_id,
|
|
session,
|
|
task_logger,
|
|
log_entry,
|
|
notification=notification,
|
|
)
|
|
|
|
# Update notification on success
|
|
if result:
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
document_id=result.id,
|
|
chunks_count=None,
|
|
)
|
|
)
|
|
else:
|
|
# Duplicate detected
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message="Document already exists (duplicate)",
|
|
)
|
|
)
|
|
|
|
except Exception as e:
|
|
# Import here to avoid circular dependencies
|
|
from fastapi import HTTPException
|
|
|
|
from app.services.page_limit_service import PageLimitExceededError
|
|
|
|
# Check if this is a page limit error (either direct or wrapped in HTTPException)
|
|
page_limit_error: PageLimitExceededError | None = None
|
|
if isinstance(e, PageLimitExceededError):
|
|
page_limit_error = e
|
|
elif (
|
|
isinstance(e, HTTPException)
|
|
and e.__cause__
|
|
and isinstance(e.__cause__, PageLimitExceededError)
|
|
):
|
|
# HTTPException wraps the original PageLimitExceededError
|
|
page_limit_error = e.__cause__
|
|
elif isinstance(e, HTTPException) and "page limit" in str(e.detail).lower():
|
|
# Fallback: HTTPException with page limit message but no cause
|
|
page_limit_error = None # We don't have the details
|
|
|
|
# For page limit errors, create a dedicated page_limit_exceeded notification
|
|
if page_limit_error is not None:
|
|
error_message = str(page_limit_error)
|
|
# Create a dedicated page limit exceeded notification
|
|
try:
|
|
# First, mark the processing notification as failed
|
|
await session.refresh(notification)
|
|
await NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message="Page limit exceeded",
|
|
)
|
|
|
|
# Then create a separate page_limit_exceeded notification for better UX
|
|
await NotificationService.page_limit.notify_page_limit_exceeded(
|
|
session=session,
|
|
user_id=UUID(user_id),
|
|
document_name=filename,
|
|
document_type="FILE",
|
|
search_space_id=search_space_id,
|
|
pages_used=page_limit_error.pages_used,
|
|
pages_limit=page_limit_error.pages_limit,
|
|
pages_to_add=page_limit_error.pages_to_add,
|
|
)
|
|
except Exception as notif_error:
|
|
logger.error(
|
|
f"Failed to create page limit notification: {notif_error!s}"
|
|
)
|
|
elif isinstance(e, HTTPException) and "page limit" in str(e.detail).lower():
|
|
# HTTPException with page limit message but no detailed cause
|
|
error_message = str(e.detail)
|
|
try:
|
|
await session.refresh(notification)
|
|
await NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message=error_message,
|
|
)
|
|
except Exception as notif_error:
|
|
logger.error(
|
|
f"Failed to update notification on failure: {notif_error!s}"
|
|
)
|
|
else:
|
|
error_message = str(e)[:100]
|
|
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
|
|
try:
|
|
# Refresh notification to ensure it's not stale after any rollback
|
|
await session.refresh(notification)
|
|
await NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message=error_message,
|
|
)
|
|
except Exception as notif_error:
|
|
logger.error(
|
|
f"Failed to update notification on failure: {notif_error!s}"
|
|
)
|
|
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
error_message,
|
|
str(e),
|
|
{"error_type": type(e).__name__},
|
|
)
|
|
logger.error(error_message)
|
|
raise
|
|
finally:
|
|
# Stop heartbeat — key deleted on success, expires on crash
|
|
heartbeat_task.cancel()
|
|
_stop_heartbeat(notification.id)
|
|
|
|
|
|
@celery_app.task(name="process_file_upload_with_document", bind=True)
|
|
def process_file_upload_with_document_task(
|
|
self,
|
|
document_id: int,
|
|
temp_path: str,
|
|
filename: str,
|
|
search_space_id: int,
|
|
user_id: str,
|
|
):
|
|
"""
|
|
Celery task to process uploaded file with existing pending document.
|
|
|
|
This task is used by the 2-phase document upload flow:
|
|
- Phase 1 (API): Creates pending document (visible in UI immediately)
|
|
- Phase 2 (this task): Updates document status: pending → processing → ready/failed
|
|
|
|
Args:
|
|
document_id: ID of the pending document created in Phase 1
|
|
temp_path: Path to the uploaded file
|
|
filename: Original filename
|
|
search_space_id: ID of the search space
|
|
user_id: ID of the user
|
|
"""
|
|
import traceback
|
|
|
|
logger.info(
|
|
f"[process_file_upload_with_document] Task started - document_id: {document_id}, "
|
|
f"file: {filename}, search_space_id: {search_space_id}"
|
|
)
|
|
|
|
# Check if file exists and is accessible
|
|
if not os.path.exists(temp_path):
|
|
logger.error(
|
|
f"[process_file_upload_with_document] File does not exist: {temp_path}. "
|
|
"File may have been removed before syncing could start."
|
|
)
|
|
# Mark document as failed since file is missing
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
loop.run_until_complete(
|
|
_mark_document_failed(
|
|
document_id,
|
|
"File not found. Please re-upload the file.",
|
|
)
|
|
)
|
|
finally:
|
|
loop.close()
|
|
return
|
|
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
try:
|
|
loop.run_until_complete(
|
|
_process_file_with_document(
|
|
document_id, temp_path, filename, search_space_id, user_id
|
|
)
|
|
)
|
|
logger.info(
|
|
f"[process_file_upload_with_document] Task completed successfully for: {filename}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"[process_file_upload_with_document] Task failed for {filename}: {e}\n"
|
|
f"Traceback:\n{traceback.format_exc()}"
|
|
)
|
|
raise
|
|
finally:
|
|
loop.close()
|
|
|
|
|
|
async def _mark_document_failed(document_id: int, reason: str):
|
|
"""Mark a document as failed when task cannot proceed."""
|
|
from app.db import Document, DocumentStatus
|
|
from app.tasks.document_processors.base import get_current_timestamp
|
|
|
|
async with get_celery_session_maker()() as session:
|
|
document = await session.get(Document, document_id)
|
|
if document:
|
|
document.status = DocumentStatus.failed(reason)
|
|
document.updated_at = get_current_timestamp()
|
|
await session.commit()
|
|
logger.info(f"Marked document {document_id} as failed: {reason}")
|
|
|
|
|
|
async def _process_file_with_document(
|
|
document_id: int,
|
|
temp_path: str,
|
|
filename: str,
|
|
search_space_id: int,
|
|
user_id: str,
|
|
):
|
|
"""
|
|
Process file and update existing pending document status.
|
|
|
|
This function implements Phase 2 of the 2-phase document upload:
|
|
- Sets document status to 'processing' (shows spinner in UI)
|
|
- Processes the file (parsing, embedding, chunking)
|
|
- Updates document to 'ready' on success or 'failed' on error
|
|
"""
|
|
from app.db import Document, DocumentStatus
|
|
from app.tasks.document_processors.base import get_current_timestamp
|
|
from app.tasks.document_processors.file_processors import (
|
|
process_file_in_background_with_document,
|
|
)
|
|
|
|
logger.info(
|
|
f"[_process_file_with_document] Starting async processing for: {filename}"
|
|
)
|
|
|
|
async with get_celery_session_maker()() as session:
|
|
logger.info(
|
|
f"[_process_file_with_document] Database session created for: {filename}"
|
|
)
|
|
task_logger = TaskLoggingService(session, search_space_id)
|
|
|
|
# Get the document
|
|
document = await session.get(Document, document_id)
|
|
if not document:
|
|
logger.error(f"Document {document_id} not found")
|
|
return
|
|
|
|
# Get file size for notification metadata
|
|
try:
|
|
file_size = os.path.getsize(temp_path)
|
|
logger.info(f"[_process_file_with_document] File size: {file_size} bytes")
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"[_process_file_with_document] Could not get file size: {e}"
|
|
)
|
|
file_size = None
|
|
|
|
# Create notification for document processing
|
|
logger.info(
|
|
f"[_process_file_with_document] Creating notification for: {filename}"
|
|
)
|
|
notification = (
|
|
await NotificationService.document_processing.notify_processing_started(
|
|
session=session,
|
|
user_id=UUID(user_id),
|
|
document_type="FILE",
|
|
document_name=filename,
|
|
search_space_id=search_space_id,
|
|
file_size=file_size,
|
|
)
|
|
)
|
|
|
|
# Store document_id in notification metadata so cleanup task can find the document
|
|
if notification and notification.notification_metadata is not None:
|
|
notification.notification_metadata["document_id"] = document_id
|
|
from sqlalchemy.orm.attributes import flag_modified
|
|
|
|
flag_modified(notification, "notification_metadata")
|
|
await session.commit()
|
|
await session.refresh(notification)
|
|
|
|
# Start Redis heartbeat for stale task detection
|
|
_start_heartbeat(notification.id)
|
|
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id))
|
|
|
|
log_entry = await task_logger.log_task_start(
|
|
task_name="process_file_upload_with_document",
|
|
source="document_processor",
|
|
message=f"Starting file processing for: {filename} (document_id: {document_id})",
|
|
metadata={
|
|
"document_type": "FILE",
|
|
"document_id": document_id,
|
|
"filename": filename,
|
|
"file_path": temp_path,
|
|
"user_id": user_id,
|
|
},
|
|
)
|
|
|
|
try:
|
|
# Set status to PROCESSING (shows spinner in UI via ElectricSQL)
|
|
document.status = DocumentStatus.processing()
|
|
await session.commit()
|
|
logger.info(
|
|
f"[_process_file_with_document] Document {document_id} status set to 'processing'"
|
|
)
|
|
|
|
# Process the file and update document
|
|
result = await process_file_in_background_with_document(
|
|
document=document,
|
|
file_path=temp_path,
|
|
filename=filename,
|
|
search_space_id=search_space_id,
|
|
user_id=user_id,
|
|
session=session,
|
|
task_logger=task_logger,
|
|
log_entry=log_entry,
|
|
notification=notification,
|
|
)
|
|
|
|
# Update notification on success
|
|
if result:
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
document_id=result.id,
|
|
chunks_count=None,
|
|
)
|
|
)
|
|
logger.info(
|
|
f"[_process_file_with_document] Successfully processed document {document_id}"
|
|
)
|
|
else:
|
|
# Duplicate detected - mark as failed
|
|
document.status = DocumentStatus.failed("Duplicate content detected")
|
|
document.updated_at = get_current_timestamp()
|
|
await session.commit()
|
|
await (
|
|
NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message="Document already exists (duplicate)",
|
|
)
|
|
)
|
|
|
|
except Exception as e:
|
|
# Import here to avoid circular dependencies
|
|
from fastapi import HTTPException
|
|
|
|
from app.services.page_limit_service import PageLimitExceededError
|
|
|
|
# Check if this is a page limit error
|
|
page_limit_error: PageLimitExceededError | None = None
|
|
if isinstance(e, PageLimitExceededError):
|
|
page_limit_error = e
|
|
elif (
|
|
isinstance(e, HTTPException)
|
|
and e.__cause__
|
|
and isinstance(e.__cause__, PageLimitExceededError)
|
|
):
|
|
page_limit_error = e.__cause__
|
|
|
|
# Mark document as failed (shows error in UI via ElectricSQL)
|
|
error_message = str(e)[:500]
|
|
document.status = DocumentStatus.failed(error_message)
|
|
document.updated_at = get_current_timestamp()
|
|
await session.commit()
|
|
logger.info(
|
|
f"[_process_file_with_document] Document {document_id} marked as failed: {error_message[:100]}"
|
|
)
|
|
|
|
# Handle page limit errors with dedicated notification
|
|
if page_limit_error is not None:
|
|
try:
|
|
await session.refresh(notification)
|
|
await NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message="Page limit exceeded",
|
|
)
|
|
await NotificationService.page_limit.notify_page_limit_exceeded(
|
|
session=session,
|
|
user_id=UUID(user_id),
|
|
document_name=filename,
|
|
document_type="FILE",
|
|
search_space_id=search_space_id,
|
|
pages_used=page_limit_error.pages_used,
|
|
pages_limit=page_limit_error.pages_limit,
|
|
pages_to_add=page_limit_error.pages_to_add,
|
|
)
|
|
except Exception as notif_error:
|
|
logger.error(
|
|
f"Failed to create page limit notification: {notif_error!s}"
|
|
)
|
|
else:
|
|
# Update notification on failure
|
|
try:
|
|
await session.refresh(notification)
|
|
await NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message=str(e)[:100],
|
|
)
|
|
except Exception as notif_error:
|
|
logger.error(
|
|
f"Failed to update notification on failure: {notif_error!s}"
|
|
)
|
|
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
error_message[:100],
|
|
str(e),
|
|
{"error_type": type(e).__name__, "document_id": document_id},
|
|
)
|
|
logger.error(f"Error processing file {filename}: {e!s}")
|
|
raise
|
|
|
|
finally:
|
|
# Stop heartbeat — key deleted on success, expires on crash
|
|
heartbeat_task.cancel()
|
|
_stop_heartbeat(notification.id)
|
|
|
|
# Clean up temp file
|
|
if os.path.exists(temp_path):
|
|
try:
|
|
os.unlink(temp_path)
|
|
logger.info(
|
|
f"[_process_file_with_document] Cleaned up temp file: {temp_path}"
|
|
)
|
|
except Exception as cleanup_error:
|
|
logger.warning(
|
|
f"[_process_file_with_document] Failed to clean up temp file: {cleanup_error}"
|
|
)
|
|
|
|
|
|
@celery_app.task(name="process_circleback_meeting", bind=True)
|
|
def process_circleback_meeting_task(
|
|
self,
|
|
meeting_id: int,
|
|
meeting_name: str,
|
|
markdown_content: str,
|
|
metadata: dict,
|
|
search_space_id: int,
|
|
connector_id: int | None = None,
|
|
):
|
|
"""
|
|
Celery task to process Circleback meeting webhook data.
|
|
|
|
Args:
|
|
meeting_id: Circleback meeting ID
|
|
meeting_name: Name of the meeting
|
|
markdown_content: Meeting content formatted as markdown
|
|
metadata: Meeting metadata dictionary
|
|
search_space_id: ID of the search space
|
|
connector_id: ID of the Circleback connector (for deletion support)
|
|
"""
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
try:
|
|
loop.run_until_complete(
|
|
_process_circleback_meeting(
|
|
meeting_id,
|
|
meeting_name,
|
|
markdown_content,
|
|
metadata,
|
|
search_space_id,
|
|
connector_id,
|
|
)
|
|
)
|
|
finally:
|
|
loop.close()
|
|
|
|
|
|
async def _process_circleback_meeting(
|
|
meeting_id: int,
|
|
meeting_name: str,
|
|
markdown_content: str,
|
|
metadata: dict,
|
|
search_space_id: int,
|
|
connector_id: int | None = None,
|
|
):
|
|
"""Process Circleback meeting with new session."""
|
|
from app.tasks.document_processors.circleback_processor import (
|
|
add_circleback_meeting_document,
|
|
)
|
|
|
|
async with get_celery_session_maker()() as session:
|
|
task_logger = TaskLoggingService(session, search_space_id)
|
|
|
|
# Get user_id from metadata if available
|
|
user_id = metadata.get("user_id")
|
|
|
|
# Create notification if user_id is available
|
|
notification = None
|
|
heartbeat_task = None
|
|
if user_id:
|
|
notification = (
|
|
await NotificationService.document_processing.notify_processing_started(
|
|
session=session,
|
|
user_id=UUID(user_id),
|
|
document_type="CIRCLEBACK",
|
|
document_name=f"Meeting: {meeting_name[:40]}",
|
|
search_space_id=search_space_id,
|
|
)
|
|
)
|
|
|
|
# Start Redis heartbeat for stale task detection
|
|
_start_heartbeat(notification.id)
|
|
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id))
|
|
|
|
log_entry = await task_logger.log_task_start(
|
|
task_name="process_circleback_meeting",
|
|
source="circleback_webhook",
|
|
message=f"Starting Circleback meeting processing: {meeting_name}",
|
|
metadata={
|
|
"document_type": "CIRCLEBACK",
|
|
"meeting_id": meeting_id,
|
|
"meeting_name": meeting_name,
|
|
**metadata,
|
|
},
|
|
)
|
|
|
|
try:
|
|
# Update notification: parsing stage
|
|
if notification:
|
|
await (
|
|
NotificationService.document_processing.notify_processing_progress(
|
|
session,
|
|
notification,
|
|
stage="parsing",
|
|
stage_message="Reading meeting notes",
|
|
)
|
|
)
|
|
|
|
result = await add_circleback_meeting_document(
|
|
session=session,
|
|
meeting_id=meeting_id,
|
|
meeting_name=meeting_name,
|
|
markdown_content=markdown_content,
|
|
metadata=metadata,
|
|
search_space_id=search_space_id,
|
|
connector_id=connector_id,
|
|
)
|
|
|
|
if result:
|
|
await task_logger.log_task_success(
|
|
log_entry,
|
|
f"Successfully processed Circleback meeting: {meeting_name}",
|
|
{
|
|
"document_id": result.id,
|
|
"meeting_id": meeting_id,
|
|
"content_hash": result.content_hash,
|
|
},
|
|
)
|
|
|
|
# Update notification on success
|
|
if notification:
|
|
await NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
document_id=result.id,
|
|
chunks_count=None,
|
|
)
|
|
else:
|
|
await task_logger.log_task_success(
|
|
log_entry,
|
|
f"Circleback meeting document already exists (duplicate): {meeting_name}",
|
|
{"duplicate_detected": True, "meeting_id": meeting_id},
|
|
)
|
|
|
|
# Update notification for duplicate
|
|
if notification:
|
|
await NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message="Meeting already saved (duplicate)",
|
|
)
|
|
except Exception as e:
|
|
await task_logger.log_task_failure(
|
|
log_entry,
|
|
f"Failed to process Circleback meeting: {meeting_name}",
|
|
str(e),
|
|
{"error_type": type(e).__name__, "meeting_id": meeting_id},
|
|
)
|
|
|
|
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
|
|
if notification:
|
|
try:
|
|
# Refresh notification to ensure it's not stale after any rollback
|
|
await session.refresh(notification)
|
|
await NotificationService.document_processing.notify_processing_completed(
|
|
session=session,
|
|
notification=notification,
|
|
error_message=str(e)[:100],
|
|
)
|
|
except Exception as notif_error:
|
|
logger.error(
|
|
f"Failed to update notification on failure: {notif_error!s}"
|
|
)
|
|
|
|
logger.error(f"Error processing Circleback meeting: {e!s}")
|
|
raise
|
|
finally:
|
|
# Stop heartbeat — key deleted on success, expires on crash
|
|
if heartbeat_task:
|
|
heartbeat_task.cancel()
|
|
if notification:
|
|
_stop_heartbeat(notification.id)
|