mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-25 00:36:31 +02:00
Revert "Merge pull request #686 from AnishSarkar22/feat/replace-logs"
This reverts commit5963a1125e, reversing changes made to0d2a2f8ea1.
This commit is contained in:
parent
5963a1125e
commit
3418c0e026
84 changed files with 4613 additions and 8266 deletions
|
|
@ -1,14 +1,12 @@
|
|||
"""Celery tasks for document processing."""
|
||||
|
||||
import logging
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.pool import NullPool
|
||||
|
||||
from app.celery_app import celery_app
|
||||
from app.config import config
|
||||
from app.services.notification_service import NotificationService
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.tasks.document_processors import (
|
||||
add_extension_received_document,
|
||||
|
|
@ -86,22 +84,6 @@ async def _process_extension_document(
|
|||
async with get_celery_session_maker()() as session:
|
||||
task_logger = TaskLoggingService(session, search_space_id)
|
||||
|
||||
# Truncate title for notification display
|
||||
page_title = individual_document.metadata.VisitedWebPageTitle[:50]
|
||||
if len(individual_document.metadata.VisitedWebPageTitle) > 50:
|
||||
page_title += "..."
|
||||
|
||||
# Create notification for document processing
|
||||
notification = (
|
||||
await NotificationService.document_processing.notify_processing_started(
|
||||
session=session,
|
||||
user_id=UUID(user_id),
|
||||
document_type="EXTENSION",
|
||||
document_name=page_title,
|
||||
search_space_id=search_space_id,
|
||||
)
|
||||
)
|
||||
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="process_extension_document",
|
||||
source="document_processor",
|
||||
|
|
@ -115,14 +97,6 @@ async def _process_extension_document(
|
|||
)
|
||||
|
||||
try:
|
||||
# Update notification: parsing stage
|
||||
await NotificationService.document_processing.notify_processing_progress(
|
||||
session,
|
||||
notification,
|
||||
stage="parsing",
|
||||
stage_message="Reading page content",
|
||||
)
|
||||
|
||||
result = await add_extension_received_document(
|
||||
session, individual_document, search_space_id, user_id
|
||||
)
|
||||
|
|
@ -133,31 +107,12 @@ async def _process_extension_document(
|
|||
f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
|
||||
{"document_id": result.id, "content_hash": result.content_hash},
|
||||
)
|
||||
|
||||
# Update notification on success
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
document_id=result.id,
|
||||
chunks_count=None,
|
||||
)
|
||||
)
|
||||
else:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
|
||||
{"duplicate_detected": True},
|
||||
)
|
||||
|
||||
# Update notification for duplicate
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message="Page already saved (duplicate)",
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
|
|
@ -165,23 +120,6 @@ async def _process_extension_document(
|
|||
str(e),
|
||||
{"error_type": type(e).__name__},
|
||||
)
|
||||
|
||||
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
|
||||
try:
|
||||
# Refresh notification to ensure it's not stale after any rollback
|
||||
await session.refresh(notification)
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message=str(e)[:100],
|
||||
)
|
||||
)
|
||||
except Exception as notif_error:
|
||||
logger.error(
|
||||
f"Failed to update notification on failure: {notif_error!s}"
|
||||
)
|
||||
|
||||
logger.error(f"Error processing extension document: {e!s}")
|
||||
raise
|
||||
|
||||
|
|
@ -212,20 +150,6 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
|
|||
async with get_celery_session_maker()() as session:
|
||||
task_logger = TaskLoggingService(session, search_space_id)
|
||||
|
||||
# Extract video title from URL for notification (will be updated later)
|
||||
video_name = url.split("v=")[-1][:11] if "v=" in url else url
|
||||
|
||||
# Create notification for document processing
|
||||
notification = (
|
||||
await NotificationService.document_processing.notify_processing_started(
|
||||
session=session,
|
||||
user_id=UUID(user_id),
|
||||
document_type="YOUTUBE_VIDEO",
|
||||
document_name=f"YouTube: {video_name}",
|
||||
search_space_id=search_space_id,
|
||||
)
|
||||
)
|
||||
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="process_youtube_video",
|
||||
source="document_processor",
|
||||
|
|
@ -234,14 +158,6 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
|
|||
)
|
||||
|
||||
try:
|
||||
# Update notification: parsing (fetching transcript)
|
||||
await NotificationService.document_processing.notify_processing_progress(
|
||||
session,
|
||||
notification,
|
||||
stage="parsing",
|
||||
stage_message="Fetching video transcript",
|
||||
)
|
||||
|
||||
result = await add_youtube_video_document(
|
||||
session, url, search_space_id, user_id
|
||||
)
|
||||
|
|
@ -256,31 +172,12 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
|
|||
"content_hash": result.content_hash,
|
||||
},
|
||||
)
|
||||
|
||||
# Update notification on success
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
document_id=result.id,
|
||||
chunks_count=None,
|
||||
)
|
||||
)
|
||||
else:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"YouTube video document already exists (duplicate): {url}",
|
||||
{"duplicate_detected": True},
|
||||
)
|
||||
|
||||
# Update notification for duplicate
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message="Video already exists (duplicate)",
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
|
|
@ -288,23 +185,6 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
|
|||
str(e),
|
||||
{"error_type": type(e).__name__},
|
||||
)
|
||||
|
||||
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
|
||||
try:
|
||||
# Refresh notification to ensure it's not stale after any rollback
|
||||
await session.refresh(notification)
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message=str(e)[:100],
|
||||
)
|
||||
)
|
||||
except Exception as notif_error:
|
||||
logger.error(
|
||||
f"Failed to update notification on failure: {notif_error!s}"
|
||||
)
|
||||
|
||||
logger.error(f"Error processing YouTube video: {e!s}")
|
||||
raise
|
||||
|
||||
|
|
@ -339,31 +219,11 @@ async def _process_file_upload(
|
|||
file_path: str, filename: str, search_space_id: int, user_id: str
|
||||
):
|
||||
"""Process file upload with new session."""
|
||||
import os
|
||||
|
||||
from app.tasks.document_processors.file_processors import process_file_in_background
|
||||
|
||||
async with get_celery_session_maker()() as session:
|
||||
task_logger = TaskLoggingService(session, search_space_id)
|
||||
|
||||
# Get file size for notification metadata
|
||||
try:
|
||||
file_size = os.path.getsize(file_path)
|
||||
except Exception:
|
||||
file_size = None
|
||||
|
||||
# Create notification for document processing
|
||||
notification = (
|
||||
await NotificationService.document_processing.notify_processing_started(
|
||||
session=session,
|
||||
user_id=UUID(user_id),
|
||||
document_type="FILE",
|
||||
document_name=filename,
|
||||
search_space_id=search_space_id,
|
||||
file_size=file_size,
|
||||
)
|
||||
)
|
||||
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="process_file_upload",
|
||||
source="document_processor",
|
||||
|
|
@ -377,7 +237,7 @@ async def _process_file_upload(
|
|||
)
|
||||
|
||||
try:
|
||||
result = await process_file_in_background(
|
||||
await process_file_in_background(
|
||||
file_path,
|
||||
filename,
|
||||
search_space_id,
|
||||
|
|
@ -385,29 +245,7 @@ async def _process_file_upload(
|
|||
session,
|
||||
task_logger,
|
||||
log_entry,
|
||||
notification=notification,
|
||||
)
|
||||
|
||||
# Update notification on success
|
||||
if result:
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
document_id=result.id,
|
||||
chunks_count=None,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Duplicate detected
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message="Document already exists (duplicate)",
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# Import here to avoid circular dependencies
|
||||
from fastapi import HTTPException
|
||||
|
|
@ -420,23 +258,7 @@ async def _process_file_upload(
|
|||
elif isinstance(e, HTTPException) and "page limit" in str(e.detail).lower():
|
||||
error_message = str(e.detail)
|
||||
else:
|
||||
error_message = str(e)[:100]
|
||||
|
||||
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
|
||||
try:
|
||||
# Refresh notification to ensure it's not stale after any rollback
|
||||
await session.refresh(notification)
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message=error_message,
|
||||
)
|
||||
)
|
||||
except Exception as notif_error:
|
||||
logger.error(
|
||||
f"Failed to update notification on failure: {notif_error!s}"
|
||||
)
|
||||
error_message = f"Failed to process file: {filename}"
|
||||
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
|
|
@ -501,22 +323,6 @@ async def _process_circleback_meeting(
|
|||
async with get_celery_session_maker()() as session:
|
||||
task_logger = TaskLoggingService(session, search_space_id)
|
||||
|
||||
# Get user_id from metadata if available
|
||||
user_id = metadata.get("user_id")
|
||||
|
||||
# Create notification if user_id is available
|
||||
notification = None
|
||||
if user_id:
|
||||
notification = (
|
||||
await NotificationService.document_processing.notify_processing_started(
|
||||
session=session,
|
||||
user_id=UUID(user_id),
|
||||
document_type="CIRCLEBACK",
|
||||
document_name=f"Meeting: {meeting_name[:40]}",
|
||||
search_space_id=search_space_id,
|
||||
)
|
||||
)
|
||||
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="process_circleback_meeting",
|
||||
source="circleback_webhook",
|
||||
|
|
@ -530,17 +336,6 @@ async def _process_circleback_meeting(
|
|||
)
|
||||
|
||||
try:
|
||||
# Update notification: parsing stage
|
||||
if notification:
|
||||
await (
|
||||
NotificationService.document_processing.notify_processing_progress(
|
||||
session,
|
||||
notification,
|
||||
stage="parsing",
|
||||
stage_message="Reading meeting notes",
|
||||
)
|
||||
)
|
||||
|
||||
result = await add_circleback_meeting_document(
|
||||
session=session,
|
||||
meeting_id=meeting_id,
|
||||
|
|
@ -560,29 +355,12 @@ async def _process_circleback_meeting(
|
|||
"content_hash": result.content_hash,
|
||||
},
|
||||
)
|
||||
|
||||
# Update notification on success
|
||||
if notification:
|
||||
await NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
document_id=result.id,
|
||||
chunks_count=None,
|
||||
)
|
||||
else:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Circleback meeting document already exists (duplicate): {meeting_name}",
|
||||
{"duplicate_detected": True, "meeting_id": meeting_id},
|
||||
)
|
||||
|
||||
# Update notification for duplicate
|
||||
if notification:
|
||||
await NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message="Meeting already saved (duplicate)",
|
||||
)
|
||||
except Exception as e:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
|
|
@ -590,21 +368,5 @@ async def _process_circleback_meeting(
|
|||
str(e),
|
||||
{"error_type": type(e).__name__, "meeting_id": meeting_id},
|
||||
)
|
||||
|
||||
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
|
||||
if notification:
|
||||
try:
|
||||
# Refresh notification to ensure it's not stale after any rollback
|
||||
await session.refresh(notification)
|
||||
await NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message=str(e)[:100],
|
||||
)
|
||||
except Exception as notif_error:
|
||||
logger.error(
|
||||
f"Failed to update notification on failure: {notif_error!s}"
|
||||
)
|
||||
|
||||
logger.error(f"Error processing Circleback meeting: {e!s}")
|
||||
raise
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue