From 1eb072cc690cdbb399cc7df51e2f7581c1166675 Mon Sep 17 00:00:00 2001
From: "MSI\\ModSetter"
Date: Wed, 16 Jul 2025 01:10:33 -0700
Subject: [PATCH 01/16] feat(BACKEND): Added Log Management System for better
 Bug Tracking
- Background tasks are now logged so non-technical users can easily track failure points.
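- The TaskLoggingService exposes log_task_start / log_task_progress /
  log_task_success / log_task_failure. A minimal sketch of the calling
  pattern (illustration only, not part of this patch; do_the_actual_work
  is a hypothetical helper, the rest mirrors
  app/services/task_logging_service.py below):

      # Sketch: how a background task reports its lifecycle to the logs table.
      from app.services.task_logging_service import TaskLoggingService

      async def example_background_task(session, search_space_id: int, user_id: str, url: str):
          task_logger = TaskLoggingService(session, search_space_id)

          # Creates an IN_PROGRESS log row scoped to the search space
          log_entry = await task_logger.log_task_start(
              task_name="example_background_task",
              source="background_task",
              message=f"Starting processing for: {url}",
              metadata={"url": url, "user_id": user_id},
          )
          try:
              # Optional intermediate updates keep the row IN_PROGRESS
              await task_logger.log_task_progress(
                  log_entry, f"Processing: {url}", {"stage": "processing"}
              )
              result = await do_the_actual_work(session, url)  # hypothetical helper

              # Flips the same row to SUCCESS and records completion metadata
              await task_logger.log_task_success(
                  log_entry, f"Finished processing: {url}", {"document_id": result.id}
              )
              return result
          except Exception as e:
              # Flips the row to FAILED, raises level to ERROR, stores error details
              await task_logger.log_task_failure(
                  log_entry, f"Failed to process: {url}", str(e), {"error_type": type(e).__name__}
              )
              raise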
---
.../alembic/versions/12_add_logs_table.py | 71 ++++
surfsense_backend/app/db.py | 25 ++
surfsense_backend/app/routes/__init__.py | 2 +
.../app/routes/documents_routes.py | 266 ++++++++++++++-
surfsense_backend/app/routes/logs_routes.py | 280 ++++++++++++++++
surfsense_backend/app/schemas/__init__.py | 6 +
surfsense_backend/app/schemas/logs.py | 44 +++
.../app/services/task_logging_service.py | 204 ++++++++++++
.../app/tasks/background_tasks.py | 305 +++++++++++++++++-
9 files changed, 1193 insertions(+), 10 deletions(-)
create mode 100644 surfsense_backend/alembic/versions/12_add_logs_table.py
create mode 100644 surfsense_backend/app/routes/logs_routes.py
create mode 100644 surfsense_backend/app/schemas/logs.py
create mode 100644 surfsense_backend/app/services/task_logging_service.py
diff --git a/surfsense_backend/alembic/versions/12_add_logs_table.py b/surfsense_backend/alembic/versions/12_add_logs_table.py
new file mode 100644
index 000000000..0b2cc13c8
--- /dev/null
+++ b/surfsense_backend/alembic/versions/12_add_logs_table.py
@@ -0,0 +1,71 @@
+"""Add LogLevel and LogStatus enums and logs table
+
+Revision ID: 12
+Revises: 11
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import JSON
+
+
+# revision identifiers, used by Alembic.
+revision: str = "12"
+down_revision: Union[str, None] = "11"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ """Upgrade schema - add LogLevel and LogStatus enums and logs table."""
+
+ # Create LogLevel enum
+ op.execute("""
+ CREATE TYPE loglevel AS ENUM ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
+ """)
+
+ # Create LogStatus enum
+ op.execute("""
+ CREATE TYPE logstatus AS ENUM ('IN_PROGRESS', 'SUCCESS', 'FAILED')
+ """)
+
+ # Create logs table
+ op.execute("""
+ CREATE TABLE logs (
+ id SERIAL PRIMARY KEY,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+ level loglevel NOT NULL,
+ status logstatus NOT NULL,
+ message TEXT NOT NULL,
+ source VARCHAR(200),
+ log_metadata JSONB DEFAULT '{}',
+ search_space_id INTEGER NOT NULL REFERENCES searchspaces(id) ON DELETE CASCADE
+ )
+ """)
+
+ # Create indexes
+ op.create_index(op.f('ix_logs_id'), 'logs', ['id'], unique=False)
+ op.create_index(op.f('ix_logs_created_at'), 'logs', ['created_at'], unique=False)
+ op.create_index(op.f('ix_logs_level'), 'logs', ['level'], unique=False)
+ op.create_index(op.f('ix_logs_status'), 'logs', ['status'], unique=False)
+ op.create_index(op.f('ix_logs_source'), 'logs', ['source'], unique=False)
+
+
+def downgrade() -> None:
+ """Downgrade schema - remove logs table and enums."""
+
+ # Drop indexes
+ op.drop_index(op.f('ix_logs_source'), table_name='logs')
+ op.drop_index(op.f('ix_logs_status'), table_name='logs')
+ op.drop_index(op.f('ix_logs_level'), table_name='logs')
+ op.drop_index(op.f('ix_logs_created_at'), table_name='logs')
+ op.drop_index(op.f('ix_logs_id'), table_name='logs')
+
+ # Drop logs table
+ op.drop_table('logs')
+
+ # Drop enums
+ op.execute("DROP TYPE IF EXISTS logstatus")
+ op.execute("DROP TYPE IF EXISTS loglevel")
\ No newline at end of file
diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py
index 0c6311a7b..7caf36533 100644
--- a/surfsense_backend/app/db.py
+++ b/surfsense_backend/app/db.py
@@ -91,6 +91,18 @@ class LiteLLMProvider(str, Enum):
ALEPH_ALPHA = "ALEPH_ALPHA"
PETALS = "PETALS"
CUSTOM = "CUSTOM"
+
+class LogLevel(str, Enum):
+ DEBUG = "DEBUG"
+ INFO = "INFO"
+ WARNING = "WARNING"
+ ERROR = "ERROR"
+ CRITICAL = "CRITICAL"
+
+class LogStatus(str, Enum):
+ IN_PROGRESS = "IN_PROGRESS"
+ SUCCESS = "SUCCESS"
+ FAILED = "FAILED"
class Base(DeclarativeBase):
pass
@@ -163,6 +175,7 @@ class SearchSpace(BaseModel, TimestampMixin):
documents = relationship("Document", back_populates="search_space", order_by="Document.id", cascade="all, delete-orphan")
podcasts = relationship("Podcast", back_populates="search_space", order_by="Podcast.id", cascade="all, delete-orphan")
chats = relationship('Chat', back_populates='search_space', order_by='Chat.id', cascade="all, delete-orphan")
+ logs = relationship("Log", back_populates="search_space", order_by="Log.id", cascade="all, delete-orphan")
class SearchSourceConnector(BaseModel, TimestampMixin):
__tablename__ = "search_source_connectors"
@@ -196,6 +209,18 @@ class LLMConfig(BaseModel, TimestampMixin):
user_id = Column(UUID(as_uuid=True), ForeignKey("user.id", ondelete='CASCADE'), nullable=False)
user = relationship("User", back_populates="llm_configs", foreign_keys=[user_id])
+class Log(BaseModel, TimestampMixin):
+ __tablename__ = "logs"
+
+ level = Column(SQLAlchemyEnum(LogLevel), nullable=False, index=True)
+ status = Column(SQLAlchemyEnum(LogStatus), nullable=False, index=True)
+ message = Column(Text, nullable=False)
+ source = Column(String(200), nullable=True, index=True) # Service/component that generated the log
+ log_metadata = Column(JSON, nullable=True, default={}) # Additional context data
+
+ search_space_id = Column(Integer, ForeignKey("searchspaces.id", ondelete='CASCADE'), nullable=False)
+ search_space = relationship("SearchSpace", back_populates="logs")
+
if config.AUTH_TYPE == "GOOGLE":
class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base):
pass
diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py
index 3420f35ef..dd6be9b47 100644
--- a/surfsense_backend/app/routes/__init__.py
+++ b/surfsense_backend/app/routes/__init__.py
@@ -5,6 +5,7 @@ from .podcasts_routes import router as podcasts_router
from .chats_routes import router as chats_router
from .search_source_connectors_routes import router as search_source_connectors_router
from .llm_config_routes import router as llm_config_router
+from .logs_routes import router as logs_router
router = APIRouter()
@@ -14,3 +15,4 @@ router.include_router(podcasts_router)
router.include_router(chats_router)
router.include_router(search_source_connectors_router)
router.include_router(llm_config_router)
+router.include_router(logs_router)
diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py
index 9ca59c096..7d30d9ed5 100644
--- a/surfsense_backend/app/routes/documents_routes.py
+++ b/surfsense_backend/app/routes/documents_routes.py
@@ -135,11 +135,19 @@ async def process_file_in_background(
filename: str,
search_space_id: int,
user_id: str,
- session: AsyncSession
+ session: AsyncSession,
+ task_logger: 'TaskLoggingService',
+ log_entry: 'Log'
):
try:
# Check if the file is a markdown or text file
if filename.lower().endswith(('.md', '.markdown', '.txt')):
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing markdown/text file: {filename}",
+ {"file_type": "markdown", "processing_stage": "reading_file"}
+ )
+
# For markdown files, read the content directly
with open(file_path, 'r', encoding='utf-8') as f:
markdown_content = f.read()
@@ -151,16 +159,42 @@ async def process_file_in_background(
except:
pass
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Creating document from markdown content: {filename}",
+ {"processing_stage": "creating_document", "content_length": len(markdown_content)}
+ )
+
# Process markdown directly through specialized function
- await add_received_markdown_file_document(
+ result = await add_received_markdown_file_document(
session,
filename,
markdown_content,
search_space_id,
user_id
)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed markdown file: {filename}",
+ {"document_id": result.id, "content_hash": result.content_hash, "file_type": "markdown"}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Markdown file already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "markdown"}
+ )
+
# Check if the file is an audio file
elif filename.lower().endswith(('.mp3', '.mp4', '.mpeg', '.mpga', '.m4a', '.wav', '.webm')):
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing audio file for transcription: {filename}",
+ {"file_type": "audio", "processing_stage": "starting_transcription"}
+ )
+
# Open the audio file for transcription
with open(file_path, "rb") as audio_file:
# Use LiteLLM for audio transcription
@@ -184,6 +218,12 @@ async def process_file_in_background(
# Add metadata about the transcription
transcribed_text = f"# Transcription of {filename}\n\n{transcribed_text}"
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Transcription completed, creating document: {filename}",
+ {"processing_stage": "transcription_complete", "transcript_length": len(transcribed_text)}
+ )
+
# Clean up the temp file
try:
os.unlink(file_path)
@@ -191,15 +231,35 @@ async def process_file_in_background(
pass
# Process transcription as markdown document
- await add_received_markdown_file_document(
+ result = await add_received_markdown_file_document(
session,
filename,
transcribed_text,
search_space_id,
user_id
)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully transcribed and processed audio file: {filename}",
+ {"document_id": result.id, "content_hash": result.content_hash, "file_type": "audio", "transcript_length": len(transcribed_text)}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Audio file transcript already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "audio"}
+ )
+
else:
if app_config.ETL_SERVICE == "UNSTRUCTURED":
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing file with Unstructured ETL: {filename}",
+ {"file_type": "document", "etl_service": "UNSTRUCTURED", "processing_stage": "loading"}
+ )
+
from langchain_unstructured import UnstructuredLoader
# Process the file
@@ -215,6 +275,12 @@ async def process_file_in_background(
docs = await loader.aload()
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Unstructured ETL completed, creating document: {filename}",
+ {"processing_stage": "etl_complete", "elements_count": len(docs)}
+ )
+
# Clean up the temp file
import os
try:
@@ -223,14 +289,34 @@ async def process_file_in_background(
pass
# Pass the documents to the existing background task
- await add_received_file_document_using_unstructured(
+ result = await add_received_file_document_using_unstructured(
session,
filename,
docs,
search_space_id,
user_id
)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed file with Unstructured: {filename}",
+ {"document_id": result.id, "content_hash": result.content_hash, "file_type": "document", "etl_service": "UNSTRUCTURED"}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Document already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "document", "etl_service": "UNSTRUCTURED"}
+ )
+
elif app_config.ETL_SERVICE == "LLAMACLOUD":
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing file with LlamaCloud ETL: {filename}",
+ {"file_type": "document", "etl_service": "LLAMACLOUD", "processing_stage": "parsing"}
+ )
+
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
@@ -257,21 +343,48 @@ async def process_file_in_background(
# Get markdown documents from the result
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
+ await task_logger.log_task_progress(
+ log_entry,
+ f"LlamaCloud parsing completed, creating documents: {filename}",
+ {"processing_stage": "parsing_complete", "documents_count": len(markdown_documents)}
+ )
+
for doc in markdown_documents:
# Extract text content from the markdown documents
markdown_content = doc.text
# Process the documents using our LlamaCloud background task
- await add_received_file_document_using_llamacloud(
+ doc_result = await add_received_file_document_using_llamacloud(
session,
filename,
llamacloud_markdown_document=markdown_content,
search_space_id=search_space_id,
user_id=user_id
)
+
+ if doc_result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed file with LlamaCloud: {filename}",
+ {"document_id": doc_result.id, "content_hash": doc_result.content_hash, "file_type": "document", "etl_service": "LLAMACLOUD"}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Document already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "document", "etl_service": "LLAMACLOUD"}
+ )
+
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process file: {filename}",
+ str(e),
+ {"error_type": type(e).__name__, "filename": filename}
+ )
import logging
logging.error(f"Error processing file in background: {str(e)}")
+ raise # Re-raise so the wrapper can also handle it
@router.get("/documents/", response_model=List[DocumentRead])
@@ -442,11 +555,47 @@ async def process_extension_document_with_new_session(
):
"""Create a new session and process extension document."""
from app.db import async_session_maker
+ from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
+ # Initialize task logging service
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="process_extension_document",
+ source="document_processor",
+ message=f"Starting processing of extension document from {individual_document.metadata.VisitedWebPageTitle}",
+ metadata={
+ "document_type": "EXTENSION",
+ "url": individual_document.metadata.VisitedWebPageURL,
+ "title": individual_document.metadata.VisitedWebPageTitle,
+ "user_id": user_id
+ }
+ )
+
try:
- await add_extension_received_document(session, individual_document, search_space_id, user_id)
+ result = await add_extension_received_document(session, individual_document, search_space_id, user_id)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
+ {"document_id": result.id, "content_hash": result.content_hash}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
+ {"duplicate_detected": True}
+ )
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process extension document: {individual_document.metadata.VisitedWebPageTitle}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
import logging
logging.error(f"Error processing extension document: {str(e)}")
@@ -458,11 +607,46 @@ async def process_crawled_url_with_new_session(
):
"""Create a new session and process crawled URL."""
from app.db import async_session_maker
+ from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
+ # Initialize task logging service
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="process_crawled_url",
+ source="document_processor",
+ message=f"Starting URL crawling and processing for: {url}",
+ metadata={
+ "document_type": "CRAWLED_URL",
+ "url": url,
+ "user_id": user_id
+ }
+ )
+
try:
- await add_crawled_url_document(session, url, search_space_id, user_id)
+ result = await add_crawled_url_document(session, url, search_space_id, user_id)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully crawled and processed URL: {url}",
+ {"document_id": result.id, "title": result.title, "content_hash": result.content_hash}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"URL document already exists (duplicate): {url}",
+ {"duplicate_detected": True}
+ )
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to crawl URL: {url}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
import logging
logging.error(f"Error processing crawled URL: {str(e)}")
@@ -475,9 +659,38 @@ async def process_file_in_background_with_new_session(
):
"""Create a new session and process file."""
from app.db import async_session_maker
+ from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
- await process_file_in_background(file_path, filename, search_space_id, user_id, session)
+ # Initialize task logging service
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="process_file_upload",
+ source="document_processor",
+ message=f"Starting file processing for: {filename}",
+ metadata={
+ "document_type": "FILE",
+ "filename": filename,
+ "file_path": file_path,
+ "user_id": user_id
+ }
+ )
+
+ try:
+ await process_file_in_background(file_path, filename, search_space_id, user_id, session, task_logger, log_entry)
+
+ # Note: success/failure logging is handled within process_file_in_background
+ except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process file: {filename}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
+ import logging
+ logging.error(f"Error processing file: {str(e)}")
async def process_youtube_video_with_new_session(
@@ -487,11 +700,46 @@ async def process_youtube_video_with_new_session(
):
"""Create a new session and process YouTube video."""
from app.db import async_session_maker
+ from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
+ # Initialize task logging service
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="process_youtube_video",
+ source="document_processor",
+ message=f"Starting YouTube video processing for: {url}",
+ metadata={
+ "document_type": "YOUTUBE_VIDEO",
+ "url": url,
+ "user_id": user_id
+ }
+ )
+
try:
- await add_youtube_video_document(session, url, search_space_id, user_id)
+ result = await add_youtube_video_document(session, url, search_space_id, user_id)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed YouTube video: {result.title}",
+ {"document_id": result.id, "video_id": result.document_metadata.get("video_id"), "content_hash": result.content_hash}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"YouTube video document already exists (duplicate): {url}",
+ {"duplicate_detected": True}
+ )
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process YouTube video: {url}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
import logging
logging.error(f"Error processing YouTube video: {str(e)}")
diff --git a/surfsense_backend/app/routes/logs_routes.py b/surfsense_backend/app/routes/logs_routes.py
new file mode 100644
index 000000000..65e33ec9e
--- /dev/null
+++ b/surfsense_backend/app/routes/logs_routes.py
@@ -0,0 +1,280 @@
+from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy import and_, desc
+from typing import List, Optional
+from datetime import datetime, timedelta
+
+from app.db import get_async_session, User, SearchSpace, Log, LogLevel, LogStatus
+from app.schemas import LogCreate, LogUpdate, LogRead, LogFilter
+from app.users import current_active_user
+from app.utils.check_ownership import check_ownership
+
+router = APIRouter()
+
+@router.post("/logs/", response_model=LogRead)
+async def create_log(
+ log: LogCreate,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Create a new log entry."""
+ try:
+ # Check if the user owns the search space
+ await check_ownership(session, SearchSpace, log.search_space_id, user)
+
+ db_log = Log(**log.model_dump())
+ session.add(db_log)
+ await session.commit()
+ await session.refresh(db_log)
+ return db_log
+ except HTTPException:
+ raise
+ except Exception as e:
+ await session.rollback()
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to create log: {str(e)}"
+ )
+
+@router.get("/logs/", response_model=List[LogRead])
+async def read_logs(
+ skip: int = 0,
+ limit: int = 100,
+ search_space_id: Optional[int] = None,
+ level: Optional[LogLevel] = None,
+ status: Optional[LogStatus] = None,
+ source: Optional[str] = None,
+ start_date: Optional[datetime] = None,
+ end_date: Optional[datetime] = None,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Get logs with optional filtering."""
+ try:
+ # Build base query - only logs from user's search spaces
+ query = (
+ select(Log)
+ .join(SearchSpace)
+ .filter(SearchSpace.user_id == user.id)
+ .order_by(desc(Log.created_at)) # Most recent first
+ )
+
+ # Apply filters
+ filters = []
+
+ if search_space_id is not None:
+ await check_ownership(session, SearchSpace, search_space_id, user)
+ filters.append(Log.search_space_id == search_space_id)
+
+ if level is not None:
+ filters.append(Log.level == level)
+
+ if status is not None:
+ filters.append(Log.status == status)
+
+ if source is not None:
+ filters.append(Log.source.ilike(f"%{source}%"))
+
+ if start_date is not None:
+ filters.append(Log.created_at >= start_date)
+
+ if end_date is not None:
+ filters.append(Log.created_at <= end_date)
+
+ if filters:
+ query = query.filter(and_(*filters))
+
+ # Apply pagination
+ result = await session.execute(query.offset(skip).limit(limit))
+ return result.scalars().all()
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to fetch logs: {str(e)}"
+ )
+
+@router.get("/logs/{log_id}", response_model=LogRead)
+async def read_log(
+ log_id: int,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Get a specific log by ID."""
+ try:
+ # Get log and verify user owns the search space
+ result = await session.execute(
+ select(Log)
+ .join(SearchSpace)
+ .filter(Log.id == log_id, SearchSpace.user_id == user.id)
+ )
+ log = result.scalars().first()
+
+ if not log:
+ raise HTTPException(status_code=404, detail="Log not found")
+
+ return log
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to fetch log: {str(e)}"
+ )
+
+@router.put("/logs/{log_id}", response_model=LogRead)
+async def update_log(
+ log_id: int,
+ log_update: LogUpdate,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Update a log entry."""
+ try:
+ # Get log and verify user owns the search space
+ result = await session.execute(
+ select(Log)
+ .join(SearchSpace)
+ .filter(Log.id == log_id, SearchSpace.user_id == user.id)
+ )
+ db_log = result.scalars().first()
+
+ if not db_log:
+ raise HTTPException(status_code=404, detail="Log not found")
+
+ # Update only provided fields
+ update_data = log_update.model_dump(exclude_unset=True)
+ for field, value in update_data.items():
+ setattr(db_log, field, value)
+
+ await session.commit()
+ await session.refresh(db_log)
+ return db_log
+ except HTTPException:
+ raise
+ except Exception as e:
+ await session.rollback()
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to update log: {str(e)}"
+ )
+
+@router.delete("/logs/{log_id}")
+async def delete_log(
+ log_id: int,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Delete a log entry."""
+ try:
+ # Get log and verify user owns the search space
+ result = await session.execute(
+ select(Log)
+ .join(SearchSpace)
+ .filter(Log.id == log_id, SearchSpace.user_id == user.id)
+ )
+ db_log = result.scalars().first()
+
+ if not db_log:
+ raise HTTPException(status_code=404, detail="Log not found")
+
+ await session.delete(db_log)
+ await session.commit()
+ return {"message": "Log deleted successfully"}
+ except HTTPException:
+ raise
+ except Exception as e:
+ await session.rollback()
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to delete log: {str(e)}"
+ )
+
+@router.get("/logs/search-space/{search_space_id}/summary")
+async def get_logs_summary(
+ search_space_id: int,
+ hours: int = 24,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Get a summary of logs for a search space in the last X hours."""
+ try:
+ # Check ownership
+ await check_ownership(session, SearchSpace, search_space_id, user)
+
+ # Calculate time window
+ since = datetime.utcnow().replace(microsecond=0) - timedelta(hours=hours)
+
+ # Get logs from the time window
+ result = await session.execute(
+ select(Log)
+ .filter(
+ and_(
+ Log.search_space_id == search_space_id,
+ Log.created_at >= since
+ )
+ )
+ .order_by(desc(Log.created_at))
+ )
+ logs = result.scalars().all()
+
+ # Create summary
+ summary = {
+ "total_logs": len(logs),
+ "time_window_hours": hours,
+ "by_status": {},
+ "by_level": {},
+ "by_source": {},
+ "active_tasks": [],
+ "recent_failures": []
+ }
+
+ # Count by status and level
+ for log in logs:
+ # Status counts
+ status_str = log.status.value
+ summary["by_status"][status_str] = summary["by_status"].get(status_str, 0) + 1
+
+ # Level counts
+ level_str = log.level.value
+ summary["by_level"][level_str] = summary["by_level"].get(level_str, 0) + 1
+
+ # Source counts
+ if log.source:
+ summary["by_source"][log.source] = summary["by_source"].get(log.source, 0) + 1
+
+ # Active tasks (IN_PROGRESS)
+ if log.status == LogStatus.IN_PROGRESS:
+ task_name = log.log_metadata.get("task_name", "Unknown") if log.log_metadata else "Unknown"
+ summary["active_tasks"].append({
+ "id": log.id,
+ "task_name": task_name,
+ "message": log.message,
+ "started_at": log.created_at,
+ "source": log.source
+ })
+
+ # Recent failures
+ if log.status == LogStatus.FAILED and len(summary["recent_failures"]) < 10:
+ task_name = log.log_metadata.get("task_name", "Unknown") if log.log_metadata else "Unknown"
+ summary["recent_failures"].append({
+ "id": log.id,
+ "task_name": task_name,
+ "message": log.message,
+ "failed_at": log.created_at,
+ "source": log.source,
+ "error_details": log.log_metadata.get("error_details") if log.log_metadata else None
+ })
+
+ return summary
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to generate logs summary: {str(e)}"
+ )
\ No newline at end of file
diff --git a/surfsense_backend/app/schemas/__init__.py b/surfsense_backend/app/schemas/__init__.py
index f62172ad8..89525c92f 100644
--- a/surfsense_backend/app/schemas/__init__.py
+++ b/surfsense_backend/app/schemas/__init__.py
@@ -14,6 +14,7 @@ from .podcasts import PodcastBase, PodcastCreate, PodcastUpdate, PodcastRead, Po
from .chats import ChatBase, ChatCreate, ChatUpdate, ChatRead, AISDKChatRequest
from .search_source_connector import SearchSourceConnectorBase, SearchSourceConnectorCreate, SearchSourceConnectorUpdate, SearchSourceConnectorRead
from .llm_config import LLMConfigBase, LLMConfigCreate, LLMConfigUpdate, LLMConfigRead
+from .logs import LogBase, LogCreate, LogUpdate, LogRead, LogFilter
__all__ = [
"AISDKChatRequest",
@@ -53,4 +54,9 @@ __all__ = [
"LLMConfigCreate",
"LLMConfigUpdate",
"LLMConfigRead",
+ "LogBase",
+ "LogCreate",
+ "LogUpdate",
+ "LogRead",
+ "LogFilter",
]
\ No newline at end of file
diff --git a/surfsense_backend/app/schemas/logs.py b/surfsense_backend/app/schemas/logs.py
new file mode 100644
index 000000000..1d9d7e70b
--- /dev/null
+++ b/surfsense_backend/app/schemas/logs.py
@@ -0,0 +1,44 @@
+from datetime import datetime
+from typing import Optional, Dict, Any
+from pydantic import BaseModel, ConfigDict
+from .base import IDModel, TimestampModel
+from app.db import LogLevel, LogStatus
+
+class LogBase(BaseModel):
+ level: LogLevel
+ status: LogStatus
+ message: str
+ source: Optional[str] = None
+ log_metadata: Optional[Dict[str, Any]] = None
+
+class LogCreate(BaseModel):
+ level: LogLevel
+ status: LogStatus
+ message: str
+ source: Optional[str] = None
+ log_metadata: Optional[Dict[str, Any]] = None
+ search_space_id: int
+
+class LogUpdate(BaseModel):
+ level: Optional[LogLevel] = None
+ status: Optional[LogStatus] = None
+ message: Optional[str] = None
+ source: Optional[str] = None
+ log_metadata: Optional[Dict[str, Any]] = None
+
+class LogRead(LogBase, IDModel, TimestampModel):
+ id: int
+ created_at: datetime
+ search_space_id: int
+
+ model_config = ConfigDict(from_attributes=True)
+
+class LogFilter(BaseModel):
+ level: Optional[LogLevel] = None
+ status: Optional[LogStatus] = None
+ source: Optional[str] = None
+ search_space_id: Optional[int] = None
+ start_date: Optional[datetime] = None
+ end_date: Optional[datetime] = None
+
+ model_config = ConfigDict(from_attributes=True)
\ No newline at end of file
diff --git a/surfsense_backend/app/services/task_logging_service.py b/surfsense_backend/app/services/task_logging_service.py
new file mode 100644
index 000000000..c50e42047
--- /dev/null
+++ b/surfsense_backend/app/services/task_logging_service.py
@@ -0,0 +1,204 @@
+from typing import Optional, Dict, Any
+from sqlalchemy.ext.asyncio import AsyncSession
+from app.db import Log, LogLevel, LogStatus
+import logging
+import json
+from datetime import datetime
+
+logger = logging.getLogger(__name__)
+
+class TaskLoggingService:
+ """Service for logging background tasks using the database Log model"""
+
+ def __init__(self, session: AsyncSession, search_space_id: int):
+ self.session = session
+ self.search_space_id = search_space_id
+
+ async def log_task_start(
+ self,
+ task_name: str,
+ source: str,
+ message: str,
+ metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Log the start of a task with IN_PROGRESS status
+
+ Args:
+ task_name: Name/identifier of the task
+ source: Source service/component (e.g., 'document_processor', 'slack_indexer')
+ message: Human-readable message about the task
+ metadata: Additional context data
+
+ Returns:
+ Log: The created log entry
+ """
+ log_metadata = metadata or {}
+ log_metadata.update({
+ "task_name": task_name,
+ "started_at": datetime.utcnow().isoformat()
+ })
+
+ log_entry = Log(
+ level=LogLevel.INFO,
+ status=LogStatus.IN_PROGRESS,
+ message=message,
+ source=source,
+ log_metadata=log_metadata,
+ search_space_id=self.search_space_id
+ )
+
+ self.session.add(log_entry)
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ logger.info(f"Started task {task_name}: {message}")
+ return log_entry
+
+ async def log_task_success(
+ self,
+ log_entry: Log,
+ message: str,
+ additional_metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Update a log entry to SUCCESS status
+
+ Args:
+ log_entry: The original log entry to update
+ message: Success message
+ additional_metadata: Additional metadata to merge
+
+ Returns:
+ Log: The updated log entry
+ """
+ # Update the existing log entry
+ log_entry.status = LogStatus.SUCCESS
+ log_entry.message = message
+
+ # Merge additional metadata
+ if additional_metadata:
+ if log_entry.log_metadata is None:
+ log_entry.log_metadata = {}
+ log_entry.log_metadata.update(additional_metadata)
+ log_entry.log_metadata["completed_at"] = datetime.utcnow().isoformat()
+
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
+ logger.info(f"Completed task {task_name}: {message}")
+ return log_entry
+
+ async def log_task_failure(
+ self,
+ log_entry: Log,
+ error_message: str,
+ error_details: Optional[str] = None,
+ additional_metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Update a log entry to FAILED status
+
+ Args:
+ log_entry: The original log entry to update
+ error_message: Error message
+ error_details: Detailed error information
+ additional_metadata: Additional metadata to merge
+
+ Returns:
+ Log: The updated log entry
+ """
+ # Update the existing log entry
+ log_entry.status = LogStatus.FAILED
+ log_entry.level = LogLevel.ERROR
+ log_entry.message = error_message
+
+ # Merge additional metadata
+ if log_entry.log_metadata is None:
+ log_entry.log_metadata = {}
+
+ log_entry.log_metadata.update({
+ "failed_at": datetime.utcnow().isoformat(),
+ "error_details": error_details
+ })
+
+ if additional_metadata:
+ log_entry.log_metadata.update(additional_metadata)
+
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
+ logger.error(f"Failed task {task_name}: {error_message}")
+ if error_details:
+ logger.error(f"Error details: {error_details}")
+
+ return log_entry
+
+ async def log_task_progress(
+ self,
+ log_entry: Log,
+ progress_message: str,
+ progress_metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Update a log entry with progress information while keeping IN_PROGRESS status
+
+ Args:
+ log_entry: The log entry to update
+ progress_message: Progress update message
+ progress_metadata: Additional progress metadata
+
+ Returns:
+ Log: The updated log entry
+ """
+ log_entry.message = progress_message
+
+ if progress_metadata:
+ if log_entry.log_metadata is None:
+ log_entry.log_metadata = {}
+ log_entry.log_metadata.update(progress_metadata)
+ log_entry.log_metadata["last_progress_update"] = datetime.utcnow().isoformat()
+
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
+ logger.info(f"Progress update for task {task_name}: {progress_message}")
+ return log_entry
+
+ async def log_simple_event(
+ self,
+ level: LogLevel,
+ source: str,
+ message: str,
+ metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Log a simple event (not a long-running task)
+
+ Args:
+ level: Log level
+ source: Source service/component
+ message: Log message
+ metadata: Additional context data
+
+ Returns:
+ Log: The created log entry
+ """
+ log_entry = Log(
+ level=level,
+ status=LogStatus.SUCCESS, # Simple events are immediately complete
+ message=message,
+ source=source,
+ log_metadata=metadata or {},
+ search_space_id=self.search_space_id
+ )
+
+ self.session.add(log_entry)
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ logger.info(f"Logged event from {source}: {message}")
+ return log_entry
\ No newline at end of file
diff --git a/surfsense_backend/app/tasks/background_tasks.py b/surfsense_backend/app/tasks/background_tasks.py
index 53347b23c..f18890f0f 100644
--- a/surfsense_backend/app/tasks/background_tasks.py
+++ b/surfsense_backend/app/tasks/background_tasks.py
@@ -8,6 +8,7 @@ from app.config import config
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.utils.document_converters import convert_document_to_markdown, generate_content_hash
from app.services.llm_service import get_user_long_context_llm
+from app.services.task_logging_service import TaskLoggingService
from langchain_core.documents import Document as LangChainDocument
from langchain_community.document_loaders import FireCrawlLoader, AsyncChromiumLoader
from langchain_community.document_transformers import MarkdownifyTransformer
@@ -22,10 +23,34 @@ md = MarkdownifyTransformer()
async def add_crawled_url_document(
session: AsyncSession, url: str, search_space_id: int, user_id: str
) -> Optional[Document]:
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="crawl_url_document",
+ source="background_task",
+ message=f"Starting URL crawling process for: {url}",
+ metadata={"url": url, "user_id": user_id}
+ )
+
try:
+ # URL validation step
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Validating URL: {url}",
+ {"stage": "validation"}
+ )
+
if not validators.url(url):
raise ValueError(f"Url {url} is not a valid URL address")
+ # Set up crawler
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Setting up crawler for URL: {url}",
+ {"stage": "crawler_setup", "firecrawl_available": bool(config.FIRECRAWL_API_KEY)}
+ )
+
if config.FIRECRAWL_API_KEY:
crawl_loader = FireCrawlLoader(
url=url,
@@ -39,6 +64,13 @@ async def add_crawled_url_document(
else:
crawl_loader = AsyncChromiumLoader(urls=[url], headless=True)
+ # Perform crawling
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Crawling URL content: {url}",
+ {"stage": "crawling", "crawler_type": type(crawl_loader).__name__}
+ )
+
url_crawled = await crawl_loader.aload()
if type(crawl_loader) == FireCrawlLoader:
@@ -46,6 +78,13 @@ async def add_crawled_url_document(
elif type(crawl_loader) == AsyncChromiumLoader:
content_in_markdown = md.transform_documents(url_crawled)[0].page_content
+ # Format document
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing crawled content from: {url}",
+ {"stage": "content_processing", "content_length": len(content_in_markdown)}
+ )
+
# Format document metadata in a more maintainable way
metadata_sections = [
(
@@ -74,6 +113,13 @@ async def add_crawled_url_document(
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
+ # Check for duplicates
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Checking for duplicate content: {url}",
+ {"stage": "duplicate_check", "content_hash": content_hash}
+ )
+
# Check if document with this content hash already exists
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
@@ -81,15 +127,33 @@ async def add_crawled_url_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Document already exists for URL: {url}",
+ {"duplicate_detected": True, "existing_document_id": existing_document.id}
+ )
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
+ # Get LLM for summary generation
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Preparing for summary generation: {url}",
+ {"stage": "llm_setup"}
+ )
+
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
raise RuntimeError(f"No long context LLM configured for user {user_id}")
# Generate summary
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Generating summary for URL content: {url}",
+ {"stage": "summary_generation"}
+ )
+
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
summary_result = await summary_chain.ainvoke(
{"document": combined_document_string}
@@ -98,6 +162,12 @@ async def add_crawled_url_document(
summary_embedding = config.embedding_model_instance.embed(summary_content)
# Process chunks
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing content chunks for URL: {url}",
+ {"stage": "chunk_processing"}
+ )
+
chunks = [
Chunk(
content=chunk.text,
@@ -107,6 +177,12 @@ async def add_crawled_url_document(
]
# Create and store document
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Creating document in database for URL: {url}",
+ {"stage": "document_creation", "chunks_count": len(chunks)}
+ )
+
document = Document(
search_space_id=search_space_id,
title=url_crawled[0].metadata["title"]
@@ -124,13 +200,38 @@ async def add_crawled_url_document(
await session.commit()
await session.refresh(document)
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully crawled and processed URL: {url}",
+ {
+ "document_id": document.id,
+ "title": document.title,
+ "content_hash": content_hash,
+ "chunks_count": len(chunks),
+ "summary_length": len(summary_content)
+ }
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error while processing URL: {url}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
raise db_error
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to crawl URL: {url}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
raise RuntimeError(f"Failed to crawl URL: {str(e)}")
@@ -148,6 +249,20 @@ async def add_extension_received_document(
Returns:
Document object if successful, None if failed
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="extension_document",
+ source="background_task",
+ message=f"Processing extension document: {content.metadata.VisitedWebPageTitle}",
+ metadata={
+ "url": content.metadata.VisitedWebPageURL,
+ "title": content.metadata.VisitedWebPageTitle,
+ "user_id": user_id
+ }
+ )
+
try:
# Format document metadata in a more maintainable way
metadata_sections = [
@@ -188,6 +303,11 @@ async def add_extension_received_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Extension document already exists: {content.metadata.VisitedWebPageTitle}",
+ {"duplicate_detected": True, "existing_document_id": existing_document.id}
+ )
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
@@ -229,19 +349,52 @@ async def add_extension_received_document(
await session.commit()
await session.refresh(document)
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed extension document: {content.metadata.VisitedWebPageTitle}",
+ {
+ "document_id": document.id,
+ "content_hash": content_hash,
+ "url": content.metadata.VisitedWebPageURL
+ }
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error processing extension document: {content.metadata.VisitedWebPageTitle}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
raise db_error
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process extension document: {content.metadata.VisitedWebPageTitle}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
raise RuntimeError(f"Failed to process extension document: {str(e)}")
async def add_received_markdown_file_document(
session: AsyncSession, file_name: str, file_in_markdown: str, search_space_id: int, user_id: str
) -> Optional[Document]:
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="markdown_file_document",
+ source="background_task",
+ message=f"Processing markdown file: {file_name}",
+ metadata={"filename": file_name, "user_id": user_id, "content_length": len(file_in_markdown)}
+ )
+
try:
content_hash = generate_content_hash(file_in_markdown, search_space_id)
@@ -252,6 +405,11 @@ async def add_received_markdown_file_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Markdown file document already exists: {file_name}",
+ {"duplicate_detected": True, "existing_document_id": existing_document.id}
+ )
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
@@ -293,12 +451,36 @@ async def add_received_markdown_file_document(
await session.commit()
await session.refresh(document)
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed markdown file: {file_name}",
+ {
+ "document_id": document.id,
+ "content_hash": content_hash,
+ "chunks_count": len(chunks),
+ "summary_length": len(summary_content)
+ }
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error processing markdown file: {file_name}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
raise db_error
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process markdown file: {file_name}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
raise RuntimeError(f"Failed to process file document: {str(e)}")
@@ -478,8 +660,24 @@ async def add_youtube_video_document(
SQLAlchemyError: If there's a database error
RuntimeError: If the video processing fails
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="youtube_video_document",
+ source="background_task",
+ message=f"Starting YouTube video processing for: {url}",
+ metadata={"url": url, "user_id": user_id}
+ )
+
try:
# Extract video ID from URL
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Extracting video ID from URL: {url}",
+ {"stage": "video_id_extraction"}
+ )
+
def get_youtube_video_id(url: str):
parsed_url = urlparse(url)
hostname = parsed_url.hostname
@@ -501,7 +699,19 @@ async def add_youtube_video_document(
if not video_id:
raise ValueError(f"Could not extract video ID from URL: {url}")
- # Get video metadata using async HTTP client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Video ID extracted: {video_id}",
+ {"stage": "video_id_extracted", "video_id": video_id}
+ )
+
+ # Get video metadata
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching video metadata for: {video_id}",
+ {"stage": "metadata_fetch"}
+ )
+
params = {
"format": "json",
"url": f"https://www.youtube.com/watch?v={video_id}",
@@ -512,7 +722,19 @@ async def add_youtube_video_document(
async with http_session.get(oembed_url, params=params) as response:
video_data = await response.json()
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Video metadata fetched: {video_data.get('title', 'Unknown')}",
+ {"stage": "metadata_fetched", "title": video_data.get('title'), "author": video_data.get('author_name')}
+ )
+
# Get video transcript
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching transcript for video: {video_id}",
+ {"stage": "transcript_fetch"}
+ )
+
try:
captions = YouTubeTranscriptApi.get_transcript(video_id)
# Include complete caption information with timestamps
@@ -524,8 +746,26 @@ async def add_youtube_video_document(
timestamp = f"[{start_time:.2f}s-{start_time + duration:.2f}s]"
transcript_segments.append(f"{timestamp} {text}")
transcript_text = "\n".join(transcript_segments)
+
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Transcript fetched successfully: {len(captions)} segments",
+ {"stage": "transcript_fetched", "segments_count": len(captions), "transcript_length": len(transcript_text)}
+ )
except Exception as e:
transcript_text = f"No captions available for this video. Error: {str(e)}"
+ await task_logger.log_task_progress(
+ log_entry,
+ f"No transcript available for video: {video_id}",
+ {"stage": "transcript_unavailable", "error": str(e)}
+ )
+
+ # Format document
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing video content: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "content_processing"}
+ )
# Format document metadata in a more maintainable way
metadata_sections = [
@@ -558,6 +798,13 @@ async def add_youtube_video_document(
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
+ # Check for duplicates
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Checking for duplicate video content: {video_id}",
+ {"stage": "duplicate_check", "content_hash": content_hash}
+ )
+
# Check if document with this content hash already exists
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
@@ -565,15 +812,33 @@ async def add_youtube_video_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
+ await task_logger.log_task_success(
+ log_entry,
+ f"YouTube video document already exists: {video_data.get('title', 'YouTube Video')}",
+ {"duplicate_detected": True, "existing_document_id": existing_document.id, "video_id": video_id}
+ )
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
+ # Get LLM for summary generation
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "llm_setup"}
+ )
+
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
raise RuntimeError(f"No long context LLM configured for user {user_id}")
# Generate summary
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Generating summary for video: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "summary_generation"}
+ )
+
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
summary_result = await summary_chain.ainvoke(
{"document": combined_document_string}
@@ -582,6 +847,12 @@ async def add_youtube_video_document(
summary_embedding = config.embedding_model_instance.embed(summary_content)
# Process chunks
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing content chunks for video: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "chunk_processing"}
+ )
+
chunks = [
Chunk(
content=chunk.text,
@@ -591,6 +862,11 @@ async def add_youtube_video_document(
]
# Create document
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "document_creation", "chunks_count": len(chunks)}
+ )
document = Document(
title=video_data.get("title", "YouTube Video"),
@@ -613,11 +889,38 @@ async def add_youtube_video_document(
await session.commit()
await session.refresh(document)
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed YouTube video: {video_data.get('title', 'YouTube Video')}",
+ {
+ "document_id": document.id,
+ "video_id": video_id,
+ "title": document.title,
+ "content_hash": content_hash,
+ "chunks_count": len(chunks),
+ "summary_length": len(summary_content),
+ "has_transcript": "No captions available" not in transcript_text
+ }
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error while processing YouTube video: {url}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError", "video_id": video_id if 'video_id' in locals() else None}
+ )
raise db_error
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process YouTube video: {url}",
+ str(e),
+ {"error_type": type(e).__name__, "video_id": video_id if 'video_id' in locals() else None}
+ )
logging.error(f"Failed to process YouTube video: {str(e)}")
raise
From d821b30008ce53158c99b16da0e84efeb400ba47 Mon Sep 17 00:00:00 2001
From: "MSI\\ModSetter"
Date: Wed, 16 Jul 2025 01:19:22 -0700
Subject: [PATCH 02/16] feat(FRONTEND): Added Log Management UI
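- The new logs page is built on two hooks, useLogs and useLogsSummary. A
  rough consumption sketch (illustration only, not part of this patch;
  assumes the hooks return the shapes the logs page destructures and that
  the summary mirrors the backend summary payload):

      "use client";

      import { useParams } from "next/navigation";
      import { useLogs, useLogsSummary } from "@/hooks/use-logs";

      // Minimal sketch of a component consuming the new hooks; table UI omitted.
      export default function LogsOverview() {
        const params = useParams();
        const searchSpaceId = Number(params.search_space_id);

        // Log rows plus refresh/delete helpers
        const { logs, loading, error, refreshLogs, deleteLog } = useLogs(searchSpaceId);
        // Aggregated counts for the last 24 hours
        const { summary } = useLogsSummary(searchSpaceId, 24);

        if (loading) return <p>Loading logs...</p>;
        if (error) return <p>Failed to load logs.</p>;

        return (
          <div>
            {summary && <p>{summary.total_logs} logs in the last 24h</p>}
            <ul>
              {logs.map((log) => (
                <li key={log.id}>
                  [{log.level}/{log.status}] {log.message}{" "}
                  <button onClick={() => deleteLog(log.id)}>Delete</button>
                </li>
              ))}
            </ul>
            <button onClick={() => refreshLogs()}>Refresh</button>
          </div>
        );
      }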
---
.../documents/upload/page.tsx | 10 +-
.../dashboard/[search_space_id]/layout.tsx | 15 +-
.../[search_space_id]/logs/(manage)/page.tsx | 1085 +++++++++++++++++
.../components/sidebar/app-sidebar.tsx | 4 +-
surfsense_web/hooks/index.ts | 3 +-
surfsense_web/hooks/use-logs.ts | 313 +++++
6 files changed, 1419 insertions(+), 11 deletions(-)
create mode 100644 surfsense_web/app/dashboard/[search_space_id]/logs/(manage)/page.tsx
create mode 100644 surfsense_web/hooks/use-logs.ts
diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/upload/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/upload/page.tsx
index 5f9f76f9f..e6f3ec99a 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/documents/upload/page.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/documents/upload/page.tsx
@@ -170,9 +170,9 @@ export default function FileUploader() {
formData.append('search_space_id', search_space_id)
try {
- toast("File Upload", {
- description: "Files Uploading Initiated",
- })
+ // toast("File Upload", {
+ // description: "Files Uploading Initiated",
+ // })
const response = await fetch(`${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL!}/api/v1/documents/fileupload`, {
method: "POST",
@@ -188,8 +188,8 @@ export default function FileUploader() {
await response.json()
- toast("Upload Successful", {
- description: "Files Uploaded Successfully",
+ toast("Upload Task Initiated", {
+ description: "Files Uploading Initiated",
})
router.push(`/dashboard/${search_space_id}/documents`);
diff --git a/surfsense_web/app/dashboard/[search_space_id]/layout.tsx b/surfsense_web/app/dashboard/[search_space_id]/layout.tsx
index 93e9f2d76..ff077f5fc 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/layout.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/layout.tsx
@@ -43,10 +43,10 @@ export default function DashboardLayout({
title: "Upload Documents",
url: `/dashboard/${search_space_id}/documents/upload`,
},
- {
- title: "Add Webpages",
- url: `/dashboard/${search_space_id}/documents/webpage`,
- },
+ // { TODO: FIX THIS AND ADD IT BACK
+ // title: "Add Webpages",
+ // url: `/dashboard/${search_space_id}/documents/webpage`,
+ // },
{
title: "Add Youtube Videos",
url: `/dashboard/${search_space_id}/documents/youtube`,
@@ -78,6 +78,13 @@ export default function DashboardLayout({
icon: "Podcast",
items: [
],
+ },
+ {
+ title: "Logs",
+ url: `/dashboard/${search_space_id}/logs`,
+ icon: "FileText",
+ items: [
+ ],
}
]
diff --git a/surfsense_web/app/dashboard/[search_space_id]/logs/(manage)/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/logs/(manage)/page.tsx
new file mode 100644
index 000000000..e43e03bce
--- /dev/null
+++ b/surfsense_web/app/dashboard/[search_space_id]/logs/(manage)/page.tsx
@@ -0,0 +1,1085 @@
+"use client";
+
+import {
+ AlertDialog,
+ AlertDialogAction,
+ AlertDialogCancel,
+ AlertDialogContent,
+ AlertDialogDescription,
+ AlertDialogFooter,
+ AlertDialogHeader,
+ AlertDialogTitle,
+ AlertDialogTrigger,
+} from "@/components/ui/alert-dialog";
+import { Button } from "@/components/ui/button";
+import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
+import { Checkbox } from "@/components/ui/checkbox";
+import {
+ DropdownMenu,
+ DropdownMenuCheckboxItem,
+ DropdownMenuContent,
+ DropdownMenuItem,
+ DropdownMenuLabel,
+ DropdownMenuSeparator,
+ DropdownMenuTrigger,
+} from "@/components/ui/dropdown-menu";
+import { Input } from "@/components/ui/input";
+import { Label } from "@/components/ui/label";
+import { Pagination, PaginationContent, PaginationItem } from "@/components/ui/pagination";
+import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover";
+import {
+ Select,
+ SelectContent,
+ SelectItem,
+ SelectTrigger,
+ SelectValue,
+} from "@/components/ui/select";
+import {
+ Table,
+ TableBody,
+ TableCell,
+ TableHead,
+ TableHeader,
+ TableRow,
+} from "@/components/ui/table";
+import { JsonMetadataViewer } from "@/components/json-metadata-viewer";
+import { useLogs, useLogsSummary, Log, LogLevel, LogStatus } from "@/hooks/use-logs";
+import { cn } from "@/lib/utils";
+import {
+ ColumnDef,
+ ColumnFiltersState,
+ PaginationState,
+ Row,
+ SortingState,
+ VisibilityState,
+ flexRender,
+ getCoreRowModel,
+ getFacetedUniqueValues,
+ getFilteredRowModel,
+ getPaginationRowModel,
+ getSortedRowModel,
+ useReactTable,
+} from "@tanstack/react-table";
+import { AnimatePresence, motion } from "framer-motion";
+import {
+ Activity,
+ AlertCircle,
+ AlertTriangle,
+ Bug,
+ CheckCircle2,
+ ChevronDown,
+ ChevronFirst,
+ ChevronLast,
+ ChevronLeft,
+ ChevronRight,
+ ChevronUp,
+ CircleAlert,
+ CircleX,
+ Clock,
+ Columns3,
+ Filter,
+ Info,
+ ListFilter,
+ MoreHorizontal,
+ RefreshCw,
+ Terminal,
+ Trash,
+ X,
+ Zap,
+} from "lucide-react";
+import { useParams } from "next/navigation";
+import React, { useContext, useEffect, useId, useMemo, useRef, useState } from "react";
+import { toast } from "sonner";
+
+// Define animation variants for reuse
+const fadeInScale = {
+ hidden: { opacity: 0, scale: 0.95 },
+ visible: {
+ opacity: 1,
+ scale: 1,
+ transition: { type: "spring", stiffness: 300, damping: 30 }
+ },
+ exit: {
+ opacity: 0,
+ scale: 0.95,
+ transition: { duration: 0.15 }
+ }
+};
+
+// Log level icons and colors
+const logLevelConfig = {
+ DEBUG: { icon: Bug, color: "text-muted-foreground", bgColor: "bg-muted/50" },
+ INFO: { icon: Info, color: "text-blue-600", bgColor: "bg-blue-50" },
+ WARNING: { icon: AlertTriangle, color: "text-yellow-600", bgColor: "bg-yellow-50" },
+ ERROR: { icon: AlertCircle, color: "text-red-600", bgColor: "bg-red-50" },
+ CRITICAL: { icon: Zap, color: "text-purple-600", bgColor: "bg-purple-50" },
+} as const;
+
+// Log status icons and colors
+const logStatusConfig = {
+ IN_PROGRESS: { icon: Clock, color: "text-blue-600", bgColor: "bg-blue-50" },
+ SUCCESS: { icon: CheckCircle2, color: "text-green-600", bgColor: "bg-green-50" },
+ FAILED: { icon: X, color: "text-red-600", bgColor: "bg-red-50" },
+} as const;
+
+const columns: ColumnDef<Log>[] = [
+ {
+ id: "select",
+ header: ({ table }) => (
+ table.toggleAllPageRowsSelected(!!value)}
+ aria-label="Select all"
+ />
+ ),
+ cell: ({ row }) => (
+ row.toggleSelected(!!value)}
+ aria-label="Select row"
+ />
+ ),
+ size: 28,
+ enableSorting: false,
+ enableHiding: false,
+ },
+ {
+ header: "Level",
+ accessorKey: "level",
+ cell: ({ row }) => {
+ const level = row.getValue("level") as LogLevel;
+ const config = logLevelConfig[level];
+ const Icon = config.icon;
+ return (
+
+
+
+
+
+ {level}
+
+
+ );
+ },
+ size: 120,
+ },
+ {
+ header: "Status",
+ accessorKey: "status",
+ cell: ({ row }) => {
+ const status = row.getValue("status") as LogStatus;
+ const config = logStatusConfig[status];
+ const Icon = config.icon;
+ return (
+
+
+
+
+
+ {status.replace('_', ' ')}
+
+
+ );
+ },
+ size: 140,
+ },
+ {
+ header: "Source",
+ accessorKey: "source",
+ cell: ({ row }) => {
+ const source = row.getValue("source") as string;
+ return (
+
+
+ {source || "System"}
+
+ );
+ },
+ size: 150,
+ },
+ {
+ header: "Message",
+ accessorKey: "message",
+ cell: ({ row }) => {
+ const message = row.getValue("message") as string;
+ const taskName = row.original.log_metadata?.task_name;
+
+ return (
+
+ {taskName && (
+
+ {taskName}
+
+ )}
+
+ {message.length > 100 ? `${message.substring(0, 100)}...` : message}
+
+
+ );
+ },
+ size: 400,
+ },
+ {
+ header: "Created At",
+ accessorKey: "created_at",
+ cell: ({ row }) => {
+ const date = new Date(row.getValue("created_at"));
+ return (
+
+
{date.toLocaleDateString()}
+
{date.toLocaleTimeString()}
+
+ );
+ },
+ size: 120,
+ },
+ {
+ id: "actions",
+ header: () => Actions ,
+ cell: ({ row }) => ,
+ size: 60,
+ enableHiding: false,
+ },
+];
+
+// Create a context to share functions
+const LogsContext = React.createContext<{
+  deleteLog: (id: number) => Promise<boolean>;
+  refreshLogs: () => Promise<void>;
+} | null>(null);
+
+export default function LogsManagePage() {
+ const id = useId();
+ const params = useParams();
+ const searchSpaceId = Number(params.search_space_id);
+
+ const { logs, loading: logsLoading, error: logsError, refreshLogs, deleteLog } = useLogs(searchSpaceId);
+ const { summary, loading: summaryLoading, error: summaryError, refreshSummary } = useLogsSummary(searchSpaceId, 24);
+
+  const [columnFilters, setColumnFilters] = useState<ColumnFiltersState>([]);
+  const [columnVisibility, setColumnVisibility] = useState<VisibilityState>({});
+  const [pagination, setPagination] = useState<PaginationState>({
+ pageIndex: 0,
+ pageSize: 20,
+ });
+  const [sorting, setSorting] = useState<SortingState>([
+ {
+ id: "created_at",
+ desc: true,
+ },
+ ]);
+
+  const inputRef = useRef<HTMLInputElement>(null);
+
+ const table = useReactTable({
+ data: logs,
+ columns,
+ getCoreRowModel: getCoreRowModel(),
+ getSortedRowModel: getSortedRowModel(),
+ onSortingChange: setSorting,
+ enableSortingRemoval: false,
+ getPaginationRowModel: getPaginationRowModel(),
+ onPaginationChange: setPagination,
+ onColumnFiltersChange: setColumnFilters,
+ onColumnVisibilityChange: setColumnVisibility,
+ getFilteredRowModel: getFilteredRowModel(),
+ getFacetedUniqueValues: getFacetedUniqueValues(),
+ state: {
+ sorting,
+ pagination,
+ columnFilters,
+ columnVisibility,
+ },
+ });
+
+ // Get unique values for filters
+ const uniqueLevels = useMemo(() => {
+ const levelColumn = table.getColumn("level");
+ if (!levelColumn) return [];
+ return Array.from(levelColumn.getFacetedUniqueValues().keys()).sort();
+ }, [table.getColumn("level")?.getFacetedUniqueValues()]);
+
+ const uniqueStatuses = useMemo(() => {
+ const statusColumn = table.getColumn("status");
+ if (!statusColumn) return [];
+ return Array.from(statusColumn.getFacetedUniqueValues().keys()).sort();
+ }, [table.getColumn("status")?.getFacetedUniqueValues()]);
+
+ const handleDeleteRows = async () => {
+ const selectedRows = table.getSelectedRowModel().rows;
+
+ if (selectedRows.length === 0) {
+ toast.error("No rows selected");
+ return;
+ }
+
+ const deletePromises = selectedRows.map(row => deleteLog(row.original.id));
+
+ try {
+ const results = await Promise.all(deletePromises);
+ const allSuccessful = results.every(result => result === true);
+
+ if (allSuccessful) {
+ toast.success(`Successfully deleted ${selectedRows.length} log(s)`);
+ } else {
+ toast.error("Some logs could not be deleted");
+ }
+
+ await refreshLogs();
+ table.resetRowSelection();
+ } catch (error: any) {
+ console.error("Error deleting logs:", error);
+ toast.error("Error deleting logs");
+ }
+ };
+
+ const handleRefresh = async () => {
+ await Promise.all([refreshLogs(), refreshSummary()]);
+ toast.success("Logs refreshed");
+ };
+
+ return (
+ Promise.resolve(false)),
+ refreshLogs: refreshLogs || (() => Promise.resolve())
+ }}>
+
+ {/* Summary Dashboard */}
+
+
+ {/* Logs Table Header */}
+
+
+
Task Logs
+
+ Monitor and analyze all task execution logs
+
+
+
+
+ Refresh
+
+
+
+ {/* Filters */}
+
+
+ {/* Delete Button */}
+ {table.getSelectedRowModel().rows.length > 0 && (
+
+
+
+
+
+ Delete Selected
+
+ {table.getSelectedRowModel().rows.length}
+
+
+
+
+
+
+
+
+
+ Are you absolutely sure?
+
+ This action cannot be undone. This will permanently delete{" "}
+ {table.getSelectedRowModel().rows.length} selected log(s).
+
+
+
+
+ Cancel
+ Delete
+
+
+
+
+ )}
+
+ {/* Logs Table */}
+
+
+
+ );
+}
+
+// Summary Dashboard Component
+function LogsSummaryDashboard({
+ summary,
+ loading,
+ error,
+ onRefresh
+}: {
+ summary: any;
+ loading: boolean;
+ error: string | null;
+ onRefresh: () => void;
+}) {
+ if (loading) {
+ return (
+
+ {[...Array(4)].map((_, i) => (
+
+
+
+
+
+
+
+
+ ))}
+
+ );
+ }
+
+ if (error || !summary) {
+ return (
+
+
+
+
+
Failed to load summary
+
+ Retry
+
+
+
+
+ );
+ }
+
+ return (
+
+ {/* Total Logs */}
+
+
+
+ Total Logs
+
+
+
+ {summary.total_logs}
+
+ Last {summary.time_window_hours} hours
+
+
+
+
+
+ {/* Active Tasks */}
+
+
+
+ Active Tasks
+
+
+
+
+ {summary.active_tasks?.length || 0}
+
+
+ Currently running
+
+
+
+
+
+ {/* Success Rate */}
+
+
+
+ Success Rate
+
+
+
+
+ {summary.total_logs > 0
+ ? Math.round(((summary.by_status?.SUCCESS || 0) / summary.total_logs) * 100)
+ : 0
+ }%
+
+
+ {summary.by_status?.SUCCESS || 0} successful
+
+
+
+
+
+ {/* Recent Failures */}
+
+
+
+ Recent Failures
+
+
+
+
+ {summary.recent_failures?.length || 0}
+
+
+ Need attention
+
+
+
+
+
+ );
+}
+
+// Filters Component
+function LogsFilters({
+ table,
+ uniqueLevels,
+ uniqueStatuses,
+ inputRef,
+ id
+}: {
+ table: any;
+ uniqueLevels: string[];
+ uniqueStatuses: string[];
+  inputRef: React.RefObject<HTMLInputElement>;
+ id: string;
+}) {
+ return (
+
+
+ {/* Search Input */}
+
+ table.getColumn("message")?.setFilterValue(e.target.value)}
+ placeholder="Filter by message..."
+ type="text"
+ />
+
+
+
+ {Boolean(table.getColumn("message")?.getFilterValue()) && (
+ {
+ table.getColumn("message")?.setFilterValue("");
+ inputRef.current?.focus();
+ }}
+ >
+
+
+ )}
+
+
+ {/* Level Filter */}
+
+
+ {/* Status Filter */}
+
+
+ {/* Column Visibility */}
+
+
+
+
+ View
+
+
+
+ Toggle columns
+ {table
+ .getAllColumns()
+ .filter((column: any) => column.getCanHide())
+ .map((column: any) => (
+ column.toggleVisibility(!!value)}
+ onSelect={(event) => event.preventDefault()}
+ >
+ {column.id}
+
+ ))}
+
+
+
+
+ );
+}
+
+// Filter Dropdown Component
+function FilterDropdown({
+ title,
+ column,
+ options,
+ id
+}: {
+ title: string;
+ column: any;
+ options: string[];
+ id: string;
+}) {
+ const selectedValues = useMemo(() => {
+ const filterValue = column?.getFilterValue() as string[];
+ return filterValue ?? [];
+ }, [column?.getFilterValue()]);
+
+ const handleValueChange = (checked: boolean, value: string) => {
+ const filterValue = column?.getFilterValue() as string[];
+ const newFilterValue = filterValue ? [...filterValue] : [];
+
+ if (checked) {
+ newFilterValue.push(value);
+ } else {
+ const index = newFilterValue.indexOf(value);
+ if (index > -1) {
+ newFilterValue.splice(index, 1);
+ }
+ }
+
+ column?.setFilterValue(newFilterValue.length ? newFilterValue : undefined);
+ };
+
+ return (
+
+
+
+
+ {title}
+ {selectedValues.length > 0 && (
+
+ {selectedValues.length}
+
+ )}
+
+
+
+
+
Filter by {title}
+
+ {options.map((value, i) => (
+
+ handleValueChange(checked, value)}
+ />
+
+ {value}
+
+
+ ))}
+
+
+
+
+ );
+}
+
+// Logs Table Component
+function LogsTable({
+ table,
+ logs,
+ loading,
+ error,
+ onRefresh,
+ id
+}: {
+ table: any;
+ logs: Log[];
+ loading: boolean;
+ error: string | null;
+ onRefresh: () => void;
+ id: string;
+}) {
+ if (loading) {
+ return (
+
+
+
+ );
+ }
+
+ if (error) {
+ return (
+
+
+
+
+
Error loading logs
+
+ Retry
+
+
+
+
+ );
+ }
+
+ if (logs.length === 0) {
+ return (
+
+
+
+ );
+ }
+
+ return (
+ <>
+
+
+
+ {table.getHeaderGroups().map((headerGroup: any) => (
+
+ {headerGroup.headers.map((header: any) => (
+
+ {header.isPlaceholder ? null : header.column.getCanSort() ? (
+
+ {flexRender(header.column.columnDef.header, header.getContext())}
+ {{
+ asc: ,
+ desc: ,
+ }[header.column.getIsSorted() as string] ?? null}
+
+ ) : (
+ flexRender(header.column.columnDef.header, header.getContext())
+ )}
+
+ ))}
+
+ ))}
+
+
+
+ {table.getRowModel().rows?.length ? (
+ table.getRowModel().rows.map((row: any, index: number) => (
+
+ {row.getVisibleCells().map((cell: any) => (
+
+ {flexRender(cell.column.columnDef.cell, cell.getContext())}
+
+ ))}
+
+ ))
+ ) : (
+
+
+ No logs found.
+
+
+ )}
+
+
+
+
+
+ {/* Pagination */}
+
+ >
+ );
+}
+
+// Pagination Component
+function LogsPagination({ table, id }: { table: any; id: string }) {
+ return (
+
+
+
+ Rows per page
+
+ table.setPageSize(Number(value))}
+ >
+
+
+
+
+ {[10, 20, 50, 100].map((pageSize) => (
+
+ {pageSize}
+
+ ))}
+
+
+
+
+
+
+
+ {table.getState().pagination.pageIndex * table.getState().pagination.pageSize + 1}-
+ {Math.min(
+ table.getState().pagination.pageIndex * table.getState().pagination.pageSize +
+ table.getState().pagination.pageSize,
+ table.getRowCount(),
+ )}
+ {" "}
+ of {table.getRowCount()}
+
+
+
+
+
+
+
+ table.firstPage()}
+ disabled={!table.getCanPreviousPage()}
+ >
+
+
+
+
+ table.previousPage()}
+ disabled={!table.getCanPreviousPage()}
+ >
+
+
+
+
+ table.nextPage()}
+ disabled={!table.getCanNextPage()}
+ >
+
+
+
+
+ table.lastPage()}
+ disabled={!table.getCanNextPage()}
+ >
+
+
+
+
+
+
+
+ );
+}
+
+// Row Actions Component
+function LogRowActions({ row }: { row: Row<Log> }) {
+ const [isOpen, setIsOpen] = useState(false);
+ const [isDeleting, setIsDeleting] = useState(false);
+ const { deleteLog, refreshLogs } = useContext(LogsContext)!;
+ const log = row.original;
+
+ const handleDelete = async () => {
+ setIsDeleting(true);
+ try {
+ await deleteLog(log.id);
+ toast.success("Log deleted successfully");
+ await refreshLogs();
+ } catch (error) {
+ console.error("Error deleting log:", error);
+ toast.error("Failed to delete log");
+ } finally {
+ setIsDeleting(false);
+ setIsOpen(false);
+ }
+ };
+
+ return (
+
+
+
+
+
+
+
+
+ e.preventDefault()}>
+ View Metadata
+
+ }
+ />
+
+
+
+ {
+ e.preventDefault();
+ setIsOpen(true);
+ }}
+ >
+ Delete
+
+
+
+
+ Are you sure?
+
+ This action cannot be undone. This will permanently delete the log entry.
+
+
+
+ Cancel
+
+ {isDeleting ? "Deleting..." : "Delete"}
+
+
+
+
+
+
+
+ );
+}
\ No newline at end of file
diff --git a/surfsense_web/components/sidebar/app-sidebar.tsx b/surfsense_web/components/sidebar/app-sidebar.tsx
index 62f8cf6b4..cd437f3b7 100644
--- a/surfsense_web/components/sidebar/app-sidebar.tsx
+++ b/surfsense_web/components/sidebar/app-sidebar.tsx
@@ -16,6 +16,7 @@ import {
Trash2,
Podcast,
type LucideIcon,
+ FileText,
} from "lucide-react"
import { Logo } from "@/components/Logo";
@@ -47,7 +48,8 @@ export const iconMap: Record<string, LucideIcon> = {
Info,
ExternalLink,
Trash2,
- Podcast
+ Podcast,
+ FileText
}
const defaultData = {
diff --git a/surfsense_web/hooks/index.ts b/surfsense_web/hooks/index.ts
index e12a86800..9ed390988 100644
--- a/surfsense_web/hooks/index.ts
+++ b/surfsense_web/hooks/index.ts
@@ -1 +1,2 @@
-export * from './useSearchSourceConnectors';
\ No newline at end of file
+export * from './useSearchSourceConnectors';
+export * from './use-logs';
\ No newline at end of file
diff --git a/surfsense_web/hooks/use-logs.ts b/surfsense_web/hooks/use-logs.ts
new file mode 100644
index 000000000..17bcc3293
--- /dev/null
+++ b/surfsense_web/hooks/use-logs.ts
@@ -0,0 +1,313 @@
+"use client"
+import { useState, useEffect, useCallback, useMemo } from 'react';
+import { toast } from 'sonner';
+
+export type LogLevel = "DEBUG" | "INFO" | "WARNING" | "ERROR" | "CRITICAL";
+export type LogStatus = "IN_PROGRESS" | "SUCCESS" | "FAILED";
+
+export interface Log {
+ id: number;
+ level: LogLevel;
+ status: LogStatus;
+ message: string;
+ source?: string;
+  log_metadata?: Record<string, any>;
+ created_at: string;
+ search_space_id: number;
+}
+
+export interface LogFilters {
+ search_space_id?: number;
+ level?: LogLevel;
+ status?: LogStatus;
+ source?: string;
+ start_date?: string;
+ end_date?: string;
+}
+
+export interface LogSummary {
+ total_logs: number;
+ time_window_hours: number;
+  by_status: Record<string, number>;
+  by_level: Record<string, number>;
+  by_source: Record<string, number>;
+ active_tasks: Array<{
+ id: number;
+ task_name: string;
+ message: string;
+ started_at: string;
+ source?: string;
+ }>;
+ recent_failures: Array<{
+ id: number;
+ task_name: string;
+ message: string;
+ failed_at: string;
+ source?: string;
+ error_details?: string;
+ }>;
+}
+
+export function useLogs(searchSpaceId?: number, filters: LogFilters = {}) {
+  const [logs, setLogs] = useState<Log[]>([]);
+ const [loading, setLoading] = useState(true);
+  const [error, setError] = useState<string | null>(null);
+
+ // Memoize filters to prevent infinite re-renders
+ const memoizedFilters = useMemo(() => filters, [JSON.stringify(filters)]);
+
+ const buildQueryParams = useCallback((customFilters: LogFilters = {}) => {
+ const params = new URLSearchParams();
+
+ const allFilters = { ...memoizedFilters, ...customFilters };
+
+ if (allFilters.search_space_id) {
+ params.append('search_space_id', allFilters.search_space_id.toString());
+ }
+ if (allFilters.level) {
+ params.append('level', allFilters.level);
+ }
+ if (allFilters.status) {
+ params.append('status', allFilters.status);
+ }
+ if (allFilters.source) {
+ params.append('source', allFilters.source);
+ }
+ if (allFilters.start_date) {
+ params.append('start_date', allFilters.start_date);
+ }
+ if (allFilters.end_date) {
+ params.append('end_date', allFilters.end_date);
+ }
+
+ return params.toString();
+ }, [memoizedFilters]);
+
+ const fetchLogs = useCallback(async (customFilters: LogFilters = {}, options: { skip?: number; limit?: number } = {}) => {
+ try {
+ setLoading(true);
+
+ const params = new URLSearchParams(buildQueryParams(customFilters));
+ if (options.skip !== undefined) params.append('skip', options.skip.toString());
+ if (options.limit !== undefined) params.append('limit', options.limit.toString());
+
+ const response = await fetch(
+ `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/logs/?${params}`,
+ {
+ headers: {
+ Authorization: `Bearer ${localStorage.getItem('surfsense_bearer_token')}`,
+ },
+ method: "GET",
+ }
+ );
+
+ if (!response.ok) {
+ const errorData = await response.json().catch(() => ({}));
+ throw new Error(errorData.detail || "Failed to fetch logs");
+ }
+
+ const data = await response.json();
+ setLogs(data);
+ setError(null);
+ return data;
+ } catch (err: any) {
+ setError(err.message || 'Failed to fetch logs');
+ console.error('Error fetching logs:', err);
+ throw err;
+ } finally {
+ setLoading(false);
+ }
+ }, [buildQueryParams]);
+
+ // Initial fetch
+ useEffect(() => {
+ const initialFilters = searchSpaceId ? { ...memoizedFilters, search_space_id: searchSpaceId } : memoizedFilters;
+ fetchLogs(initialFilters);
+ }, [searchSpaceId, fetchLogs, memoizedFilters]);
+
+ // Function to refresh the logs list
+ const refreshLogs = useCallback(async (customFilters: LogFilters = {}) => {
+ const finalFilters = searchSpaceId ? { ...customFilters, search_space_id: searchSpaceId } : customFilters;
+ return await fetchLogs(finalFilters);
+ }, [searchSpaceId, fetchLogs]);
+
+ // Function to create a new log
+  const createLog = useCallback(async (logData: Omit<Log, "id" | "created_at">) => {
+ try {
+ const response = await fetch(
+ `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/logs/`,
+ {
+ headers: {
+ 'Content-Type': 'application/json',
+ Authorization: `Bearer ${localStorage.getItem('surfsense_bearer_token')}`,
+ },
+ method: "POST",
+ body: JSON.stringify(logData),
+ }
+ );
+
+ if (!response.ok) {
+ const errorData = await response.json().catch(() => ({}));
+ throw new Error(errorData.detail || "Failed to create log");
+ }
+
+ const newLog = await response.json();
+ setLogs(prevLogs => [newLog, ...prevLogs]);
+ toast.success("Log created successfully");
+ return newLog;
+ } catch (err: any) {
+ toast.error(err.message || 'Failed to create log');
+ console.error('Error creating log:', err);
+ throw err;
+ }
+ }, []);
+
+ // Function to update a log
+  const updateLog = useCallback(async (logId: number, updateData: Partial<Omit<Log, "id" | "created_at" | "search_space_id">>) => {
+ try {
+ const response = await fetch(
+ `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/logs/${logId}`,
+ {
+ headers: {
+ 'Content-Type': 'application/json',
+ Authorization: `Bearer ${localStorage.getItem('surfsense_bearer_token')}`,
+ },
+ method: "PUT",
+ body: JSON.stringify(updateData),
+ }
+ );
+
+ if (!response.ok) {
+ const errorData = await response.json().catch(() => ({}));
+ throw new Error(errorData.detail || "Failed to update log");
+ }
+
+ const updatedLog = await response.json();
+ setLogs(prevLogs =>
+ prevLogs.map(log => log.id === logId ? updatedLog : log)
+ );
+ toast.success("Log updated successfully");
+ return updatedLog;
+ } catch (err: any) {
+ toast.error(err.message || 'Failed to update log');
+ console.error('Error updating log:', err);
+ throw err;
+ }
+ }, []);
+
+ // Function to delete a log
+ const deleteLog = useCallback(async (logId: number) => {
+ try {
+ const response = await fetch(
+ `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/logs/${logId}`,
+ {
+ headers: {
+ Authorization: `Bearer ${localStorage.getItem('surfsense_bearer_token')}`,
+ },
+ method: "DELETE",
+ }
+ );
+
+ if (!response.ok) {
+ const errorData = await response.json().catch(() => ({}));
+ throw new Error(errorData.detail || "Failed to delete log");
+ }
+
+ setLogs(prevLogs => prevLogs.filter(log => log.id !== logId));
+ toast.success("Log deleted successfully");
+ return true;
+ } catch (err: any) {
+ toast.error(err.message || 'Failed to delete log');
+ console.error('Error deleting log:', err);
+ return false;
+ }
+ }, []);
+
+ // Function to get a single log
+ const getLog = useCallback(async (logId: number) => {
+ try {
+ const response = await fetch(
+ `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/logs/${logId}`,
+ {
+ headers: {
+ Authorization: `Bearer ${localStorage.getItem('surfsense_bearer_token')}`,
+ },
+ method: "GET",
+ }
+ );
+
+ if (!response.ok) {
+ const errorData = await response.json().catch(() => ({}));
+ throw new Error(errorData.detail || "Failed to fetch log");
+ }
+
+ return await response.json();
+ } catch (err: any) {
+ toast.error(err.message || 'Failed to fetch log');
+ console.error('Error fetching log:', err);
+ throw err;
+ }
+ }, []);
+
+ return {
+ logs,
+ loading,
+ error,
+ refreshLogs,
+ createLog,
+ updateLog,
+ deleteLog,
+ getLog,
+ fetchLogs
+ };
+}
+
+// Separate hook for log summary
+export function useLogsSummary(searchSpaceId: number, hours: number = 24) {
+  const [summary, setSummary] = useState<LogSummary | null>(null);
+ const [loading, setLoading] = useState(true);
+  const [error, setError] = useState<string | null>(null);
+
+ const fetchSummary = useCallback(async () => {
+ if (!searchSpaceId) return;
+
+ try {
+ setLoading(true);
+ const response = await fetch(
+ `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/logs/search-space/${searchSpaceId}/summary?hours=${hours}`,
+ {
+ headers: {
+ Authorization: `Bearer ${localStorage.getItem('surfsense_bearer_token')}`,
+ },
+ method: "GET",
+ }
+ );
+
+ if (!response.ok) {
+ const errorData = await response.json().catch(() => ({}));
+ throw new Error(errorData.detail || "Failed to fetch logs summary");
+ }
+
+ const data = await response.json();
+ setSummary(data);
+ setError(null);
+ return data;
+ } catch (err: any) {
+ setError(err.message || 'Failed to fetch logs summary');
+ console.error('Error fetching logs summary:', err);
+ throw err;
+ } finally {
+ setLoading(false);
+ }
+ }, [searchSpaceId, hours]);
+
+ useEffect(() => {
+ fetchSummary();
+ }, [fetchSummary]);
+
+ const refreshSummary = useCallback(() => {
+ return fetchSummary();
+ }, [fetchSummary]);
+
+ return { summary, loading, error, refreshSummary };
+}
\ No newline at end of file
From 3f62121cede5a3262a2d012d2d83de4fa4fc4d2c Mon Sep 17 00:00:00 2001
From: "MSI\\ModSetter"
Date: Thu, 17 Jul 2025 02:16:53 -0700
Subject: [PATCH 03/16] feat(BACKEND): Implement task logging for connector
indexing tasks
- Added TaskLoggingService to log the start, progress, success, and failure of indexing tasks for Slack, Notion, GitHub, Linear, and Discord connectors.
- Updated frontend toasts to report that connector indexing has started (it now runs in the background) rather than claiming it already completed.
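
Each connector task now follows the same logging lifecycle. The sketch below illustrates that pattern only; it assumes the async session and the connector/user identifiers are already in scope, and the "fetch" step stands in for the real indexing work:

    from app.services.task_logging_service import TaskLoggingService

    async def index_with_logging(session, search_space_id, connector_id, user_id):
        task_logger = TaskLoggingService(session, search_space_id)
        # One log row is created up front and then updated through the task's life.
        log_entry = await task_logger.log_task_start(
            task_name="slack_messages_indexing",
            source="connector_indexing_task",
            message=f"Starting Slack messages indexing for connector {connector_id}",
            metadata={"connector_id": connector_id, "user_id": str(user_id)},
        )
        try:
            await task_logger.log_task_progress(
                log_entry, "Fetching channels", {"stage": "fetch_channels"}
            )
            documents_indexed = 0  # placeholder for the actual indexing work
            await task_logger.log_task_success(
                log_entry,
                f"Successfully completed Slack indexing for connector {connector_id}",
                {"documents_indexed": documents_indexed},
            )
            return documents_indexed, None
        except Exception as e:
            await session.rollback()
            await task_logger.log_task_failure(
                log_entry,
                f"Failed to index Slack messages for connector {connector_id}",
                str(e),
                {"error_type": type(e).__name__},
            )
            return 0, f"Failed to index Slack messages: {e}"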
---
.../app/tasks/connectors_indexing_tasks.py | 400 +++++++++++++++++-
.../connectors/(manage)/page.tsx | 4 +-
2 files changed, 401 insertions(+), 3 deletions(-)
diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index c25a7f4c2..6a972be1e 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -7,6 +7,7 @@ from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchS
from app.config import config
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.services.llm_service import get_user_long_context_llm
+from app.services.task_logging_service import TaskLoggingService
from app.connectors.slack_history import SlackHistory
from app.connectors.notion_history import NotionHistoryConnector
from app.connectors.github_connector import GitHubConnector
@@ -42,8 +43,24 @@ async def index_slack_messages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="slack_messages_indexing",
+ source="connector_indexing_task",
+ message=f"Starting Slack messages indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ )
+
try:
# Get the connector
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving Slack connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -54,17 +71,41 @@ async def index_slack_messages(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a Slack connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a Slack connector"
# Get the Slack token from the connector config
slack_token = connector.config.get("SLACK_BOT_TOKEN")
if not slack_token:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Slack token not found in connector config for connector {connector_id}",
+ "Missing Slack token",
+ {"error_type": "MissingToken"}
+ )
return 0, "Slack token not found in connector config"
# Initialize Slack client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing Slack client for connector {connector_id}",
+ {"stage": "client_initialization"}
+ )
+
slack_client = SlackHistory(token=slack_token)
# Calculate date range
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Calculating date range for Slack indexing",
+ {"stage": "date_calculation", "provided_start_date": start_date, "provided_end_date": end_date}
+ )
+
if start_date is None or end_date is None:
# Fall back to calculating dates based on last_indexed_at
calculated_end_date = datetime.now()
@@ -95,13 +136,30 @@ async def index_slack_messages(
logger.info(f"Indexing Slack messages from {start_date_str} to {end_date_str}")
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching Slack channels from {start_date_str} to {end_date_str}",
+ {"stage": "fetch_channels", "start_date": start_date_str, "end_date": end_date_str}
+ )
+
# Get all channels
try:
channels = slack_client.get_all_channels()
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to get Slack channels for connector {connector_id}",
+ str(e),
+ {"error_type": "ChannelFetchError"}
+ )
return 0, f"Failed to get Slack channels: {str(e)}"
if not channels:
+ await task_logger.log_task_success(
+ log_entry,
+ f"No Slack channels found for connector {connector_id}",
+ {"channels_found": 0}
+ )
return 0, "No Slack channels found"
# Track the number of documents indexed
@@ -109,6 +167,12 @@ async def index_slack_messages(
documents_skipped = 0
skipped_channels = []
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting to process {len(channels)} Slack channels",
+ {"stage": "process_channels", "total_channels": len(channels)}
+ )
+
# Process each channel
for channel_obj in channels: # Modified loop to iterate over list of channel objects
channel_id = channel_obj["id"]
@@ -283,15 +347,40 @@ async def index_slack_messages(
else:
result_message = f"Processed {total_processed} channels."
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed Slack indexing for connector {connector_id}",
+ {
+ "channels_processed": total_processed,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "skipped_channels_count": len(skipped_channels),
+ "result_message": result_message
+ }
+ )
+
logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during Slack indexing for connector {connector_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error: {str(db_error)}")
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to index Slack messages for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Failed to index Slack messages: {str(e)}")
return 0, f"Failed to index Slack messages: {str(e)}"
@@ -316,8 +405,24 @@ async def index_notion_pages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="notion_pages_indexing",
+ source="connector_indexing_task",
+ message=f"Starting Notion pages indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ )
+
try:
# Get the connector
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving Notion connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -328,14 +433,32 @@ async def index_notion_pages(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a Notion connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a Notion connector"
# Get the Notion token from the connector config
notion_token = connector.config.get("NOTION_INTEGRATION_TOKEN")
if not notion_token:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Notion integration token not found in connector config for connector {connector_id}",
+ "Missing Notion token",
+ {"error_type": "MissingToken"}
+ )
return 0, "Notion integration token not found in connector config"
# Initialize Notion client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing Notion client for connector {connector_id}",
+ {"stage": "client_initialization"}
+ )
+
logger.info(f"Initializing Notion client for connector {connector_id}")
notion_client = NotionHistoryConnector(token=notion_token)
@@ -364,15 +487,32 @@ async def index_notion_pages(
logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching Notion pages from {start_date_iso} to {end_date_iso}",
+ {"stage": "fetch_pages", "start_date": start_date_iso, "end_date": end_date_iso}
+ )
+
# Get all pages
try:
pages = notion_client.get_all_pages(start_date=start_date_iso, end_date=end_date_iso)
logger.info(f"Found {len(pages)} Notion pages")
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to get Notion pages for connector {connector_id}",
+ str(e),
+ {"error_type": "PageFetchError"}
+ )
logger.error(f"Error fetching Notion pages: {str(e)}", exc_info=True)
return 0, f"Failed to get Notion pages: {str(e)}"
if not pages:
+ await task_logger.log_task_success(
+ log_entry,
+ f"No Notion pages found for connector {connector_id}",
+ {"pages_found": 0}
+ )
logger.info("No Notion pages found to index")
return 0, "No Notion pages found"
@@ -381,6 +521,12 @@ async def index_notion_pages(
documents_skipped = 0
skipped_pages = []
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting to process {len(pages)} Notion pages",
+ {"stage": "process_pages", "total_pages": len(pages)}
+ )
+
# Process each page
for page in pages:
try:
@@ -552,15 +698,40 @@ async def index_notion_pages(
else:
result_message = f"Processed {total_processed} pages."
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed Notion indexing for connector {connector_id}",
+ {
+ "pages_processed": total_processed,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "skipped_pages_count": len(skipped_pages),
+ "result_message": result_message
+ }
+ )
+
logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during Notion indexing for connector {connector_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error during Notion indexing: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to index Notion pages for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Failed to index Notion pages: {str(e)}", exc_info=True)
return 0, f"Failed to index Notion pages: {str(e)}"
@@ -585,11 +756,27 @@ async def index_github_repos(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="github_repos_indexing",
+ source="connector_indexing_task",
+ message=f"Starting GitHub repositories indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ )
+
documents_processed = 0
errors = []
try:
# 1. Get the GitHub connector from the database
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving GitHub connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -600,6 +787,12 @@ async def index_github_repos(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a GitHub connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a GitHub connector"
# 2. Get the GitHub PAT and selected repositories from the connector config
@@ -607,20 +800,50 @@ async def index_github_repos(
repo_full_names_to_index = connector.config.get("repo_full_names")
if not github_pat:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"GitHub Personal Access Token (PAT) not found in connector config for connector {connector_id}",
+ "Missing GitHub PAT",
+ {"error_type": "MissingToken"}
+ )
return 0, "GitHub Personal Access Token (PAT) not found in connector config"
if not repo_full_names_to_index or not isinstance(repo_full_names_to_index, list):
- return 0, "'repo_full_names' not found or is not a list in connector config"
+ await task_logger.log_task_failure(
+ log_entry,
+ f"'repo_full_names' not found or is not a list in connector config for connector {connector_id}",
+ "Invalid repo configuration",
+ {"error_type": "InvalidConfiguration"}
+ )
+ return 0, "'repo_full_names' not found or is not a list in connector config"
# 3. Initialize GitHub connector client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing GitHub client for connector {connector_id}",
+ {"stage": "client_initialization", "repo_count": len(repo_full_names_to_index)}
+ )
+
try:
github_client = GitHubConnector(token=github_pat)
except ValueError as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to initialize GitHub client for connector {connector_id}",
+ str(e),
+ {"error_type": "ClientInitializationError"}
+ )
return 0, f"Failed to initialize GitHub client: {str(e)}"
# 4. Validate selected repositories
# For simplicity, we'll proceed with the list provided.
# If a repo is inaccessible, get_repository_files will likely fail gracefully later.
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting indexing for {len(repo_full_names_to_index)} selected repositories",
+ {"stage": "repo_processing", "repo_count": len(repo_full_names_to_index), "start_date": start_date, "end_date": end_date}
+ )
+
logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.")
if start_date and end_date:
logger.info(f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)")
@@ -719,13 +942,36 @@ async def index_github_repos(
await session.commit()
logger.info(f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files.")
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed GitHub indexing for connector {connector_id}",
+ {
+ "documents_processed": documents_processed,
+ "errors_count": len(errors),
+ "repo_count": len(repo_full_names_to_index)
+ }
+ )
+
except SQLAlchemyError as db_err:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during GitHub indexing for connector {connector_id}",
+ str(db_err),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error during GitHub indexing for connector {connector_id}: {db_err}")
errors.append(f"Database error: {db_err}")
return documents_processed, "; ".join(errors) if errors else str(db_err)
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Unexpected error during GitHub indexing for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", exc_info=True)
errors.append(f"Unexpected error: {e}")
return documents_processed, "; ".join(errors) if errors else str(e)
@@ -754,8 +1000,24 @@ async def index_linear_issues(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="linear_issues_indexing",
+ source="connector_indexing_task",
+ message=f"Starting Linear issues indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ )
+
try:
# Get the connector
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving Linear connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -766,14 +1028,32 @@ async def index_linear_issues(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a Linear connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a Linear connector"
# Get the Linear token from the connector config
linear_token = connector.config.get("LINEAR_API_KEY")
if not linear_token:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Linear API token not found in connector config for connector {connector_id}",
+ "Missing Linear token",
+ {"error_type": "MissingToken"}
+ )
return 0, "Linear API token not found in connector config"
# Initialize Linear client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing Linear client for connector {connector_id}",
+ {"stage": "client_initialization"}
+ )
+
linear_client = LinearConnector(token=linear_token)
# Calculate date range
@@ -807,6 +1087,12 @@ async def index_linear_issues(
logger.info(f"Fetching Linear issues from {start_date_str} to {end_date_str}")
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching Linear issues from {start_date_str} to {end_date_str}",
+ {"stage": "fetch_issues", "start_date": start_date_str, "end_date": end_date_str}
+ )
+
# Get issues within date range
try:
issues, error = linear_client.get_issues_by_date_range(
@@ -855,6 +1141,12 @@ async def index_linear_issues(
documents_skipped = 0
skipped_issues = []
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting to process {len(issues)} Linear issues",
+ {"stage": "process_issues", "total_issues": len(issues)}
+ )
+
# Process each issue
for issue in issues:
try:
@@ -959,16 +1251,39 @@ async def index_linear_issues(
await session.commit()
logger.info(f"Successfully committed all Linear document changes to database")
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed Linear indexing for connector {connector_id}",
+ {
+ "issues_processed": total_processed,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "skipped_issues_count": len(skipped_issues)
+ }
+ )
logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped")
return total_processed, None # Return None as the error message to indicate success
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during Linear indexing for connector {connector_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to index Linear issues for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True)
return 0, f"Failed to index Linear issues: {str(e)}"
@@ -993,8 +1308,24 @@ async def index_discord_messages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="discord_messages_indexing",
+ source="connector_indexing_task",
+ message=f"Starting Discord messages indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ )
+
try:
# Get the connector
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving Discord connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -1005,16 +1336,34 @@ async def index_discord_messages(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a Discord connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a Discord connector"
# Get the Discord token from the connector config
discord_token = connector.config.get("DISCORD_BOT_TOKEN")
if not discord_token:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Discord token not found in connector config for connector {connector_id}",
+ "Missing Discord token",
+ {"error_type": "MissingToken"}
+ )
return 0, "Discord token not found in connector config"
logger.info(f"Starting Discord indexing for connector {connector_id}")
# Initialize Discord client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing Discord client for connector {connector_id}",
+ {"stage": "client_initialization"}
+ )
+
discord_client = DiscordConnector(token=discord_token)
# Calculate date range
@@ -1054,6 +1403,12 @@ async def index_discord_messages(
skipped_channels = []
try:
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting Discord bot and fetching guilds for connector {connector_id}",
+ {"stage": "fetch_guilds"}
+ )
+
logger.info("Starting Discord bot to fetch guilds")
discord_client._bot_task = asyncio.create_task(discord_client.start_bot())
await discord_client._wait_until_ready()
@@ -1062,15 +1417,32 @@ async def index_discord_messages(
guilds = await discord_client.get_guilds()
logger.info(f"Found {len(guilds)} guilds")
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to get Discord guilds for connector {connector_id}",
+ str(e),
+ {"error_type": "GuildFetchError"}
+ )
logger.error(f"Failed to get Discord guilds: {str(e)}", exc_info=True)
await discord_client.close_bot()
return 0, f"Failed to get Discord guilds: {str(e)}"
if not guilds:
+ await task_logger.log_task_success(
+ log_entry,
+ f"No Discord guilds found for connector {connector_id}",
+ {"guilds_found": 0}
+ )
logger.info("No Discord guilds found to index")
await discord_client.close_bot()
return 0, "No Discord guilds found"
# Process each guild and channel
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting to process {len(guilds)} Discord guilds",
+ {"stage": "process_guilds", "total_guilds": len(guilds)}
+ )
+
for guild in guilds:
guild_id = guild["id"]
guild_name = guild["name"]
@@ -1242,14 +1614,40 @@ async def index_discord_messages(
else:
result_message = f"Processed {documents_indexed} channels."
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed Discord indexing for connector {connector_id}",
+ {
+ "channels_processed": documents_indexed,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "skipped_channels_count": len(skipped_channels),
+ "guilds_processed": len(guilds),
+ "result_message": result_message
+ }
+ )
+
logger.info(f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
return documents_indexed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during Discord indexing for connector {connector_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to index Discord messages for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True)
return 0, f"Failed to index Discord messages: {str(e)}"
diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx
index 1093621e5..d01d54ad5 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx
@@ -134,7 +134,7 @@ export default function ConnectorsPage() {
const endDateStr = endDate ? format(endDate, "yyyy-MM-dd") : undefined;
await indexConnector(selectedConnectorForIndexing, searchSpaceId, startDateStr, endDateStr);
- toast.success("Connector content indexed successfully");
+ toast.success("Connector content indexing started");
} catch (error) {
console.error("Error indexing connector content:", error);
toast.error(
@@ -155,7 +155,7 @@ export default function ConnectorsPage() {
setIndexingConnectorId(connectorId);
try {
await indexConnector(connectorId, searchSpaceId);
- toast.success("Connector content indexed successfully");
+ toast.success("Connector content indexing started");
} catch (error) {
console.error("Error indexing connector content:", error);
toast.error(
From ba44256bf028a1f86c5b9ba8eb198106fffefc4d Mon Sep 17 00:00:00 2001
From: "MSI\\ModSetter"
Date: Thu, 17 Jul 2025 02:39:36 -0700
Subject: [PATCH 04/16] feat(BACKEND): Added task logging for podcast
generation
- Integrated TaskLoggingService to log the start, progress, success, and failure of podcast generation tasks.
- Updated user ID handling so it is consistently converted to a string in task log metadata across tasks.
- Modified frontend success message to direct users to the logs tab for status updates on podcast generation.
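
A condensed sketch of how the podcast task is wrapped is shown below. Here `_run_generation` is a hypothetical stand-in for the existing graph invocation and persistence steps, and the stated reason for `str(user_id)` (keeping the JSONB log metadata JSON-serializable) is an assumption, not taken from the patch:

    from app.services.task_logging_service import TaskLoggingService

    async def generate_chat_podcast_logged(session, chat_id, search_space_id, user_id):
        task_logger = TaskLoggingService(session, search_space_id)
        log_entry = await task_logger.log_task_start(
            task_name="generate_chat_podcast",
            source="podcast_task",
            message=f"Starting podcast generation for chat {chat_id}",
            # user_id may be a UUID, so it is stored as a string (assumed rationale).
            metadata={"chat_id": chat_id, "user_id": str(user_id)},
        )
        try:
            podcast = await _run_generation(session, chat_id)  # hypothetical helper
            await task_logger.log_task_success(
                log_entry,
                f"Successfully generated podcast for chat {chat_id}",
                {"podcast_id": podcast.id},
            )
            return podcast
        except Exception as e:
            await session.rollback()
            await task_logger.log_task_failure(
                log_entry,
                f"Unexpected error during podcast generation for chat {chat_id}",
                str(e),
                {"error_type": type(e).__name__},
            )
            raise RuntimeError(f"Failed to generate podcast for chat {chat_id}: {e}")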
---
.../app/tasks/background_tasks.py | 8 +-
.../app/tasks/connectors_indexing_tasks.py | 10 +-
surfsense_backend/app/tasks/podcast_tasks.py | 236 +++++++++++++-----
.../[search_space_id]/chats/chats-client.tsx | 2 +-
4 files changed, 181 insertions(+), 75 deletions(-)
diff --git a/surfsense_backend/app/tasks/background_tasks.py b/surfsense_backend/app/tasks/background_tasks.py
index f18890f0f..07e0f5fba 100644
--- a/surfsense_backend/app/tasks/background_tasks.py
+++ b/surfsense_backend/app/tasks/background_tasks.py
@@ -30,7 +30,7 @@ async def add_crawled_url_document(
task_name="crawl_url_document",
source="background_task",
message=f"Starting URL crawling process for: {url}",
- metadata={"url": url, "user_id": user_id}
+ metadata={"url": url, "user_id": str(user_id)}
)
try:
@@ -259,7 +259,7 @@ async def add_extension_received_document(
metadata={
"url": content.metadata.VisitedWebPageURL,
"title": content.metadata.VisitedWebPageTitle,
- "user_id": user_id
+ "user_id": str(user_id)
}
)
@@ -392,7 +392,7 @@ async def add_received_markdown_file_document(
task_name="markdown_file_document",
source="background_task",
message=f"Processing markdown file: {file_name}",
- metadata={"filename": file_name, "user_id": user_id, "content_length": len(file_in_markdown)}
+ metadata={"filename": file_name, "user_id": str(user_id), "content_length": len(file_in_markdown)}
)
try:
@@ -667,7 +667,7 @@ async def add_youtube_video_document(
task_name="youtube_video_document",
source="background_task",
message=f"Starting YouTube video processing for: {url}",
- metadata={"url": url, "user_id": user_id}
+ metadata={"url": url, "user_id": str(user_id)}
)
try:
diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index 6a972be1e..e0b3cd1e0 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -50,7 +50,7 @@ async def index_slack_messages(
task_name="slack_messages_indexing",
source="connector_indexing_task",
message=f"Starting Slack messages indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
try:
@@ -412,7 +412,7 @@ async def index_notion_pages(
task_name="notion_pages_indexing",
source="connector_indexing_task",
message=f"Starting Notion pages indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
try:
@@ -763,7 +763,7 @@ async def index_github_repos(
task_name="github_repos_indexing",
source="connector_indexing_task",
message=f"Starting GitHub repositories indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
documents_processed = 0
@@ -1007,7 +1007,7 @@ async def index_linear_issues(
task_name="linear_issues_indexing",
source="connector_indexing_task",
message=f"Starting Linear issues indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
try:
@@ -1315,7 +1315,7 @@ async def index_discord_messages(
task_name="discord_messages_indexing",
source="connector_indexing_task",
message=f"Starting Discord messages indexing for connector {connector_id}",
- metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date}
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
)
try:
diff --git a/surfsense_backend/app/tasks/podcast_tasks.py b/surfsense_backend/app/tasks/podcast_tasks.py
index a6be54600..f4907af61 100644
--- a/surfsense_backend/app/tasks/podcast_tasks.py
+++ b/surfsense_backend/app/tasks/podcast_tasks.py
@@ -2,8 +2,10 @@
from app.agents.podcaster.graph import graph as podcaster_graph
from app.agents.podcaster.state import State
from app.db import Chat, Podcast
+from app.services.task_logging_service import TaskLoggingService
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.exc import SQLAlchemyError
async def generate_document_podcast(
@@ -24,73 +26,177 @@ async def generate_chat_podcast(
podcast_title: str,
user_id: int
):
- # Fetch the chat with the specified ID
- query = select(Chat).filter(
- Chat.id == chat_id,
- Chat.search_space_id == search_space_id
- )
+ task_logger = TaskLoggingService(session, search_space_id)
- result = await session.execute(query)
- chat = result.scalars().first()
-
- if not chat:
- raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}")
-
- # Create chat history structure
- chat_history_str = ""
-
- for message in chat.messages:
- if message["role"] == "user":
- chat_history_str += f"{message['content']} "
- elif message["role"] == "assistant":
- # Last annotation type will always be "ANSWER" here
- answer_annotation = message["annotations"][-1]
- answer_text = ""
- if answer_annotation["type"] == "ANSWER":
- answer_text = answer_annotation["content"]
- # If content is a list, join it into a single string
- if isinstance(answer_text, list):
- answer_text = "\n".join(answer_text)
- chat_history_str += f"{answer_text} "
-
- chat_history_str += " "
-
- # Pass it to the SurfSense Podcaster
- config = {
- "configurable": {
- "podcast_title": "SurfSense",
- "user_id": str(user_id),
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="generate_chat_podcast",
+ source="podcast_task",
+ message=f"Starting podcast generation for chat {chat_id}",
+ metadata={
+ "chat_id": chat_id,
+ "search_space_id": search_space_id,
+ "podcast_title": podcast_title,
+ "user_id": str(user_id)
}
- }
- # Initialize state with database session and streaming service
- initial_state = State(
- source_content=chat_history_str,
- db_session=session
)
- # Run the graph directly
- result = await podcaster_graph.ainvoke(initial_state, config=config)
-
- # Convert podcast transcript entries to serializable format
- serializable_transcript = []
- for entry in result["podcast_transcript"]:
- serializable_transcript.append({
- "speaker_id": entry.speaker_id,
- "dialog": entry.dialog
- })
-
- # Create a new podcast entry
- podcast = Podcast(
- title=f"{podcast_title}",
- podcast_transcript=serializable_transcript,
- file_location=result["final_podcast_file_path"],
- search_space_id=search_space_id
- )
-
- # Add to session and commit
- session.add(podcast)
- await session.commit()
- await session.refresh(podcast)
-
- return podcast
+ try:
+ # Fetch the chat with the specified ID
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching chat {chat_id} from database",
+ {"stage": "fetch_chat"}
+ )
+
+ query = select(Chat).filter(
+ Chat.id == chat_id,
+ Chat.search_space_id == search_space_id
+ )
+
+ result = await session.execute(query)
+ chat = result.scalars().first()
+
+ if not chat:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Chat with id {chat_id} not found in search space {search_space_id}",
+ "Chat not found",
+ {"error_type": "ChatNotFound"}
+ )
+ raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}")
+
+ # Create chat history structure
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing chat history for chat {chat_id}",
+ {"stage": "process_chat_history", "message_count": len(chat.messages)}
+ )
+
+ chat_history_str = ""
+
+ processed_messages = 0
+ for message in chat.messages:
+ if message["role"] == "user":
+ chat_history_str += f"{message['content']} "
+ processed_messages += 1
+ elif message["role"] == "assistant":
+ # Last annotation type will always be "ANSWER" here
+ answer_annotation = message["annotations"][-1]
+ answer_text = ""
+ if answer_annotation["type"] == "ANSWER":
+ answer_text = answer_annotation["content"]
+ # If content is a list, join it into a single string
+ if isinstance(answer_text, list):
+ answer_text = "\n".join(answer_text)
+ chat_history_str += f"{answer_text} "
+ processed_messages += 1
+
+ chat_history_str += " "
+
+ # Pass it to the SurfSense Podcaster
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing podcast generation for chat {chat_id}",
+ {"stage": "initialize_podcast_generation", "processed_messages": processed_messages, "content_length": len(chat_history_str)}
+ )
+
+ config = {
+ "configurable": {
+ "podcast_title": "SurfSense",
+ "user_id": str(user_id),
+ }
+ }
+ # Initialize state with database session and streaming service
+ initial_state = State(
+ source_content=chat_history_str,
+ db_session=session
+ )
+
+ # Run the graph directly
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Running podcast generation graph for chat {chat_id}",
+ {"stage": "run_podcast_graph"}
+ )
+
+ result = await podcaster_graph.ainvoke(initial_state, config=config)
+
+ # Convert podcast transcript entries to serializable format
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing podcast transcript for chat {chat_id}",
+ {"stage": "process_transcript", "transcript_entries": len(result["podcast_transcript"])}
+ )
+
+ serializable_transcript = []
+ for entry in result["podcast_transcript"]:
+ serializable_transcript.append({
+ "speaker_id": entry.speaker_id,
+ "dialog": entry.dialog
+ })
+
+ # Create a new podcast entry
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Creating podcast database entry for chat {chat_id}",
+ {"stage": "create_podcast_entry", "file_location": result.get("final_podcast_file_path")}
+ )
+
+ podcast = Podcast(
+ title=f"{podcast_title}",
+ podcast_transcript=serializable_transcript,
+ file_location=result["final_podcast_file_path"],
+ search_space_id=search_space_id
+ )
+
+ # Add to session and commit
+ session.add(podcast)
+ await session.commit()
+ await session.refresh(podcast)
+
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully generated podcast for chat {chat_id}",
+ {
+ "podcast_id": podcast.id,
+ "podcast_title": podcast_title,
+ "transcript_entries": len(serializable_transcript),
+ "file_location": result.get("final_podcast_file_path"),
+ "processed_messages": processed_messages,
+ "content_length": len(chat_history_str)
+ }
+ )
+
+ return podcast
+
+ except ValueError as ve:
+ # ValueError is already logged above for chat not found
+ if "not found" not in str(ve):
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Value error during podcast generation for chat {chat_id}",
+ str(ve),
+ {"error_type": "ValueError"}
+ )
+ raise ve
+ except SQLAlchemyError as db_error:
+ await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during podcast generation for chat {chat_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
+ raise db_error
+ except Exception as e:
+ await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Unexpected error during podcast generation for chat {chat_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
+ raise RuntimeError(f"Failed to generate podcast for chat {chat_id}: {str(e)}")
diff --git a/surfsense_web/app/dashboard/[search_space_id]/chats/chats-client.tsx b/surfsense_web/app/dashboard/[search_space_id]/chats/chats-client.tsx
index 7b14c7ed1..45f6d4610 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/chats/chats-client.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/chats/chats-client.tsx
@@ -329,7 +329,7 @@ export default function ChatsPageClient({ searchSpaceId }: ChatsPageClientProps)
// Helper to finish the podcast generation process
const finishPodcastGeneration = () => {
- toast.success("All podcasts are being generated! Check the podcasts tab to see them when ready.");
+ toast.success("All podcasts are being generated! Check the logs tab to see their status.");
setPodcastDialogOpen(false);
setSelectedChats([]);
setSelectionMode(false);
From d103e21a68da461bd31c476902ce63ff515d2dd9 Mon Sep 17 00:00:00 2001
From: "MSI\\ModSetter"
Date: Thu, 17 Jul 2025 04:10:24 -0700
Subject: [PATCH 05/16] feat(FRONTEND): Moved User Dropdown to Search Space
Dashboard
---
.../api-key/api-key-client.tsx | 0
.../api-key/client-wrapper.tsx | 0
.../{[search_space_id] => }/api-key/page.tsx | 0
surfsense_web/app/dashboard/page.tsx | 74 ++++++++++-----
surfsense_web/components/UserDropdown.tsx | 92 +++++++++++++++++++
.../components/sidebar/AppSidebarProvider.tsx | 88 +++++-------------
.../components/sidebar/app-sidebar.tsx | 13 +--
7 files changed, 169 insertions(+), 98 deletions(-)
rename surfsense_web/app/dashboard/{[search_space_id] => }/api-key/api-key-client.tsx (100%)
rename surfsense_web/app/dashboard/{[search_space_id] => }/api-key/client-wrapper.tsx (100%)
rename surfsense_web/app/dashboard/{[search_space_id] => }/api-key/page.tsx (100%)
create mode 100644 surfsense_web/components/UserDropdown.tsx
diff --git a/surfsense_web/app/dashboard/[search_space_id]/api-key/api-key-client.tsx b/surfsense_web/app/dashboard/api-key/api-key-client.tsx
similarity index 100%
rename from surfsense_web/app/dashboard/[search_space_id]/api-key/api-key-client.tsx
rename to surfsense_web/app/dashboard/api-key/api-key-client.tsx
diff --git a/surfsense_web/app/dashboard/[search_space_id]/api-key/client-wrapper.tsx b/surfsense_web/app/dashboard/api-key/client-wrapper.tsx
similarity index 100%
rename from surfsense_web/app/dashboard/[search_space_id]/api-key/client-wrapper.tsx
rename to surfsense_web/app/dashboard/api-key/client-wrapper.tsx
diff --git a/surfsense_web/app/dashboard/[search_space_id]/api-key/page.tsx b/surfsense_web/app/dashboard/api-key/page.tsx
similarity index 100%
rename from surfsense_web/app/dashboard/[search_space_id]/api-key/page.tsx
rename to surfsense_web/app/dashboard/api-key/page.tsx
diff --git a/surfsense_web/app/dashboard/page.tsx b/surfsense_web/app/dashboard/page.tsx
index 176c9bf15..c0ca5623e 100644
--- a/surfsense_web/app/dashboard/page.tsx
+++ b/surfsense_web/app/dashboard/page.tsx
@@ -1,14 +1,15 @@
"use client";
-import React from 'react'
+import React, { useEffect, useState } from 'react'
import Link from 'next/link'
import { motion } from 'framer-motion'
import { Button } from '@/components/ui/button'
-import { Plus, Search, Trash2, AlertCircle, Loader2, LogOut } from 'lucide-react'
+import { Plus, Search, Trash2, AlertCircle, Loader2 } from 'lucide-react'
import { Tilt } from '@/components/ui/tilt'
import { Spotlight } from '@/components/ui/spotlight'
import { Logo } from '@/components/Logo';
import { ThemeTogglerComponent } from '@/components/theme/theme-toggle';
+import { UserDropdown } from '@/components/UserDropdown';
import { toast } from 'sonner';
import {
AlertDialog,
@@ -28,8 +29,17 @@ import {
} from "@/components/ui/alert";
import { Card, CardContent, CardDescription, CardFooter, CardHeader, CardTitle } from "@/components/ui/card";
import { useSearchSpaces } from '@/hooks/use-search-spaces';
+import { apiClient } from '@/lib/api';
import { useRouter } from 'next/navigation';
+interface User {
+ id: string;
+ email: string;
+ is_active: boolean;
+ is_superuser: boolean;
+ is_verified: boolean;
+}
+
/**
* Formats a date string into a readable format
* @param dateString - The date string to format
@@ -147,17 +157,47 @@ const DashboardPage = () => {
const router = useRouter();
const { searchSpaces, loading, error, refreshSearchSpaces } = useSearchSpaces();
+
+ // User state management
+	const [user, setUser] = useState<User | null>(null);
+ const [isLoadingUser, setIsLoadingUser] = useState(true);
+	const [userError, setUserError] = useState<string | null>(null);
+
+ // Fetch user details
+ useEffect(() => {
+ const fetchUser = async () => {
+ try {
+ if (typeof window === 'undefined') return;
+
+ try {
+ const userData = await apiClient.get('users/me');
+ setUser(userData);
+ setUserError(null);
+ } catch (error) {
+ console.error('Error fetching user:', error);
+ setUserError(error instanceof Error ? error.message : 'Unknown error occurred');
+ } finally {
+ setIsLoadingUser(false);
+ }
+ } catch (error) {
+ console.error('Error in fetchUser:', error);
+ setIsLoadingUser(false);
+ }
+ };
+
+ fetchUser();
+ }, []);
+
+ // Create user object for UserDropdown
+ const customUser = {
+ name: user?.email ? user.email.split('@')[0] : 'User',
+ email: user?.email || (isLoadingUser ? 'Loading...' : userError ? 'Error loading user' : 'Unknown User'),
+ avatar: '/icon-128.png', // Default avatar
+ };
if (loading) return ;
if (error) return ;
- const handleLogout = () => {
- if (typeof window !== 'undefined') {
- localStorage.removeItem('surfsense_bearer_token');
- router.push('/');
- }
- };
-
const handleDeleteSearchSpace = async (id: number) => {
// Send DELETE request to the API
try {
@@ -201,18 +241,10 @@ const DashboardPage = () => {
-
-
-
-
-
-
+
+
+
+
diff --git a/surfsense_web/components/UserDropdown.tsx b/surfsense_web/components/UserDropdown.tsx
new file mode 100644
index 000000000..30ac87979
--- /dev/null
+++ b/surfsense_web/components/UserDropdown.tsx
@@ -0,0 +1,92 @@
+"use client"
+
+import {
+ BadgeCheck,
+ ChevronsUpDown,
+ LogOut,
+ Settings,
+} from "lucide-react"
+
+import {
+ Avatar,
+ AvatarFallback,
+ AvatarImage,
+} from "@/components/ui/avatar"
+import {
+ DropdownMenu,
+ DropdownMenuContent,
+ DropdownMenuGroup,
+ DropdownMenuItem,
+ DropdownMenuLabel,
+ DropdownMenuSeparator,
+ DropdownMenuTrigger,
+} from "@/components/ui/dropdown-menu"
+import { Button } from "@/components/ui/button"
+import { useRouter, useParams } from "next/navigation"
+
+export function UserDropdown({
+ user,
+}: {
+ user: {
+ name: string
+ email: string
+ avatar: string
+ }
+}) {
+ const router = useRouter()
+
+ const handleLogout = () => {
+ if (typeof window !== 'undefined') {
+ localStorage.removeItem('surfsense_bearer_token');
+ router.push('/');
+ }
+ };
+
+ return (
+
+
+
+
+
+ {user.name.charAt(0).toUpperCase()}
+
+
+
+
+
+
+
{user.name}
+
+ {user.email}
+
+
+
+
+
+
+ router.push(`/dashboard/api-key`)}>
+
+ API Key
+
+
+
+
+ router.push(`/settings`)}>
+
+ Settings
+
+
+
+ Log out
+
+
+
+ )
+}
\ No newline at end of file
diff --git a/surfsense_web/components/sidebar/AppSidebarProvider.tsx b/surfsense_web/components/sidebar/AppSidebarProvider.tsx
index 98593287d..0039d3bb7 100644
--- a/surfsense_web/components/sidebar/AppSidebarProvider.tsx
+++ b/surfsense_web/components/sidebar/AppSidebarProvider.tsx
@@ -31,14 +31,6 @@ interface SearchSpace {
user_id: string;
}
-interface User {
- id: string;
- email: string;
- is_active: boolean;
- is_superuser: boolean;
- is_verified: boolean;
-}
-
interface AppSidebarProviderProps {
searchSpaceId: string;
navSecondary: {
@@ -58,20 +50,17 @@ interface AppSidebarProviderProps {
}[];
}
-export function AppSidebarProvider({
- searchSpaceId,
- navSecondary,
- navMain
+export function AppSidebarProvider({
+ searchSpaceId,
+ navSecondary,
+ navMain
}: AppSidebarProviderProps) {
const [recentChats, setRecentChats] = useState<{ name: string; url: string; icon: string; id: number; search_space_id: number; actions: { name: string; icon: string; onClick: () => void }[] }[]>([]);
    const [searchSpace, setSearchSpace] = useState<SearchSpace | null>(null);
-    const [user, setUser] = useState<User | null>(null);
const [isLoadingChats, setIsLoadingChats] = useState(true);
const [isLoadingSearchSpace, setIsLoadingSearchSpace] = useState(true);
- const [isLoadingUser, setIsLoadingUser] = useState(true);
    const [chatError, setChatError] = useState<string | null>(null);
    const [searchSpaceError, setSearchSpaceError] = useState<string | null>(null);
-    const [userError, setUserError] = useState<string | null>(null);
const [showDeleteDialog, setShowDeleteDialog] = useState(false);
const [chatToDelete, setChatToDelete] = useState<{ id: number, name: string } | null>(null);
const [isDeleting, setIsDeleting] = useState(false);
@@ -82,33 +71,6 @@ export function AppSidebarProvider({
setIsClient(true);
}, []);
- // Fetch user details
- useEffect(() => {
- const fetchUser = async () => {
- try {
- // Only run on client-side
- if (typeof window === 'undefined') return;
-
- try {
- // Use the API client instead of direct fetch
- const userData = await apiClient.get('users/me');
- setUser(userData);
- setUserError(null);
- } catch (error) {
- console.error('Error fetching user:', error);
- setUserError(error instanceof Error ? error.message : 'Unknown error occurred');
- } finally {
- setIsLoadingUser(false);
- }
- } catch (error) {
- console.error('Error in fetchUser:', error);
- setIsLoadingUser(false);
- }
- };
-
- fetchUser();
- }, []);
-
// Fetch recent chats
useEffect(() => {
const fetchRecentChats = async () => {
@@ -119,9 +81,9 @@ export function AppSidebarProvider({
try {
// Use the API client instead of direct fetch - filter by current search space ID
const chats: Chat[] = await apiClient.get(`api/v1/chats/?limit=5&skip=0&search_space_id=${searchSpaceId}`);
-
+
// Sort chats by created_at in descending order (newest first)
- const sortedChats = chats.sort((a, b) =>
+ const sortedChats = chats.sort((a, b) =>
new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
);
// console.log("sortedChats", sortedChats);
@@ -171,7 +133,7 @@ export function AppSidebarProvider({
// Set up a refresh interval (every 5 minutes)
const intervalId = setInterval(fetchRecentChats, 5 * 60 * 1000);
-
+
// Clean up interval on component unmount
return () => clearInterval(intervalId);
}, [searchSpaceId]);
@@ -179,16 +141,16 @@ export function AppSidebarProvider({
// Handle delete chat
const handleDeleteChat = async () => {
if (!chatToDelete) return;
-
+
try {
setIsDeleting(true);
-
+
// Use the API client instead of direct fetch
await apiClient.delete(`api/v1/chats/${chatToDelete.id}`);
-
+
// Close dialog and refresh chats
setRecentChats(recentChats.filter(chat => chat.id !== chatToDelete.id));
-
+
} catch (error) {
console.error('Error deleting chat:', error);
} finally {
@@ -226,15 +188,15 @@ export function AppSidebarProvider({
}, [searchSpaceId]);
// Create a fallback chat if there's an error or no chats
- const fallbackChats = chatError || (!isLoadingChats && recentChats.length === 0)
- ? [{
- name: chatError ? "Error loading chats" : "No recent chats",
- url: "#",
- icon: chatError ? "AlertCircle" : "MessageCircleMore",
- id: 0,
- search_space_id: Number(searchSpaceId),
- actions: []
- }]
+ const fallbackChats = chatError || (!isLoadingChats && recentChats.length === 0)
+ ? [{
+ name: chatError ? "Error loading chats" : "No recent chats",
+ url: "#",
+ icon: chatError ? "AlertCircle" : "MessageCircleMore",
+ id: 0,
+ search_space_id: Number(searchSpaceId),
+ actions: []
+ }]
: [];
// Use fallback chats if there's an error or no chats
@@ -249,22 +211,14 @@ export function AppSidebarProvider({
};
}
- // Create user object for AppSidebar
- const customUser = {
- name: isClient && user?.email ? user.email.split('@')[0] : 'User',
- email: isClient ? (user?.email || (isLoadingUser ? 'Loading...' : userError ? 'Error loading user' : 'Unknown User')) : 'Loading...',
- avatar: '/icon-128.png', // Default avatar
- };
-
return (
<>
-
+
{/* Delete Confirmation Dialog - Only render on client */}
{isClient && (
diff --git a/surfsense_web/components/sidebar/app-sidebar.tsx b/surfsense_web/components/sidebar/app-sidebar.tsx
index cd437f3b7..6b9cd9eb3 100644
--- a/surfsense_web/components/sidebar/app-sidebar.tsx
+++ b/surfsense_web/components/sidebar/app-sidebar.tsx
@@ -23,7 +23,6 @@ import { Logo } from "@/components/Logo";
import { NavMain } from "@/components/sidebar/nav-main"
import { NavProjects } from "@/components/sidebar/nav-projects"
import { NavSecondary } from "@/components/sidebar/nav-secondary"
-import { NavUser } from "@/components/sidebar/nav-user"
import {
Sidebar,
SidebarContent,
@@ -143,11 +142,6 @@ const defaultData = {
}
interface AppSidebarProps extends React.ComponentProps<typeof Sidebar> {
- user?: {
- name: string
- email: string
- avatar: string
- }
navMain?: {
title: string
url: string
@@ -178,7 +172,6 @@ interface AppSidebarProps extends React.ComponentProps<typeof Sidebar> {
}
export function AppSidebar({
- user = defaultData.user,
navMain = defaultData.navMain,
navSecondary = defaultData.navSecondary,
RecentChats = defaultData.RecentChats,
@@ -232,9 +225,9 @@ export function AppSidebar({
{processedRecentChats.length > 0 && }
-
-
-
+ {/*
+ footer
+ */}
)
}
From c7d75490528d68e21a4b4c545ee6883e86fab23d Mon Sep 17 00:00:00 2001
From: Swapnilpatil03
Date: Thu, 17 Jul 2025 19:54:35 +0530
Subject: [PATCH 06/16] Add back button to settings page to navigate to
/dashboard
---
surfsense_web/app/settings/page.tsx | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/surfsense_web/app/settings/page.tsx b/surfsense_web/app/settings/page.tsx
index ff3bad2bf..b9e36e751 100644
--- a/surfsense_web/app/settings/page.tsx
+++ b/surfsense_web/app/settings/page.tsx
@@ -1,13 +1,16 @@
"use client";
import React from 'react';
+import { useRouter } from 'next/navigation';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
import { Separator } from '@/components/ui/separator';
-import { Bot, Settings, Brain } from 'lucide-react';
+import { Bot, Settings, Brain, ArrowLeft } from 'lucide-react';
import { ModelConfigManager } from '@/components/settings/model-config-manager';
import { LLMRoleManager } from '@/components/settings/llm-role-manager';
export default function SettingsPage() {
+    const router = useRouter();
+
return (
@@ -15,6 +18,15 @@ export default function SettingsPage() {
{/* Header Section */}
+ {/* Back Button */}
+                    <button
+                        onClick={() => router.push('/dashboard')}
+ className="flex items-center justify-center h-10 w-10 rounded-lg bg-primary/10 hover:bg-primary/20 transition-colors"
+ aria-label="Back to Dashboard"
+ type="button"
+ >
+                        <ArrowLeft className="h-5 w-5" />
+                    </button>
@@ -57,4 +69,4 @@ export default function SettingsPage() {
);
-}
\ No newline at end of file
+}
\ No newline at end of file
From d90177f6d34d079d548e646f93802c4d7c166b46 Mon Sep 17 00:00:00 2001
From: Swapnilpatil03
Date: Fri, 18 Jul 2025 10:23:25 +0530
Subject: [PATCH 07/16] fix(ui): Refactor logout function in UserDropdown.tsx
#199
---
surfsense_web/components/UserDropdown.tsx | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/surfsense_web/components/UserDropdown.tsx b/surfsense_web/components/UserDropdown.tsx
index 30ac87979..fceb0c74c 100644
--- a/surfsense_web/components/UserDropdown.tsx
+++ b/surfsense_web/components/UserDropdown.tsx
@@ -36,9 +36,18 @@ export function UserDropdown({
const router = useRouter()
const handleLogout = () => {
- if (typeof window !== 'undefined') {
- localStorage.removeItem('surfsense_bearer_token');
- router.push('/');
+ try {
+ if (typeof window !== 'undefined') {
+ localStorage.removeItem('surfsense_bearer_token');
+ router.push('/');
+ }
+ } catch (error) {
+ console.error('Error during logout:', error);
+ // Optionally, provide user feedback
+ if (typeof window !== 'undefined') {
+ alert('Logout failed. Please try again.');
+ router.push('/');
+ }
}
};
From 014a161fe353ddb42ddd538c019fd8d7b1395eea Mon Sep 17 00:00:00 2001
From: Sabin Shrestha
Date: Fri, 18 Jul 2025 11:54:15 +0545
Subject: [PATCH 08/16] fix(ui): Refactor UserDropdown.tsx to add safety check
for user name
---
surfsense_web/components/UserDropdown.tsx | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/surfsense_web/components/UserDropdown.tsx b/surfsense_web/components/UserDropdown.tsx
index 30ac87979..a66660d6a 100644
--- a/surfsense_web/components/UserDropdown.tsx
+++ b/surfsense_web/components/UserDropdown.tsx
@@ -51,7 +51,7 @@ export function UserDropdown({
>
- {user.name.charAt(0).toUpperCase()}
+ {user.name.charAt(0)?.toUpperCase() || '?'}
From d8fceb76ca66ea79c460e409a47b6874ad298991 Mon Sep 17 00:00:00 2001
From: Sabin Shrestha
Date: Fri, 18 Jul 2025 16:18:28 +0545
Subject: [PATCH 09/16] fix(ui): Add back button in Create Search Space Page
---
.../components/search-space-form.tsx | 49 +++++++++++++------
1 file changed, 33 insertions(+), 16 deletions(-)
diff --git a/surfsense_web/components/search-space-form.tsx b/surfsense_web/components/search-space-form.tsx
index d4eedba57..d03cab385 100644
--- a/surfsense_web/components/search-space-form.tsx
+++ b/surfsense_web/components/search-space-form.tsx
@@ -3,7 +3,7 @@
import { useState } from "react";
import { motion } from "framer-motion";
import { cn } from "@/lib/utils";
-import { Plus, Search, Trash2 } from "lucide-react";
+import { MoveLeftIcon, Plus, Search, Trash2 } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
@@ -33,6 +33,7 @@ import {
FormLabel,
FormMessage,
} from "@/components/ui/form";
+import { useRouter } from "next/navigation";
// Define the form schema with Zod
const searchSpaceFormSchema = z.object({
@@ -59,7 +60,8 @@ export function SearchSpaceForm({
initialData = { name: "", description: "" }
}: SearchSpaceFormProps) {
const [showDeleteDialog, setShowDeleteDialog] = useState(false);
-
+ const router = useRouter();
+
// Initialize the form with React Hook Form and Zod validation
const form = useForm({
resolver: zodResolver(searchSpaceFormSchema),
@@ -115,17 +117,32 @@ export function SearchSpaceForm({
animate="visible"
variants={containerVariants}
>
-
-
- {isEditing ? "Edit Search Space" : "Create Search Space"}
-
-
- {isEditing
- ? "Update your search space details"
- : "Create a new search space to organize your documents, chats, and podcasts."}
-
+
+
+
+ {isEditing ? "Edit Search Space" : "Create Search Space"}
+
+
+ {isEditing
+ ? "Update your search space details"
+ : "Create a new search space to organize your documents, chats, and podcasts."}
+
+
+ {
+ router.push('/dashboard')
+ }}
+ >
+
+
+
+
-
+
-
+
-
+