fix: Resolve merge conflict in documents_routes.py

- Integrated Docling ETL service with new task logging system
- Maintained consistent logging pattern across all ETL services
- Added progress and success/failure logging for Docling processing
This commit is contained in:
Abdullah 3li 2025-07-21 10:43:15 +03:00
commit f117d94ef7
34 changed files with 4160 additions and 520 deletions

File diff suppressed because it is too large Load diff

View file

@ -91,6 +91,18 @@ class LiteLLMProvider(str, Enum):
ALEPH_ALPHA = "ALEPH_ALPHA"
PETALS = "PETALS"
CUSTOM = "CUSTOM"
class LogLevel(str, Enum):
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
class LogStatus(str, Enum):
IN_PROGRESS = "IN_PROGRESS"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
class Base(DeclarativeBase):
pass
@ -163,6 +175,7 @@ class SearchSpace(BaseModel, TimestampMixin):
documents = relationship("Document", back_populates="search_space", order_by="Document.id", cascade="all, delete-orphan")
podcasts = relationship("Podcast", back_populates="search_space", order_by="Podcast.id", cascade="all, delete-orphan")
chats = relationship('Chat', back_populates='search_space', order_by='Chat.id', cascade="all, delete-orphan")
logs = relationship("Log", back_populates="search_space", order_by="Log.id", cascade="all, delete-orphan")
class SearchSourceConnector(BaseModel, TimestampMixin):
__tablename__ = "search_source_connectors"
@ -196,6 +209,18 @@ class LLMConfig(BaseModel, TimestampMixin):
user_id = Column(UUID(as_uuid=True), ForeignKey("user.id", ondelete='CASCADE'), nullable=False)
user = relationship("User", back_populates="llm_configs", foreign_keys=[user_id])
class Log(BaseModel, TimestampMixin):
__tablename__ = "logs"
level = Column(SQLAlchemyEnum(LogLevel), nullable=False, index=True)
status = Column(SQLAlchemyEnum(LogStatus), nullable=False, index=True)
message = Column(Text, nullable=False)
source = Column(String(200), nullable=True, index=True) # Service/component that generated the log
log_metadata = Column(JSON, nullable=True, default={}) # Additional context data
search_space_id = Column(Integer, ForeignKey("searchspaces.id", ondelete='CASCADE'), nullable=False)
search_space = relationship("SearchSpace", back_populates="logs")
if config.AUTH_TYPE == "GOOGLE":
class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base):
pass

View file

@ -5,6 +5,7 @@ from .podcasts_routes import router as podcasts_router
from .chats_routes import router as chats_router
from .search_source_connectors_routes import router as search_source_connectors_router
from .llm_config_routes import router as llm_config_router
from .logs_routes import router as logs_router
router = APIRouter()
@ -14,3 +15,4 @@ router.include_router(podcasts_router)
router.include_router(chats_router)
router.include_router(search_source_connectors_router)
router.include_router(llm_config_router)
router.include_router(logs_router)

View file

@ -54,32 +54,23 @@ async def handle_chat_data(
if message['role'] == "user":
langchain_chat_history.append(HumanMessage(content=message['content']))
elif message['role'] == "assistant":
# Find the last "ANSWER" annotation specifically
answer_annotation = None
for annotation in reversed(message['annotations']):
if annotation['type'] == "ANSWER":
answer_annotation = annotation
break
if answer_annotation:
answer_text = answer_annotation['content']
# If content is a list, join it into a single string
if isinstance(answer_text, list):
answer_text = "\n".join(answer_text)
langchain_chat_history.append(AIMessage(content=answer_text))
langchain_chat_history.append(AIMessage(content=message['content']))
response = StreamingResponse(stream_connector_search_results(
user_query,
user.id,
search_space_id, # Already converted to int in lines 32-37
session,
research_mode,
selected_connectors,
langchain_chat_history,
search_mode_str,
document_ids_to_add_in_context
))
response.headers['x-vercel-ai-data-stream'] = 'v1'
response = StreamingResponse(
stream_connector_search_results(
user_query,
user.id,
search_space_id,
session,
research_mode,
selected_connectors,
langchain_chat_history,
search_mode_str,
document_ids_to_add_in_context,
)
)
response.headers["x-vercel-ai-data-stream"] = "v1"
return response

View file

@ -135,11 +135,19 @@ async def process_file_in_background(
filename: str,
search_space_id: int,
user_id: str,
session: AsyncSession
session: AsyncSession,
task_logger: 'TaskLoggingService',
log_entry: 'Log'
):
try:
# Check if the file is a markdown or text file
if filename.lower().endswith(('.md', '.markdown', '.txt')):
await task_logger.log_task_progress(
log_entry,
f"Processing markdown/text file: {filename}",
{"file_type": "markdown", "processing_stage": "reading_file"}
)
# For markdown files, read the content directly
with open(file_path, 'r', encoding='utf-8') as f:
markdown_content = f.read()
@ -151,16 +159,42 @@ async def process_file_in_background(
except:
pass
await task_logger.log_task_progress(
log_entry,
f"Creating document from markdown content: {filename}",
{"processing_stage": "creating_document", "content_length": len(markdown_content)}
)
# Process markdown directly through specialized function
await add_received_markdown_file_document(
result = await add_received_markdown_file_document(
session,
filename,
markdown_content,
search_space_id,
user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed markdown file: {filename}",
{"document_id": result.id, "content_hash": result.content_hash, "file_type": "markdown"}
)
else:
await task_logger.log_task_success(
log_entry,
f"Markdown file already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "markdown"}
)
# Check if the file is an audio file
elif filename.lower().endswith(('.mp3', '.mp4', '.mpeg', '.mpga', '.m4a', '.wav', '.webm')):
await task_logger.log_task_progress(
log_entry,
f"Processing audio file for transcription: {filename}",
{"file_type": "audio", "processing_stage": "starting_transcription"}
)
# Open the audio file for transcription
with open(file_path, "rb") as audio_file:
# Use LiteLLM for audio transcription
@ -184,6 +218,12 @@ async def process_file_in_background(
# Add metadata about the transcription
transcribed_text = f"# Transcription of {filename}\n\n{transcribed_text}"
await task_logger.log_task_progress(
log_entry,
f"Transcription completed, creating document: {filename}",
{"processing_stage": "transcription_complete", "transcript_length": len(transcribed_text)}
)
# Clean up the temp file
try:
os.unlink(file_path)
@ -191,15 +231,35 @@ async def process_file_in_background(
pass
# Process transcription as markdown document
await add_received_markdown_file_document(
result = await add_received_markdown_file_document(
session,
filename,
transcribed_text,
search_space_id,
user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully transcribed and processed audio file: {filename}",
{"document_id": result.id, "content_hash": result.content_hash, "file_type": "audio", "transcript_length": len(transcribed_text)}
)
else:
await task_logger.log_task_success(
log_entry,
f"Audio file transcript already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "audio"}
)
else:
if app_config.ETL_SERVICE == "UNSTRUCTURED":
await task_logger.log_task_progress(
log_entry,
f"Processing file with Unstructured ETL: {filename}",
{"file_type": "document", "etl_service": "UNSTRUCTURED", "processing_stage": "loading"}
)
from langchain_unstructured import UnstructuredLoader
# Process the file
@ -215,6 +275,12 @@ async def process_file_in_background(
docs = await loader.aload()
await task_logger.log_task_progress(
log_entry,
f"Unstructured ETL completed, creating document: {filename}",
{"processing_stage": "etl_complete", "elements_count": len(docs)}
)
# Clean up the temp file
import os
try:
@ -223,14 +289,34 @@ async def process_file_in_background(
pass
# Pass the documents to the existing background task
await add_received_file_document_using_unstructured(
result = await add_received_file_document_using_unstructured(
session,
filename,
docs,
search_space_id,
user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with Unstructured: {filename}",
{"document_id": result.id, "content_hash": result.content_hash, "file_type": "document", "etl_service": "UNSTRUCTURED"}
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "document", "etl_service": "UNSTRUCTURED"}
)
elif app_config.ETL_SERVICE == "LLAMACLOUD":
await task_logger.log_task_progress(
log_entry,
f"Processing file with LlamaCloud ETL: {filename}",
{"file_type": "document", "etl_service": "LLAMACLOUD", "processing_stage": "parsing"}
)
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
@ -257,19 +343,45 @@ async def process_file_in_background(
# Get markdown documents from the result
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud parsing completed, creating documents: {filename}",
{"processing_stage": "parsing_complete", "documents_count": len(markdown_documents)}
)
for doc in markdown_documents:
# Extract text content from the markdown documents
markdown_content = doc.text
# Process the documents using our LlamaCloud background task
await add_received_file_document_using_llamacloud(
doc_result = await add_received_file_document_using_llamacloud(
session,
filename,
llamacloud_markdown_document=markdown_content,
search_space_id=search_space_id,
user_id=user_id
)
if doc_result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with LlamaCloud: {filename}",
{"document_id": doc_result.id, "content_hash": doc_result.content_hash, "file_type": "document", "etl_service": "LLAMACLOUD"}
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "document", "etl_service": "LLAMACLOUD"}
)
elif app_config.ETL_SERVICE == "DOCLING":
await task_logger.log_task_progress(
log_entry,
f"Processing file with Docling ETL: {filename}",
{"file_type": "document", "etl_service": "DOCLING", "processing_stage": "parsing"}
)
# Use Docling service for document processing
from app.services.document_processing.docling_service import create_docling_service
@ -286,17 +398,43 @@ async def process_file_in_background(
except:
pass
await task_logger.log_task_progress(
log_entry,
f"Docling parsing completed, creating document: {filename}",
{"processing_stage": "parsing_complete", "content_length": len(result['content'])}
)
# Process the document using our Docling background task
await add_received_file_document_using_docling(
doc_result = await add_received_file_document_using_docling(
session,
filename,
docling_markdown_document=result['content'],
search_space_id=search_space_id,
user_id=user_id
)
if doc_result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with Docling: {filename}",
{"document_id": doc_result.id, "content_hash": doc_result.content_hash, "file_type": "document", "etl_service": "DOCLING"}
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "document", "etl_service": "DOCLING"}
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process file: {filename}",
str(e),
{"error_type": type(e).__name__, "filename": filename}
)
import logging
logging.error(f"Error processing file in background: {str(e)}")
raise # Re-raise so the wrapper can also handle it
@router.get("/documents/", response_model=List[DocumentRead])
@ -467,11 +605,47 @@ async def process_extension_document_with_new_session(
):
"""Create a new session and process extension document."""
from app.db import async_session_maker
from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
# Initialize task logging service
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="process_extension_document",
source="document_processor",
message=f"Starting processing of extension document from {individual_document.metadata.VisitedWebPageTitle}",
metadata={
"document_type": "EXTENSION",
"url": individual_document.metadata.VisitedWebPageURL,
"title": individual_document.metadata.VisitedWebPageTitle,
"user_id": user_id
}
)
try:
await add_extension_received_document(session, individual_document, search_space_id, user_id)
result = await add_extension_received_document(session, individual_document, search_space_id, user_id)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
{"document_id": result.id, "content_hash": result.content_hash}
)
else:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
{"duplicate_detected": True}
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process extension document: {individual_document.metadata.VisitedWebPageTitle}",
str(e),
{"error_type": type(e).__name__}
)
import logging
logging.error(f"Error processing extension document: {str(e)}")
@ -483,11 +657,46 @@ async def process_crawled_url_with_new_session(
):
"""Create a new session and process crawled URL."""
from app.db import async_session_maker
from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
# Initialize task logging service
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="process_crawled_url",
source="document_processor",
message=f"Starting URL crawling and processing for: {url}",
metadata={
"document_type": "CRAWLED_URL",
"url": url,
"user_id": user_id
}
)
try:
await add_crawled_url_document(session, url, search_space_id, user_id)
result = await add_crawled_url_document(session, url, search_space_id, user_id)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully crawled and processed URL: {url}",
{"document_id": result.id, "title": result.title, "content_hash": result.content_hash}
)
else:
await task_logger.log_task_success(
log_entry,
f"URL document already exists (duplicate): {url}",
{"duplicate_detected": True}
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to crawl URL: {url}",
str(e),
{"error_type": type(e).__name__}
)
import logging
logging.error(f"Error processing crawled URL: {str(e)}")
@ -500,9 +709,38 @@ async def process_file_in_background_with_new_session(
):
"""Create a new session and process file."""
from app.db import async_session_maker
from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
await process_file_in_background(file_path, filename, search_space_id, user_id, session)
# Initialize task logging service
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="process_file_upload",
source="document_processor",
message=f"Starting file processing for: {filename}",
metadata={
"document_type": "FILE",
"filename": filename,
"file_path": file_path,
"user_id": user_id
}
)
try:
await process_file_in_background(file_path, filename, search_space_id, user_id, session, task_logger, log_entry)
# Note: success/failure logging is handled within process_file_in_background
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process file: {filename}",
str(e),
{"error_type": type(e).__name__}
)
import logging
logging.error(f"Error processing file: {str(e)}")
async def process_youtube_video_with_new_session(
@ -512,11 +750,46 @@ async def process_youtube_video_with_new_session(
):
"""Create a new session and process YouTube video."""
from app.db import async_session_maker
from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
# Initialize task logging service
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="process_youtube_video",
source="document_processor",
message=f"Starting YouTube video processing for: {url}",
metadata={
"document_type": "YOUTUBE_VIDEO",
"url": url,
"user_id": user_id
}
)
try:
await add_youtube_video_document(session, url, search_space_id, user_id)
result = await add_youtube_video_document(session, url, search_space_id, user_id)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed YouTube video: {result.title}",
{"document_id": result.id, "video_id": result.document_metadata.get("video_id"), "content_hash": result.content_hash}
)
else:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists (duplicate): {url}",
{"duplicate_detected": True}
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process YouTube video: {url}",
str(e),
{"error_type": type(e).__name__}
)
import logging
logging.error(f"Error processing YouTube video: {str(e)}")

View file

@ -0,0 +1,280 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import and_, desc
from typing import List, Optional
from datetime import datetime, timedelta
from app.db import get_async_session, User, SearchSpace, Log, LogLevel, LogStatus
from app.schemas import LogCreate, LogUpdate, LogRead, LogFilter
from app.users import current_active_user
from app.utils.check_ownership import check_ownership
router = APIRouter()
@router.post("/logs/", response_model=LogRead)
async def create_log(
log: LogCreate,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user)
):
"""Create a new log entry."""
try:
# Check if the user owns the search space
await check_ownership(session, SearchSpace, log.search_space_id, user)
db_log = Log(**log.model_dump())
session.add(db_log)
await session.commit()
await session.refresh(db_log)
return db_log
except HTTPException:
raise
except Exception as e:
await session.rollback()
raise HTTPException(
status_code=500,
detail=f"Failed to create log: {str(e)}"
)
@router.get("/logs/", response_model=List[LogRead])
async def read_logs(
skip: int = 0,
limit: int = 100,
search_space_id: Optional[int] = None,
level: Optional[LogLevel] = None,
status: Optional[LogStatus] = None,
source: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user)
):
"""Get logs with optional filtering."""
try:
# Build base query - only logs from user's search spaces
query = (
select(Log)
.join(SearchSpace)
.filter(SearchSpace.user_id == user.id)
.order_by(desc(Log.created_at)) # Most recent first
)
# Apply filters
filters = []
if search_space_id is not None:
await check_ownership(session, SearchSpace, search_space_id, user)
filters.append(Log.search_space_id == search_space_id)
if level is not None:
filters.append(Log.level == level)
if status is not None:
filters.append(Log.status == status)
if source is not None:
filters.append(Log.source.ilike(f"%{source}%"))
if start_date is not None:
filters.append(Log.created_at >= start_date)
if end_date is not None:
filters.append(Log.created_at <= end_date)
if filters:
query = query.filter(and_(*filters))
# Apply pagination
result = await session.execute(query.offset(skip).limit(limit))
return result.scalars().all()
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to fetch logs: {str(e)}"
)
@router.get("/logs/{log_id}", response_model=LogRead)
async def read_log(
log_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user)
):
"""Get a specific log by ID."""
try:
# Get log and verify user owns the search space
result = await session.execute(
select(Log)
.join(SearchSpace)
.filter(Log.id == log_id, SearchSpace.user_id == user.id)
)
log = result.scalars().first()
if not log:
raise HTTPException(status_code=404, detail="Log not found")
return log
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to fetch log: {str(e)}"
)
@router.put("/logs/{log_id}", response_model=LogRead)
async def update_log(
log_id: int,
log_update: LogUpdate,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user)
):
"""Update a log entry."""
try:
# Get log and verify user owns the search space
result = await session.execute(
select(Log)
.join(SearchSpace)
.filter(Log.id == log_id, SearchSpace.user_id == user.id)
)
db_log = result.scalars().first()
if not db_log:
raise HTTPException(status_code=404, detail="Log not found")
# Update only provided fields
update_data = log_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_log, field, value)
await session.commit()
await session.refresh(db_log)
return db_log
except HTTPException:
raise
except Exception as e:
await session.rollback()
raise HTTPException(
status_code=500,
detail=f"Failed to update log: {str(e)}"
)
@router.delete("/logs/{log_id}")
async def delete_log(
log_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user)
):
"""Delete a log entry."""
try:
# Get log and verify user owns the search space
result = await session.execute(
select(Log)
.join(SearchSpace)
.filter(Log.id == log_id, SearchSpace.user_id == user.id)
)
db_log = result.scalars().first()
if not db_log:
raise HTTPException(status_code=404, detail="Log not found")
await session.delete(db_log)
await session.commit()
return {"message": "Log deleted successfully"}
except HTTPException:
raise
except Exception as e:
await session.rollback()
raise HTTPException(
status_code=500,
detail=f"Failed to delete log: {str(e)}"
)
@router.get("/logs/search-space/{search_space_id}/summary")
async def get_logs_summary(
search_space_id: int,
hours: int = 24,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user)
):
"""Get a summary of logs for a search space in the last X hours."""
try:
# Check ownership
await check_ownership(session, SearchSpace, search_space_id, user)
# Calculate time window
since = datetime.utcnow().replace(microsecond=0) - timedelta(hours=hours)
# Get logs from the time window
result = await session.execute(
select(Log)
.filter(
and_(
Log.search_space_id == search_space_id,
Log.created_at >= since
)
)
.order_by(desc(Log.created_at))
)
logs = result.scalars().all()
# Create summary
summary = {
"total_logs": len(logs),
"time_window_hours": hours,
"by_status": {},
"by_level": {},
"by_source": {},
"active_tasks": [],
"recent_failures": []
}
# Count by status and level
for log in logs:
# Status counts
status_str = log.status.value
summary["by_status"][status_str] = summary["by_status"].get(status_str, 0) + 1
# Level counts
level_str = log.level.value
summary["by_level"][level_str] = summary["by_level"].get(level_str, 0) + 1
# Source counts
if log.source:
summary["by_source"][log.source] = summary["by_source"].get(log.source, 0) + 1
# Active tasks (IN_PROGRESS)
if log.status == LogStatus.IN_PROGRESS:
task_name = log.log_metadata.get("task_name", "Unknown") if log.log_metadata else "Unknown"
summary["active_tasks"].append({
"id": log.id,
"task_name": task_name,
"message": log.message,
"started_at": log.created_at,
"source": log.source
})
# Recent failures
if log.status == LogStatus.FAILED and len(summary["recent_failures"]) < 10:
task_name = log.log_metadata.get("task_name", "Unknown") if log.log_metadata else "Unknown"
summary["recent_failures"].append({
"id": log.id,
"task_name": task_name,
"message": log.message,
"failed_at": log.created_at,
"source": log.source,
"error_details": log.log_metadata.get("error_details") if log.log_metadata else None
})
return summary
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to generate logs summary: {str(e)}"
)

View file

@ -14,6 +14,7 @@ from .podcasts import PodcastBase, PodcastCreate, PodcastUpdate, PodcastRead, Po
from .chats import ChatBase, ChatCreate, ChatUpdate, ChatRead, AISDKChatRequest
from .search_source_connector import SearchSourceConnectorBase, SearchSourceConnectorCreate, SearchSourceConnectorUpdate, SearchSourceConnectorRead
from .llm_config import LLMConfigBase, LLMConfigCreate, LLMConfigUpdate, LLMConfigRead
from .logs import LogBase, LogCreate, LogUpdate, LogRead, LogFilter
__all__ = [
"AISDKChatRequest",
@ -53,4 +54,9 @@ __all__ = [
"LLMConfigCreate",
"LLMConfigUpdate",
"LLMConfigRead",
"LogBase",
"LogCreate",
"LogUpdate",
"LogRead",
"LogFilter",
]

View file

@ -0,0 +1,44 @@
from datetime import datetime
from typing import Optional, Dict, Any
from pydantic import BaseModel, ConfigDict
from .base import IDModel, TimestampModel
from app.db import LogLevel, LogStatus
class LogBase(BaseModel):
level: LogLevel
status: LogStatus
message: str
source: Optional[str] = None
log_metadata: Optional[Dict[str, Any]] = None
class LogCreate(BaseModel):
level: LogLevel
status: LogStatus
message: str
source: Optional[str] = None
log_metadata: Optional[Dict[str, Any]] = None
search_space_id: int
class LogUpdate(BaseModel):
level: Optional[LogLevel] = None
status: Optional[LogStatus] = None
message: Optional[str] = None
source: Optional[str] = None
log_metadata: Optional[Dict[str, Any]] = None
class LogRead(LogBase, IDModel, TimestampModel):
id: int
created_at: datetime
search_space_id: int
model_config = ConfigDict(from_attributes=True)
class LogFilter(BaseModel):
level: Optional[LogLevel] = None
status: Optional[LogStatus] = None
source: Optional[str] = None
search_space_id: Optional[int] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
model_config = ConfigDict(from_attributes=True)

View file

@ -23,17 +23,138 @@ class StreamingService:
"content": []
}
]
# It is used to send annotations to the frontend
# DEPRECATED: This sends the full annotation array every time (inefficient)
def _format_annotations(self) -> str:
"""
Format the annotations as a string
DEPRECATED: This method sends the full annotation state every time.
Use the delta formatters instead for optimal streaming.
Returns:
str: The formatted annotations string
"""
return f'8:{json.dumps(self.message_annotations)}\n'
# It is used to end Streaming
def format_terminal_info_delta(self, text: str, message_type: str = "info") -> str:
"""
Format a single terminal info message as a delta annotation
Args:
text: The terminal message text
message_type: The message type (info, error, success, etc.)
Returns:
str: The formatted annotation delta string
"""
message = {"id": self.terminal_idx, "text": text, "type": message_type}
self.terminal_idx += 1
# Update internal state for reference
self.message_annotations[0]["content"].append(message)
# Return only the delta annotation
annotation = {"type": "TERMINAL_INFO", "content": [message]}
return f"8:[{json.dumps(annotation)}]\n"
def format_sources_delta(self, sources: List[Dict[str, Any]]) -> str:
"""
Format sources as a delta annotation
Args:
sources: List of source objects
Returns:
str: The formatted annotation delta string
"""
# Update internal state
self.message_annotations[1]["content"] = sources
# Return only the delta annotation
annotation = {"type": "SOURCES", "content": sources}
return f"8:[{json.dumps(annotation)}]\n"
def format_answer_delta(self, answer_chunk: str) -> str:
"""
Format a single answer chunk as a delta annotation
Args:
answer_chunk: The new answer chunk to add
Returns:
str: The formatted annotation delta string
"""
# Update internal state by appending the chunk
if isinstance(self.message_annotations[2]["content"], list):
self.message_annotations[2]["content"].append(answer_chunk)
else:
self.message_annotations[2]["content"] = [answer_chunk]
# Return only the delta annotation with the new chunk
annotation = {"type": "ANSWER", "content": [answer_chunk]}
return f"8:[{json.dumps(annotation)}]\n"
def format_answer_annotation(self, answer_lines: List[str]) -> str:
"""
Format the complete answer as a replacement annotation
Args:
answer_lines: Complete list of answer lines
Returns:
str: The formatted annotation string
"""
# Update internal state
self.message_annotations[2]["content"] = answer_lines
# Return the full answer annotation
annotation = {"type": "ANSWER", "content": answer_lines}
return f"8:[{json.dumps(annotation)}]\n"
def format_further_questions_delta(
self, further_questions: List[Dict[str, Any]]
) -> str:
"""
Format further questions as a delta annotation
Args:
further_questions: List of further question objects
Returns:
str: The formatted annotation delta string
"""
# Update internal state
self.message_annotations[3]["content"] = further_questions
# Return only the delta annotation
annotation = {"type": "FURTHER_QUESTIONS", "content": further_questions}
return f"8:[{json.dumps(annotation)}]\n"
def format_text_chunk(self, text: str) -> str:
"""
Format a text chunk using the text stream part
Args:
text: The text chunk to stream
Returns:
str: The formatted text part string
"""
return f"0:{json.dumps(text)}\n"
def format_error(self, error_message: str) -> str:
"""
Format an error using the error stream part
Args:
error_message: The error message
Returns:
str: The formatted error part string
"""
return f"3:{json.dumps(error_message)}\n"
def format_completion(self, prompt_tokens: int = 156, completion_tokens: int = 204) -> str:
"""
Format a completion message
@ -56,7 +177,12 @@ class StreamingService:
}
return f'd:{json.dumps(completion_data)}\n'
# DEPRECATED METHODS: Keep for backward compatibility but mark as deprecated
def only_update_terminal(self, text: str, message_type: str = "info") -> str:
"""
DEPRECATED: Use format_terminal_info_delta() instead for optimal streaming
"""
self.message_annotations[0]["content"].append({
"id": self.terminal_idx,
"text": text,
@ -66,17 +192,23 @@ class StreamingService:
return self.message_annotations
def only_update_sources(self, sources: List[Dict[str, Any]]) -> str:
"""
DEPRECATED: Use format_sources_delta() instead for optimal streaming
"""
self.message_annotations[1]["content"] = sources
return self.message_annotations
def only_update_answer(self, answer: List[str]) -> str:
"""
DEPRECATED: Use format_answer_delta() or format_answer_annotation() instead for optimal streaming
"""
self.message_annotations[2]["content"] = answer
return self.message_annotations
def only_update_further_questions(self, further_questions: List[Dict[str, Any]]) -> str:
"""
Update the further questions annotation
DEPRECATED: Use format_further_questions_delta() instead for optimal streaming
Args:
further_questions: List of further question objects with id and question fields

View file

@ -0,0 +1,204 @@
from typing import Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Log, LogLevel, LogStatus
import logging
import json
from datetime import datetime
logger = logging.getLogger(__name__)
class TaskLoggingService:
"""Service for logging background tasks using the database Log model"""
def __init__(self, session: AsyncSession, search_space_id: int):
self.session = session
self.search_space_id = search_space_id
async def log_task_start(
self,
task_name: str,
source: str,
message: str,
metadata: Optional[Dict[str, Any]] = None
) -> Log:
"""
Log the start of a task with IN_PROGRESS status
Args:
task_name: Name/identifier of the task
source: Source service/component (e.g., 'document_processor', 'slack_indexer')
message: Human-readable message about the task
metadata: Additional context data
Returns:
Log: The created log entry
"""
log_metadata = metadata or {}
log_metadata.update({
"task_name": task_name,
"started_at": datetime.utcnow().isoformat()
})
log_entry = Log(
level=LogLevel.INFO,
status=LogStatus.IN_PROGRESS,
message=message,
source=source,
log_metadata=log_metadata,
search_space_id=self.search_space_id
)
self.session.add(log_entry)
await self.session.commit()
await self.session.refresh(log_entry)
logger.info(f"Started task {task_name}: {message}")
return log_entry
async def log_task_success(
self,
log_entry: Log,
message: str,
additional_metadata: Optional[Dict[str, Any]] = None
) -> Log:
"""
Update a log entry to SUCCESS status
Args:
log_entry: The original log entry to update
message: Success message
additional_metadata: Additional metadata to merge
Returns:
Log: The updated log entry
"""
# Update the existing log entry
log_entry.status = LogStatus.SUCCESS
log_entry.message = message
# Merge additional metadata
if additional_metadata:
if log_entry.log_metadata is None:
log_entry.log_metadata = {}
log_entry.log_metadata.update(additional_metadata)
log_entry.log_metadata["completed_at"] = datetime.utcnow().isoformat()
await self.session.commit()
await self.session.refresh(log_entry)
task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
logger.info(f"Completed task {task_name}: {message}")
return log_entry
async def log_task_failure(
self,
log_entry: Log,
error_message: str,
error_details: Optional[str] = None,
additional_metadata: Optional[Dict[str, Any]] = None
) -> Log:
"""
Update a log entry to FAILED status
Args:
log_entry: The original log entry to update
error_message: Error message
error_details: Detailed error information
additional_metadata: Additional metadata to merge
Returns:
Log: The updated log entry
"""
# Update the existing log entry
log_entry.status = LogStatus.FAILED
log_entry.level = LogLevel.ERROR
log_entry.message = error_message
# Merge additional metadata
if log_entry.log_metadata is None:
log_entry.log_metadata = {}
log_entry.log_metadata.update({
"failed_at": datetime.utcnow().isoformat(),
"error_details": error_details
})
if additional_metadata:
log_entry.log_metadata.update(additional_metadata)
await self.session.commit()
await self.session.refresh(log_entry)
task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
logger.error(f"Failed task {task_name}: {error_message}")
if error_details:
logger.error(f"Error details: {error_details}")
return log_entry
async def log_task_progress(
self,
log_entry: Log,
progress_message: str,
progress_metadata: Optional[Dict[str, Any]] = None
) -> Log:
"""
Update a log entry with progress information while keeping IN_PROGRESS status
Args:
log_entry: The log entry to update
progress_message: Progress update message
progress_metadata: Additional progress metadata
Returns:
Log: The updated log entry
"""
log_entry.message = progress_message
if progress_metadata:
if log_entry.log_metadata is None:
log_entry.log_metadata = {}
log_entry.log_metadata.update(progress_metadata)
log_entry.log_metadata["last_progress_update"] = datetime.utcnow().isoformat()
await self.session.commit()
await self.session.refresh(log_entry)
task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
logger.info(f"Progress update for task {task_name}: {progress_message}")
return log_entry
async def log_simple_event(
self,
level: LogLevel,
source: str,
message: str,
metadata: Optional[Dict[str, Any]] = None
) -> Log:
"""
Log a simple event (not a long-running task)
Args:
level: Log level
source: Source service/component
message: Log message
metadata: Additional context data
Returns:
Log: The created log entry
"""
log_entry = Log(
level=level,
status=LogStatus.SUCCESS, # Simple events are immediately complete
message=message,
source=source,
log_metadata=metadata or {},
search_space_id=self.search_space_id
)
self.session.add(log_entry)
await self.session.commit()
await self.session.refresh(log_entry)
logger.info(f"Logged event from {source}: {message}")
return log_entry

View file

@ -8,6 +8,7 @@ from app.config import config
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.utils.document_converters import convert_document_to_markdown, generate_content_hash
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from langchain_core.documents import Document as LangChainDocument
from langchain_community.document_loaders import FireCrawlLoader, AsyncChromiumLoader
from langchain_community.document_transformers import MarkdownifyTransformer
@ -22,10 +23,34 @@ md = MarkdownifyTransformer()
async def add_crawled_url_document(
session: AsyncSession, url: str, search_space_id: int, user_id: str
) -> Optional[Document]:
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="crawl_url_document",
source="background_task",
message=f"Starting URL crawling process for: {url}",
metadata={"url": url, "user_id": str(user_id)}
)
try:
# URL validation step
await task_logger.log_task_progress(
log_entry,
f"Validating URL: {url}",
{"stage": "validation"}
)
if not validators.url(url):
raise ValueError(f"Url {url} is not a valid URL address")
# Set up crawler
await task_logger.log_task_progress(
log_entry,
f"Setting up crawler for URL: {url}",
{"stage": "crawler_setup", "firecrawl_available": bool(config.FIRECRAWL_API_KEY)}
)
if config.FIRECRAWL_API_KEY:
crawl_loader = FireCrawlLoader(
url=url,
@ -39,6 +64,13 @@ async def add_crawled_url_document(
else:
crawl_loader = AsyncChromiumLoader(urls=[url], headless=True)
# Perform crawling
await task_logger.log_task_progress(
log_entry,
f"Crawling URL content: {url}",
{"stage": "crawling", "crawler_type": type(crawl_loader).__name__}
)
url_crawled = await crawl_loader.aload()
if type(crawl_loader) == FireCrawlLoader:
@ -46,6 +78,13 @@ async def add_crawled_url_document(
elif type(crawl_loader) == AsyncChromiumLoader:
content_in_markdown = md.transform_documents(url_crawled)[0].page_content
# Format document
await task_logger.log_task_progress(
log_entry,
f"Processing crawled content from: {url}",
{"stage": "content_processing", "content_length": len(content_in_markdown)}
)
# Format document metadata in a more maintainable way
metadata_sections = [
(
@ -74,6 +113,13 @@ async def add_crawled_url_document(
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check for duplicates
await task_logger.log_task_progress(
log_entry,
f"Checking for duplicate content: {url}",
{"stage": "duplicate_check", "content_hash": content_hash}
)
# Check if document with this content hash already exists
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
@ -81,15 +127,33 @@ async def add_crawled_url_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Document already exists for URL: {url}",
{"duplicate_detected": True, "existing_document_id": existing_document.id}
)
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
# Get LLM for summary generation
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {url}",
{"stage": "llm_setup"}
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
raise RuntimeError(f"No long context LLM configured for user {user_id}")
# Generate summary
await task_logger.log_task_progress(
log_entry,
f"Generating summary for URL content: {url}",
{"stage": "summary_generation"}
)
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
summary_result = await summary_chain.ainvoke(
{"document": combined_document_string}
@ -98,6 +162,12 @@ async def add_crawled_url_document(
summary_embedding = config.embedding_model_instance.embed(summary_content)
# Process chunks
await task_logger.log_task_progress(
log_entry,
f"Processing content chunks for URL: {url}",
{"stage": "chunk_processing"}
)
chunks = [
Chunk(
content=chunk.text,
@ -107,6 +177,12 @@ async def add_crawled_url_document(
]
# Create and store document
await task_logger.log_task_progress(
log_entry,
f"Creating document in database for URL: {url}",
{"stage": "document_creation", "chunks_count": len(chunks)}
)
document = Document(
search_space_id=search_space_id,
title=url_crawled[0].metadata["title"]
@ -124,13 +200,38 @@ async def add_crawled_url_document(
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully crawled and processed URL: {url}",
{
"document_id": document.id,
"title": document.title,
"content_hash": content_hash,
"chunks_count": len(chunks),
"summary_length": len(summary_content)
}
)
return document
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error while processing URL: {url}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
raise db_error
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to crawl URL: {url}",
str(e),
{"error_type": type(e).__name__}
)
raise RuntimeError(f"Failed to crawl URL: {str(e)}")
@ -148,6 +249,20 @@ async def add_extension_received_document(
Returns:
Document object if successful, None if failed
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="extension_document",
source="background_task",
message=f"Processing extension document: {content.metadata.VisitedWebPageTitle}",
metadata={
"url": content.metadata.VisitedWebPageURL,
"title": content.metadata.VisitedWebPageTitle,
"user_id": str(user_id)
}
)
try:
# Format document metadata in a more maintainable way
metadata_sections = [
@ -188,6 +303,11 @@ async def add_extension_received_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists: {content.metadata.VisitedWebPageTitle}",
{"duplicate_detected": True, "existing_document_id": existing_document.id}
)
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
@ -229,19 +349,52 @@ async def add_extension_received_document(
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully processed extension document: {content.metadata.VisitedWebPageTitle}",
{
"document_id": document.id,
"content_hash": content_hash,
"url": content.metadata.VisitedWebPageURL
}
)
return document
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error processing extension document: {content.metadata.VisitedWebPageTitle}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
raise db_error
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to process extension document: {content.metadata.VisitedWebPageTitle}",
str(e),
{"error_type": type(e).__name__}
)
raise RuntimeError(f"Failed to process extension document: {str(e)}")
async def add_received_markdown_file_document(
session: AsyncSession, file_name: str, file_in_markdown: str, search_space_id: int, user_id: str
) -> Optional[Document]:
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="markdown_file_document",
source="background_task",
message=f"Processing markdown file: {file_name}",
metadata={"filename": file_name, "user_id": str(user_id), "content_length": len(file_in_markdown)}
)
try:
content_hash = generate_content_hash(file_in_markdown, search_space_id)
@ -252,6 +405,11 @@ async def add_received_markdown_file_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Markdown file document already exists: {file_name}",
{"duplicate_detected": True, "existing_document_id": existing_document.id}
)
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
@ -293,12 +451,36 @@ async def add_received_markdown_file_document(
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully processed markdown file: {file_name}",
{
"document_id": document.id,
"content_hash": content_hash,
"chunks_count": len(chunks),
"summary_length": len(summary_content)
}
)
return document
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error processing markdown file: {file_name}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
raise db_error
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to process markdown file: {file_name}",
str(e),
{"error_type": type(e).__name__}
)
raise RuntimeError(f"Failed to process file document: {str(e)}")
@ -566,8 +748,24 @@ async def add_youtube_video_document(
SQLAlchemyError: If there's a database error
RuntimeError: If the video processing fails
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="youtube_video_document",
source="background_task",
message=f"Starting YouTube video processing for: {url}",
metadata={"url": url, "user_id": str(user_id)}
)
try:
# Extract video ID from URL
await task_logger.log_task_progress(
log_entry,
f"Extracting video ID from URL: {url}",
{"stage": "video_id_extraction"}
)
def get_youtube_video_id(url: str):
parsed_url = urlparse(url)
hostname = parsed_url.hostname
@ -589,7 +787,19 @@ async def add_youtube_video_document(
if not video_id:
raise ValueError(f"Could not extract video ID from URL: {url}")
# Get video metadata using async HTTP client
await task_logger.log_task_progress(
log_entry,
f"Video ID extracted: {video_id}",
{"stage": "video_id_extracted", "video_id": video_id}
)
# Get video metadata
await task_logger.log_task_progress(
log_entry,
f"Fetching video metadata for: {video_id}",
{"stage": "metadata_fetch"}
)
params = {
"format": "json",
"url": f"https://www.youtube.com/watch?v={video_id}",
@ -600,7 +810,19 @@ async def add_youtube_video_document(
async with http_session.get(oembed_url, params=params) as response:
video_data = await response.json()
await task_logger.log_task_progress(
log_entry,
f"Video metadata fetched: {video_data.get('title', 'Unknown')}",
{"stage": "metadata_fetched", "title": video_data.get('title'), "author": video_data.get('author_name')}
)
# Get video transcript
await task_logger.log_task_progress(
log_entry,
f"Fetching transcript for video: {video_id}",
{"stage": "transcript_fetch"}
)
try:
captions = YouTubeTranscriptApi.get_transcript(video_id)
# Include complete caption information with timestamps
@ -612,8 +834,26 @@ async def add_youtube_video_document(
timestamp = f"[{start_time:.2f}s-{start_time + duration:.2f}s]"
transcript_segments.append(f"{timestamp} {text}")
transcript_text = "\n".join(transcript_segments)
await task_logger.log_task_progress(
log_entry,
f"Transcript fetched successfully: {len(captions)} segments",
{"stage": "transcript_fetched", "segments_count": len(captions), "transcript_length": len(transcript_text)}
)
except Exception as e:
transcript_text = f"No captions available for this video. Error: {str(e)}"
await task_logger.log_task_progress(
log_entry,
f"No transcript available for video: {video_id}",
{"stage": "transcript_unavailable", "error": str(e)}
)
# Format document
await task_logger.log_task_progress(
log_entry,
f"Processing video content: {video_data.get('title', 'YouTube Video')}",
{"stage": "content_processing"}
)
# Format document metadata in a more maintainable way
metadata_sections = [
@ -646,6 +886,13 @@ async def add_youtube_video_document(
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check for duplicates
await task_logger.log_task_progress(
log_entry,
f"Checking for duplicate video content: {video_id}",
{"stage": "duplicate_check", "content_hash": content_hash}
)
# Check if document with this content hash already exists
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
@ -653,15 +900,33 @@ async def add_youtube_video_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists: {video_data.get('title', 'YouTube Video')}",
{"duplicate_detected": True, "existing_document_id": existing_document.id, "video_id": video_id}
)
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
# Get LLM for summary generation
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}",
{"stage": "llm_setup"}
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
raise RuntimeError(f"No long context LLM configured for user {user_id}")
# Generate summary
await task_logger.log_task_progress(
log_entry,
f"Generating summary for video: {video_data.get('title', 'YouTube Video')}",
{"stage": "summary_generation"}
)
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
summary_result = await summary_chain.ainvoke(
{"document": combined_document_string}
@ -670,6 +935,12 @@ async def add_youtube_video_document(
summary_embedding = config.embedding_model_instance.embed(summary_content)
# Process chunks
await task_logger.log_task_progress(
log_entry,
f"Processing content chunks for video: {video_data.get('title', 'YouTube Video')}",
{"stage": "chunk_processing"}
)
chunks = [
Chunk(
content=chunk.text,
@ -679,6 +950,11 @@ async def add_youtube_video_document(
]
# Create document
await task_logger.log_task_progress(
log_entry,
f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_creation", "chunks_count": len(chunks)}
)
document = Document(
title=video_data.get("title", "YouTube Video"),
@ -701,11 +977,38 @@ async def add_youtube_video_document(
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully processed YouTube video: {video_data.get('title', 'YouTube Video')}",
{
"document_id": document.id,
"video_id": video_id,
"title": document.title,
"content_hash": content_hash,
"chunks_count": len(chunks),
"summary_length": len(summary_content),
"has_transcript": "No captions available" not in transcript_text
}
)
return document
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error while processing YouTube video: {url}",
str(db_error),
{"error_type": "SQLAlchemyError", "video_id": video_id if 'video_id' in locals() else None}
)
raise db_error
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to process YouTube video: {url}",
str(e),
{"error_type": type(e).__name__, "video_id": video_id if 'video_id' in locals() else None}
)
logging.error(f"Failed to process YouTube video: {str(e)}")
raise

View file

@ -7,6 +7,7 @@ from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchS
from app.config import config
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.connectors.slack_history import SlackHistory
from app.connectors.notion_history import NotionHistoryConnector
from app.connectors.github_connector import GitHubConnector
@ -42,8 +43,24 @@ async def index_slack_messages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="slack_messages_indexing",
source="connector_indexing_task",
message=f"Starting Slack messages indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Slack connector {connector_id} from database",
{"stage": "connector_retrieval"}
)
result = await session.execute(
select(SearchSourceConnector)
.filter(
@ -54,17 +71,41 @@ async def index_slack_messages(
connector = result.scalars().first()
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a Slack connector",
"Connector not found",
{"error_type": "ConnectorNotFound"}
)
return 0, f"Connector with ID {connector_id} not found or is not a Slack connector"
# Get the Slack token from the connector config
slack_token = connector.config.get("SLACK_BOT_TOKEN")
if not slack_token:
await task_logger.log_task_failure(
log_entry,
f"Slack token not found in connector config for connector {connector_id}",
"Missing Slack token",
{"error_type": "MissingToken"}
)
return 0, "Slack token not found in connector config"
# Initialize Slack client
await task_logger.log_task_progress(
log_entry,
f"Initializing Slack client for connector {connector_id}",
{"stage": "client_initialization"}
)
slack_client = SlackHistory(token=slack_token)
# Calculate date range
await task_logger.log_task_progress(
log_entry,
f"Calculating date range for Slack indexing",
{"stage": "date_calculation", "provided_start_date": start_date, "provided_end_date": end_date}
)
if start_date is None or end_date is None:
# Fall back to calculating dates based on last_indexed_at
calculated_end_date = datetime.now()
@ -95,13 +136,30 @@ async def index_slack_messages(
logger.info(f"Indexing Slack messages from {start_date_str} to {end_date_str}")
await task_logger.log_task_progress(
log_entry,
f"Fetching Slack channels from {start_date_str} to {end_date_str}",
{"stage": "fetch_channels", "start_date": start_date_str, "end_date": end_date_str}
)
# Get all channels
try:
channels = slack_client.get_all_channels()
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to get Slack channels for connector {connector_id}",
str(e),
{"error_type": "ChannelFetchError"}
)
return 0, f"Failed to get Slack channels: {str(e)}"
if not channels:
await task_logger.log_task_success(
log_entry,
f"No Slack channels found for connector {connector_id}",
{"channels_found": 0}
)
return 0, "No Slack channels found"
# Track the number of documents indexed
@ -109,6 +167,12 @@ async def index_slack_messages(
documents_skipped = 0
skipped_channels = []
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(channels)} Slack channels",
{"stage": "process_channels", "total_channels": len(channels)}
)
# Process each channel
for channel_obj in channels: # Modified loop to iterate over list of channel objects
channel_id = channel_obj["id"]
@ -283,15 +347,40 @@ async def index_slack_messages(
else:
result_message = f"Processed {total_processed} channels."
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Slack indexing for connector {connector_id}",
{
"channels_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_channels_count": len(skipped_channels),
"result_message": result_message
}
)
logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Slack indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
logger.error(f"Database error: {str(db_error)}")
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index Slack messages for connector {connector_id}",
str(e),
{"error_type": type(e).__name__}
)
logger.error(f"Failed to index Slack messages: {str(e)}")
return 0, f"Failed to index Slack messages: {str(e)}"
@ -316,8 +405,24 @@ async def index_notion_pages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="notion_pages_indexing",
source="connector_indexing_task",
message=f"Starting Notion pages indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Notion connector {connector_id} from database",
{"stage": "connector_retrieval"}
)
result = await session.execute(
select(SearchSourceConnector)
.filter(
@ -328,14 +433,32 @@ async def index_notion_pages(
connector = result.scalars().first()
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a Notion connector",
"Connector not found",
{"error_type": "ConnectorNotFound"}
)
return 0, f"Connector with ID {connector_id} not found or is not a Notion connector"
# Get the Notion token from the connector config
notion_token = connector.config.get("NOTION_INTEGRATION_TOKEN")
if not notion_token:
await task_logger.log_task_failure(
log_entry,
f"Notion integration token not found in connector config for connector {connector_id}",
"Missing Notion token",
{"error_type": "MissingToken"}
)
return 0, "Notion integration token not found in connector config"
# Initialize Notion client
await task_logger.log_task_progress(
log_entry,
f"Initializing Notion client for connector {connector_id}",
{"stage": "client_initialization"}
)
logger.info(f"Initializing Notion client for connector {connector_id}")
notion_client = NotionHistoryConnector(token=notion_token)
@ -364,15 +487,32 @@ async def index_notion_pages(
logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")
await task_logger.log_task_progress(
log_entry,
f"Fetching Notion pages from {start_date_iso} to {end_date_iso}",
{"stage": "fetch_pages", "start_date": start_date_iso, "end_date": end_date_iso}
)
# Get all pages
try:
pages = notion_client.get_all_pages(start_date=start_date_iso, end_date=end_date_iso)
logger.info(f"Found {len(pages)} Notion pages")
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to get Notion pages for connector {connector_id}",
str(e),
{"error_type": "PageFetchError"}
)
logger.error(f"Error fetching Notion pages: {str(e)}", exc_info=True)
return 0, f"Failed to get Notion pages: {str(e)}"
if not pages:
await task_logger.log_task_success(
log_entry,
f"No Notion pages found for connector {connector_id}",
{"pages_found": 0}
)
logger.info("No Notion pages found to index")
return 0, "No Notion pages found"
@ -381,6 +521,12 @@ async def index_notion_pages(
documents_skipped = 0
skipped_pages = []
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(pages)} Notion pages",
{"stage": "process_pages", "total_pages": len(pages)}
)
# Process each page
for page in pages:
try:
@ -552,15 +698,40 @@ async def index_notion_pages(
else:
result_message = f"Processed {total_processed} pages."
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Notion indexing for connector {connector_id}",
{
"pages_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_pages_count": len(skipped_pages),
"result_message": result_message
}
)
logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Notion indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
logger.error(f"Database error during Notion indexing: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index Notion pages for connector {connector_id}",
str(e),
{"error_type": type(e).__name__}
)
logger.error(f"Failed to index Notion pages: {str(e)}", exc_info=True)
return 0, f"Failed to index Notion pages: {str(e)}"
@ -585,11 +756,27 @@ async def index_github_repos(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="github_repos_indexing",
source="connector_indexing_task",
message=f"Starting GitHub repositories indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
documents_processed = 0
errors = []
try:
# 1. Get the GitHub connector from the database
await task_logger.log_task_progress(
log_entry,
f"Retrieving GitHub connector {connector_id} from database",
{"stage": "connector_retrieval"}
)
result = await session.execute(
select(SearchSourceConnector)
.filter(
@ -600,6 +787,12 @@ async def index_github_repos(
connector = result.scalars().first()
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a GitHub connector",
"Connector not found",
{"error_type": "ConnectorNotFound"}
)
return 0, f"Connector with ID {connector_id} not found or is not a GitHub connector"
# 2. Get the GitHub PAT and selected repositories from the connector config
@ -607,20 +800,50 @@ async def index_github_repos(
repo_full_names_to_index = connector.config.get("repo_full_names")
if not github_pat:
await task_logger.log_task_failure(
log_entry,
f"GitHub Personal Access Token (PAT) not found in connector config for connector {connector_id}",
"Missing GitHub PAT",
{"error_type": "MissingToken"}
)
return 0, "GitHub Personal Access Token (PAT) not found in connector config"
if not repo_full_names_to_index or not isinstance(repo_full_names_to_index, list):
return 0, "'repo_full_names' not found or is not a list in connector config"
await task_logger.log_task_failure(
log_entry,
f"'repo_full_names' not found or is not a list in connector config for connector {connector_id}",
"Invalid repo configuration",
{"error_type": "InvalidConfiguration"}
)
return 0, "'repo_full_names' not found or is not a list in connector config"
# 3. Initialize GitHub connector client
await task_logger.log_task_progress(
log_entry,
f"Initializing GitHub client for connector {connector_id}",
{"stage": "client_initialization", "repo_count": len(repo_full_names_to_index)}
)
try:
github_client = GitHubConnector(token=github_pat)
except ValueError as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to initialize GitHub client for connector {connector_id}",
str(e),
{"error_type": "ClientInitializationError"}
)
return 0, f"Failed to initialize GitHub client: {str(e)}"
# 4. Validate selected repositories
# For simplicity, we'll proceed with the list provided.
# If a repo is inaccessible, get_repository_files will likely fail gracefully later.
await task_logger.log_task_progress(
log_entry,
f"Starting indexing for {len(repo_full_names_to_index)} selected repositories",
{"stage": "repo_processing", "repo_count": len(repo_full_names_to_index), "start_date": start_date, "end_date": end_date}
)
logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.")
if start_date and end_date:
logger.info(f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)")
@ -719,13 +942,36 @@ async def index_github_repos(
await session.commit()
logger.info(f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files.")
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed GitHub indexing for connector {connector_id}",
{
"documents_processed": documents_processed,
"errors_count": len(errors),
"repo_count": len(repo_full_names_to_index)
}
)
except SQLAlchemyError as db_err:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during GitHub indexing for connector {connector_id}",
str(db_err),
{"error_type": "SQLAlchemyError"}
)
logger.error(f"Database error during GitHub indexing for connector {connector_id}: {db_err}")
errors.append(f"Database error: {db_err}")
return documents_processed, "; ".join(errors) if errors else str(db_err)
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Unexpected error during GitHub indexing for connector {connector_id}",
str(e),
{"error_type": type(e).__name__}
)
logger.error(f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", exc_info=True)
errors.append(f"Unexpected error: {e}")
return documents_processed, "; ".join(errors) if errors else str(e)
@ -754,8 +1000,24 @@ async def index_linear_issues(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="linear_issues_indexing",
source="connector_indexing_task",
message=f"Starting Linear issues indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Linear connector {connector_id} from database",
{"stage": "connector_retrieval"}
)
result = await session.execute(
select(SearchSourceConnector)
.filter(
@ -766,14 +1028,32 @@ async def index_linear_issues(
connector = result.scalars().first()
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a Linear connector",
"Connector not found",
{"error_type": "ConnectorNotFound"}
)
return 0, f"Connector with ID {connector_id} not found or is not a Linear connector"
# Get the Linear token from the connector config
linear_token = connector.config.get("LINEAR_API_KEY")
if not linear_token:
await task_logger.log_task_failure(
log_entry,
f"Linear API token not found in connector config for connector {connector_id}",
"Missing Linear token",
{"error_type": "MissingToken"}
)
return 0, "Linear API token not found in connector config"
# Initialize Linear client
await task_logger.log_task_progress(
log_entry,
f"Initializing Linear client for connector {connector_id}",
{"stage": "client_initialization"}
)
linear_client = LinearConnector(token=linear_token)
# Calculate date range
@ -807,6 +1087,12 @@ async def index_linear_issues(
logger.info(f"Fetching Linear issues from {start_date_str} to {end_date_str}")
await task_logger.log_task_progress(
log_entry,
f"Fetching Linear issues from {start_date_str} to {end_date_str}",
{"stage": "fetch_issues", "start_date": start_date_str, "end_date": end_date_str}
)
# Get issues within date range
try:
issues, error = linear_client.get_issues_by_date_range(
@ -855,6 +1141,12 @@ async def index_linear_issues(
documents_skipped = 0
skipped_issues = []
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(issues)} Linear issues",
{"stage": "process_issues", "total_issues": len(issues)}
)
# Process each issue
for issue in issues:
try:
@ -959,16 +1251,39 @@ async def index_linear_issues(
await session.commit()
logger.info(f"Successfully committed all Linear document changes to database")
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Linear indexing for connector {connector_id}",
{
"issues_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_issues_count": len(skipped_issues)
}
)
logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped")
return total_processed, None # Return None as the error message to indicate success
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Linear indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
logger.error(f"Database error: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index Linear issues for connector {connector_id}",
str(e),
{"error_type": type(e).__name__}
)
logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True)
return 0, f"Failed to index Linear issues: {str(e)}"
@ -993,8 +1308,24 @@ async def index_discord_messages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="discord_messages_indexing",
source="connector_indexing_task",
message=f"Starting Discord messages indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Discord connector {connector_id} from database",
{"stage": "connector_retrieval"}
)
result = await session.execute(
select(SearchSourceConnector)
.filter(
@ -1005,16 +1336,34 @@ async def index_discord_messages(
connector = result.scalars().first()
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a Discord connector",
"Connector not found",
{"error_type": "ConnectorNotFound"}
)
return 0, f"Connector with ID {connector_id} not found or is not a Discord connector"
# Get the Discord token from the connector config
discord_token = connector.config.get("DISCORD_BOT_TOKEN")
if not discord_token:
await task_logger.log_task_failure(
log_entry,
f"Discord token not found in connector config for connector {connector_id}",
"Missing Discord token",
{"error_type": "MissingToken"}
)
return 0, "Discord token not found in connector config"
logger.info(f"Starting Discord indexing for connector {connector_id}")
# Initialize Discord client
await task_logger.log_task_progress(
log_entry,
f"Initializing Discord client for connector {connector_id}",
{"stage": "client_initialization"}
)
discord_client = DiscordConnector(token=discord_token)
# Calculate date range
@ -1054,6 +1403,12 @@ async def index_discord_messages(
skipped_channels = []
try:
await task_logger.log_task_progress(
log_entry,
f"Starting Discord bot and fetching guilds for connector {connector_id}",
{"stage": "fetch_guilds"}
)
logger.info("Starting Discord bot to fetch guilds")
discord_client._bot_task = asyncio.create_task(discord_client.start_bot())
await discord_client._wait_until_ready()
@ -1062,15 +1417,32 @@ async def index_discord_messages(
guilds = await discord_client.get_guilds()
logger.info(f"Found {len(guilds)} guilds")
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to get Discord guilds for connector {connector_id}",
str(e),
{"error_type": "GuildFetchError"}
)
logger.error(f"Failed to get Discord guilds: {str(e)}", exc_info=True)
await discord_client.close_bot()
return 0, f"Failed to get Discord guilds: {str(e)}"
if not guilds:
await task_logger.log_task_success(
log_entry,
f"No Discord guilds found for connector {connector_id}",
{"guilds_found": 0}
)
logger.info("No Discord guilds found to index")
await discord_client.close_bot()
return 0, "No Discord guilds found"
# Process each guild and channel
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(guilds)} Discord guilds",
{"stage": "process_guilds", "total_guilds": len(guilds)}
)
for guild in guilds:
guild_id = guild["id"]
guild_name = guild["name"]
@ -1242,14 +1614,40 @@ async def index_discord_messages(
else:
result_message = f"Processed {documents_indexed} channels."
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Discord indexing for connector {connector_id}",
{
"channels_processed": documents_indexed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_channels_count": len(skipped_channels),
"guilds_processed": len(guilds),
"result_message": result_message
}
)
logger.info(f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
return documents_indexed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Discord indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index Discord messages for connector {connector_id}",
str(e),
{"error_type": type(e).__name__}
)
logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True)
return 0, f"Failed to index Discord messages: {str(e)}"

View file

@ -2,8 +2,10 @@
from app.agents.podcaster.graph import graph as podcaster_graph
from app.agents.podcaster.state import State
from app.db import Chat, Podcast
from app.services.task_logging_service import TaskLoggingService
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
async def generate_document_podcast(
@ -24,73 +26,177 @@ async def generate_chat_podcast(
podcast_title: str,
user_id: int
):
# Fetch the chat with the specified ID
query = select(Chat).filter(
Chat.id == chat_id,
Chat.search_space_id == search_space_id
)
task_logger = TaskLoggingService(session, search_space_id)
result = await session.execute(query)
chat = result.scalars().first()
if not chat:
raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}")
# Create chat history structure
chat_history_str = "<chat_history>"
for message in chat.messages:
if message["role"] == "user":
chat_history_str += f"<user_message>{message['content']}</user_message>"
elif message["role"] == "assistant":
# Last annotation type will always be "ANSWER" here
answer_annotation = message["annotations"][-1]
answer_text = ""
if answer_annotation["type"] == "ANSWER":
answer_text = answer_annotation["content"]
# If content is a list, join it into a single string
if isinstance(answer_text, list):
answer_text = "\n".join(answer_text)
chat_history_str += f"<assistant_message>{answer_text}</assistant_message>"
chat_history_str += "</chat_history>"
# Pass it to the SurfSense Podcaster
config = {
"configurable": {
"podcast_title": "SurfSense",
"user_id": str(user_id),
# Log task start
log_entry = await task_logger.log_task_start(
task_name="generate_chat_podcast",
source="podcast_task",
message=f"Starting podcast generation for chat {chat_id}",
metadata={
"chat_id": chat_id,
"search_space_id": search_space_id,
"podcast_title": podcast_title,
"user_id": str(user_id)
}
}
# Initialize state with database session and streaming service
initial_state = State(
source_content=chat_history_str,
db_session=session
)
# Run the graph directly
result = await podcaster_graph.ainvoke(initial_state, config=config)
# Convert podcast transcript entries to serializable format
serializable_transcript = []
for entry in result["podcast_transcript"]:
serializable_transcript.append({
"speaker_id": entry.speaker_id,
"dialog": entry.dialog
})
# Create a new podcast entry
podcast = Podcast(
title=f"{podcast_title}",
podcast_transcript=serializable_transcript,
file_location=result["final_podcast_file_path"],
search_space_id=search_space_id
)
# Add to session and commit
session.add(podcast)
await session.commit()
await session.refresh(podcast)
return podcast
try:
# Fetch the chat with the specified ID
await task_logger.log_task_progress(
log_entry,
f"Fetching chat {chat_id} from database",
{"stage": "fetch_chat"}
)
query = select(Chat).filter(
Chat.id == chat_id,
Chat.search_space_id == search_space_id
)
result = await session.execute(query)
chat = result.scalars().first()
if not chat:
await task_logger.log_task_failure(
log_entry,
f"Chat with id {chat_id} not found in search space {search_space_id}",
"Chat not found",
{"error_type": "ChatNotFound"}
)
raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}")
# Create chat history structure
await task_logger.log_task_progress(
log_entry,
f"Processing chat history for chat {chat_id}",
{"stage": "process_chat_history", "message_count": len(chat.messages)}
)
chat_history_str = "<chat_history>"
processed_messages = 0
for message in chat.messages:
if message["role"] == "user":
chat_history_str += f"<user_message>{message['content']}</user_message>"
processed_messages += 1
elif message["role"] == "assistant":
# Last annotation type will always be "ANSWER" here
answer_annotation = message["annotations"][-1]
answer_text = ""
if answer_annotation["type"] == "ANSWER":
answer_text = answer_annotation["content"]
# If content is a list, join it into a single string
if isinstance(answer_text, list):
answer_text = "\n".join(answer_text)
chat_history_str += f"<assistant_message>{answer_text}</assistant_message>"
processed_messages += 1
chat_history_str += "</chat_history>"
# Pass it to the SurfSense Podcaster
await task_logger.log_task_progress(
log_entry,
f"Initializing podcast generation for chat {chat_id}",
{"stage": "initialize_podcast_generation", "processed_messages": processed_messages, "content_length": len(chat_history_str)}
)
config = {
"configurable": {
"podcast_title": "SurfSense",
"user_id": str(user_id),
}
}
# Initialize state with database session and streaming service
initial_state = State(
source_content=chat_history_str,
db_session=session
)
# Run the graph directly
await task_logger.log_task_progress(
log_entry,
f"Running podcast generation graph for chat {chat_id}",
{"stage": "run_podcast_graph"}
)
result = await podcaster_graph.ainvoke(initial_state, config=config)
# Convert podcast transcript entries to serializable format
await task_logger.log_task_progress(
log_entry,
f"Processing podcast transcript for chat {chat_id}",
{"stage": "process_transcript", "transcript_entries": len(result["podcast_transcript"])}
)
serializable_transcript = []
for entry in result["podcast_transcript"]:
serializable_transcript.append({
"speaker_id": entry.speaker_id,
"dialog": entry.dialog
})
# Create a new podcast entry
await task_logger.log_task_progress(
log_entry,
f"Creating podcast database entry for chat {chat_id}",
{"stage": "create_podcast_entry", "file_location": result.get("final_podcast_file_path")}
)
podcast = Podcast(
title=f"{podcast_title}",
podcast_transcript=serializable_transcript,
file_location=result["final_podcast_file_path"],
search_space_id=search_space_id
)
# Add to session and commit
session.add(podcast)
await session.commit()
await session.refresh(podcast)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully generated podcast for chat {chat_id}",
{
"podcast_id": podcast.id,
"podcast_title": podcast_title,
"transcript_entries": len(serializable_transcript),
"file_location": result.get("final_podcast_file_path"),
"processed_messages": processed_messages,
"content_length": len(chat_history_str)
}
)
return podcast
except ValueError as ve:
# ValueError is already logged above for chat not found
if "not found" not in str(ve):
await task_logger.log_task_failure(
log_entry,
f"Value error during podcast generation for chat {chat_id}",
str(ve),
{"error_type": "ValueError"}
)
raise ve
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during podcast generation for chat {chat_id}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
raise db_error
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Unexpected error during podcast generation for chat {chat_id}",
str(e),
{"error_type": type(e).__name__}
)
raise RuntimeError(f"Failed to generate podcast for chat {chat_id}: {str(e)}")

View file

@ -83,9 +83,8 @@ async def stream_connector_search_results(
config=config,
stream_mode="custom",
):
# If the chunk contains a 'yeild_value' key, print its value
# Note: there's a typo in 'yeild_value' in the code, but we need to match it
if isinstance(chunk, dict) and 'yeild_value' in chunk:
yield chunk['yeild_value']
yield streaming_service.format_completion()
if isinstance(chunk, dict):
if "yield_value" in chunk:
yield chunk["yield_value"]
yield streaming_service.format_completion()