Reapply "Merge pull request #686 from AnishSarkar22/feat/replace-logs"

This reverts commit 3418c0e026.
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-01-16 11:32:06 -08:00
parent 3418c0e026
commit 8aad15d392
84 changed files with 8266 additions and 4613 deletions

View file

@ -1,12 +1,14 @@
"""Celery tasks for document processing."""
import logging
from uuid import UUID
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.document_processors import (
add_extension_received_document,
@ -84,6 +86,22 @@ async def _process_extension_document(
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
# Truncate title for notification display
page_title = individual_document.metadata.VisitedWebPageTitle[:50]
if len(individual_document.metadata.VisitedWebPageTitle) > 50:
page_title += "..."
# Create notification for document processing
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="EXTENSION",
document_name=page_title,
search_space_id=search_space_id,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_extension_document",
source="document_processor",
@ -97,6 +115,14 @@ async def _process_extension_document(
)
try:
# Update notification: parsing stage
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Reading page content",
)
result = await add_extension_received_document(
session, individual_document, search_space_id, user_id
)
@ -107,12 +133,31 @@ async def _process_extension_document(
f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
{"document_id": result.id, "content_hash": result.content_hash},
)
# Update notification on success
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
else:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
{"duplicate_detected": True},
)
# Update notification for duplicate
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Page already saved (duplicate)",
)
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@ -120,6 +165,23 @@ async def _process_extension_document(
str(e),
{"error_type": type(e).__name__},
)
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing extension document: {e!s}")
raise
@ -150,6 +212,20 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
# Extract video title from URL for notification (will be updated later)
video_name = url.split("v=")[-1][:11] if "v=" in url else url
# Create notification for document processing
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="YOUTUBE_VIDEO",
document_name=f"YouTube: {video_name}",
search_space_id=search_space_id,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_youtube_video",
source="document_processor",
@ -158,6 +234,14 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
)
try:
# Update notification: parsing (fetching transcript)
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Fetching video transcript",
)
result = await add_youtube_video_document(
session, url, search_space_id, user_id
)
@ -172,12 +256,31 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
"content_hash": result.content_hash,
},
)
# Update notification on success
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
else:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists (duplicate): {url}",
{"duplicate_detected": True},
)
# Update notification for duplicate
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Video already exists (duplicate)",
)
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@ -185,6 +288,23 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
str(e),
{"error_type": type(e).__name__},
)
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing YouTube video: {e!s}")
raise
@ -219,11 +339,31 @@ async def _process_file_upload(
file_path: str, filename: str, search_space_id: int, user_id: str
):
"""Process file upload with new session."""
import os
from app.tasks.document_processors.file_processors import process_file_in_background
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
# Get file size for notification metadata
try:
file_size = os.path.getsize(file_path)
except Exception:
file_size = None
# Create notification for document processing
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="FILE",
document_name=filename,
search_space_id=search_space_id,
file_size=file_size,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_file_upload",
source="document_processor",
@ -237,7 +377,7 @@ async def _process_file_upload(
)
try:
await process_file_in_background(
result = await process_file_in_background(
file_path,
filename,
search_space_id,
@ -245,7 +385,29 @@ async def _process_file_upload(
session,
task_logger,
log_entry,
notification=notification,
)
# Update notification on success
if result:
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
else:
# Duplicate detected
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Document already exists (duplicate)",
)
)
except Exception as e:
# Import here to avoid circular dependencies
from fastapi import HTTPException
@ -258,7 +420,23 @@ async def _process_file_upload(
elif isinstance(e, HTTPException) and "page limit" in str(e.detail).lower():
error_message = str(e.detail)
else:
error_message = f"Failed to process file: {filename}"
error_message = str(e)[:100]
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=error_message,
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
await task_logger.log_task_failure(
log_entry,
@ -323,6 +501,22 @@ async def _process_circleback_meeting(
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
# Get user_id from metadata if available
user_id = metadata.get("user_id")
# Create notification if user_id is available
notification = None
if user_id:
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="CIRCLEBACK",
document_name=f"Meeting: {meeting_name[:40]}",
search_space_id=search_space_id,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_circleback_meeting",
source="circleback_webhook",
@ -336,6 +530,17 @@ async def _process_circleback_meeting(
)
try:
# Update notification: parsing stage
if notification:
await (
NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Reading meeting notes",
)
)
result = await add_circleback_meeting_document(
session=session,
meeting_id=meeting_id,
@ -355,12 +560,29 @@ async def _process_circleback_meeting(
"content_hash": result.content_hash,
},
)
# Update notification on success
if notification:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
else:
await task_logger.log_task_success(
log_entry,
f"Circleback meeting document already exists (duplicate): {meeting_name}",
{"duplicate_detected": True, "meeting_id": meeting_id},
)
# Update notification for duplicate
if notification:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Meeting already saved (duplicate)",
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@ -368,5 +590,21 @@ async def _process_circleback_meeting(
str(e),
{"error_type": type(e).__name__, "meeting_id": meeting_id},
)
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
if notification:
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing Circleback meeting: {e!s}")
raise