feat(fix): document type filtering

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2025-10-21 21:53:55 -07:00
parent fec8deabcc
commit 18adf79649
7 changed files with 623 additions and 705 deletions

View file

@ -2,17 +2,14 @@
import asyncio import asyncio
from fastapi import APIRouter, Depends, Form, HTTPException, UploadFile from fastapi import APIRouter, Depends, Form, HTTPException, UploadFile
from litellm import atranscription
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select from sqlalchemy.future import select
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from app.config import config as app_config
from app.db import ( from app.db import (
Chunk, Chunk,
Document, Document,
DocumentType, DocumentType,
Log,
SearchSpace, SearchSpace,
User, User,
get_async_session, get_async_session,
@ -24,16 +21,6 @@ from app.schemas import (
DocumentWithChunksRead, DocumentWithChunksRead,
PaginatedResponse, PaginatedResponse,
) )
from app.services.task_logging_service import TaskLoggingService
from app.tasks.document_processors import (
add_crawled_url_document,
add_extension_received_document,
add_received_file_document_using_docling,
add_received_file_document_using_llamacloud,
add_received_file_document_using_unstructured,
add_received_markdown_file_document,
add_youtube_video_document,
)
from app.users import current_active_user from app.users import current_active_user
from app.utils.check_ownership import check_ownership from app.utils.check_ownership import check_ownership
@ -166,6 +153,7 @@ async def read_documents(
page: int | None = None, page: int | None = None,
page_size: int = 50, page_size: int = 50,
search_space_id: int | None = None, search_space_id: int | None = None,
document_types: str | None = None,
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
): ):
@ -177,6 +165,7 @@ async def read_documents(
page: Zero-based page index used when 'skip' is not provided. page: Zero-based page index used when 'skip' is not provided.
page_size: Number of items per page (default: 50). Use -1 to return all remaining items after the offset. page_size: Number of items per page (default: 50). Use -1 to return all remaining items after the offset.
search_space_id: If provided, restrict results to a specific search space. search_space_id: If provided, restrict results to a specific search space.
document_types: Comma-separated list of document types to filter by (e.g., "EXTENSION,FILE,SLACK_CONNECTOR").
session: Database session (injected). session: Database session (injected).
user: Current authenticated user (injected). user: Current authenticated user (injected).
@ -198,6 +187,12 @@ async def read_documents(
if search_space_id is not None: if search_space_id is not None:
query = query.filter(Document.search_space_id == search_space_id) query = query.filter(Document.search_space_id == search_space_id)
# Filter by document_types if provided
if document_types is not None and document_types.strip():
type_list = [t.strip() for t in document_types.split(",") if t.strip()]
if type_list:
query = query.filter(Document.document_type.in_(type_list))
# Get total count # Get total count
count_query = ( count_query = (
select(func.count()) select(func.count())
@ -209,6 +204,10 @@ async def read_documents(
count_query = count_query.filter( count_query = count_query.filter(
Document.search_space_id == search_space_id Document.search_space_id == search_space_id
) )
if document_types is not None and document_types.strip():
type_list = [t.strip() for t in document_types.split(",") if t.strip()]
if type_list:
count_query = count_query.filter(Document.document_type.in_(type_list))
total_result = await session.execute(count_query) total_result = await session.execute(count_query)
total = total_result.scalar() or 0 total = total_result.scalar() or 0
@ -256,11 +255,12 @@ async def search_documents(
page: int | None = None, page: int | None = None,
page_size: int = 50, page_size: int = 50,
search_space_id: int | None = None, search_space_id: int | None = None,
document_types: str | None = None,
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
): ):
""" """
Search documents by title substring, optionally filtered by search_space_id. Search documents by title substring, optionally filtered by search_space_id and document_types.
Args: Args:
title: Case-insensitive substring to match against document titles. Required. title: Case-insensitive substring to match against document titles. Required.
@ -268,6 +268,7 @@ async def search_documents(
page: Zero-based page index used when 'skip' is not provided. Default: None. page: Zero-based page index used when 'skip' is not provided. Default: None.
page_size: Number of items per page. Use -1 to return all remaining items after the offset. Default: 50. page_size: Number of items per page. Use -1 to return all remaining items after the offset. Default: 50.
search_space_id: Filter results to a specific search space. Default: None. search_space_id: Filter results to a specific search space. Default: None.
document_types: Comma-separated list of document types to filter by (e.g., "EXTENSION,FILE,SLACK_CONNECTOR").
session: Database session (injected). session: Database session (injected).
user: Current authenticated user (injected). user: Current authenticated user (injected).
@ -290,6 +291,12 @@ async def search_documents(
# Only search by title (case-insensitive) # Only search by title (case-insensitive)
query = query.filter(Document.title.ilike(f"%{title}%")) query = query.filter(Document.title.ilike(f"%{title}%"))
# Filter by document_types if provided
if document_types is not None and document_types.strip():
type_list = [t.strip() for t in document_types.split(",") if t.strip()]
if type_list:
query = query.filter(Document.document_type.in_(type_list))
# Get total count # Get total count
count_query = ( count_query = (
select(func.count()) select(func.count())
@ -302,6 +309,10 @@ async def search_documents(
Document.search_space_id == search_space_id Document.search_space_id == search_space_id
) )
count_query = count_query.filter(Document.title.ilike(f"%{title}%")) count_query = count_query.filter(Document.title.ilike(f"%{title}%"))
if document_types is not None and document_types.strip():
type_list = [t.strip() for t in document_types.split(",") if t.strip()]
if type_list:
count_query = count_query.filter(Document.document_type.in_(type_list))
total_result = await session.execute(count_query) total_result = await session.execute(count_query)
total = total_result.scalar() or 0 total = total_result.scalar() or 0
@ -455,6 +466,46 @@ async def delete_document(
) from e ) from e
@router.get("/documents/type-counts/")
async def get_document_type_counts(
search_space_id: int | None = None,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""
Get counts of documents by type for the current user.
Args:
search_space_id: If provided, restrict counts to a specific search space.
session: Database session (injected).
user: Current authenticated user (injected).
Returns:
Dict mapping document types to their counts.
"""
try:
from sqlalchemy import func
query = (
select(Document.document_type, func.count(Document.id))
.join(SearchSpace)
.filter(SearchSpace.user_id == user.id)
.group_by(Document.document_type)
)
if search_space_id is not None:
query = query.filter(Document.search_space_id == search_space_id)
result = await session.execute(query)
type_counts = dict(result.all())
return type_counts
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to fetch document type counts: {e!s}"
) from e
@router.get("/documents/by-chunk/{chunk_id}", response_model=DocumentWithChunksRead) @router.get("/documents/by-chunk/{chunk_id}", response_model=DocumentWithChunksRead)
async def get_document_by_chunk_id( async def get_document_by_chunk_id(
chunk_id: int, chunk_id: int,
@ -510,623 +561,3 @@ async def get_document_by_chunk_id(
raise HTTPException( raise HTTPException(
status_code=500, detail=f"Failed to retrieve document: {e!s}" status_code=500, detail=f"Failed to retrieve document: {e!s}"
) from e ) from e
async def process_extension_document_with_new_session(
individual_document, search_space_id: int, user_id: str
):
"""Create a new session and process extension document."""
from app.db import async_session_maker
from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
# Initialize task logging service
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="process_extension_document",
source="document_processor",
message=f"Starting processing of extension document from {individual_document.metadata.VisitedWebPageTitle}",
metadata={
"document_type": "EXTENSION",
"url": individual_document.metadata.VisitedWebPageURL,
"title": individual_document.metadata.VisitedWebPageTitle,
"user_id": user_id,
},
)
try:
result = await add_extension_received_document(
session, individual_document, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
{"document_id": result.id, "content_hash": result.content_hash},
)
else:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
{"duplicate_detected": True},
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process extension document: {individual_document.metadata.VisitedWebPageTitle}",
str(e),
{"error_type": type(e).__name__},
)
import logging
logging.error(f"Error processing extension document: {e!s}")
async def process_crawled_url_with_new_session(
url: str, search_space_id: int, user_id: str
):
"""Create a new session and process crawled URL."""
from app.db import async_session_maker
from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
# Initialize task logging service
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="process_crawled_url",
source="document_processor",
message=f"Starting URL crawling and processing for: {url}",
metadata={"document_type": "CRAWLED_URL", "url": url, "user_id": user_id},
)
try:
result = await add_crawled_url_document(
session, url, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully crawled and processed URL: {url}",
{
"document_id": result.id,
"title": result.title,
"content_hash": result.content_hash,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"URL document already exists (duplicate): {url}",
{"duplicate_detected": True},
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to crawl URL: {url}",
str(e),
{"error_type": type(e).__name__},
)
import logging
logging.error(f"Error processing crawled URL: {e!s}")
async def process_file_in_background_with_new_session(
file_path: str, filename: str, search_space_id: int, user_id: str
):
"""Create a new session and process file."""
from app.db import async_session_maker
from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
# Initialize task logging service
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="process_file_upload",
source="document_processor",
message=f"Starting file processing for: {filename}",
metadata={
"document_type": "FILE",
"filename": filename,
"file_path": file_path,
"user_id": user_id,
},
)
try:
await process_file_in_background(
file_path,
filename,
search_space_id,
user_id,
session,
task_logger,
log_entry,
)
# Note: success/failure logging is handled within process_file_in_background
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process file: {filename}",
str(e),
{"error_type": type(e).__name__},
)
import logging
logging.error(f"Error processing file: {e!s}")
async def process_youtube_video_with_new_session(
url: str, search_space_id: int, user_id: str
):
"""Create a new session and process YouTube video."""
from app.db import async_session_maker
from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
# Initialize task logging service
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="process_youtube_video",
source="document_processor",
message=f"Starting YouTube video processing for: {url}",
metadata={"document_type": "YOUTUBE_VIDEO", "url": url, "user_id": user_id},
)
try:
result = await add_youtube_video_document(
session, url, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed YouTube video: {result.title}",
{
"document_id": result.id,
"video_id": result.document_metadata.get("video_id"),
"content_hash": result.content_hash,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists (duplicate): {url}",
{"duplicate_detected": True},
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process YouTube video: {url}",
str(e),
{"error_type": type(e).__name__},
)
import logging
logging.error(f"Error processing YouTube video: {e!s}")
async def process_file_in_background(
file_path: str,
filename: str,
search_space_id: int,
user_id: str,
session: AsyncSession,
task_logger: TaskLoggingService,
log_entry: Log,
):
try:
# Check if the file is a markdown or text file
if filename.lower().endswith((".md", ".markdown", ".txt")):
await task_logger.log_task_progress(
log_entry,
f"Processing markdown/text file: {filename}",
{"file_type": "markdown", "processing_stage": "reading_file"},
)
# For markdown files, read the content directly
with open(file_path, encoding="utf-8") as f:
markdown_content = f.read()
# Clean up the temp file
import os
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
await task_logger.log_task_progress(
log_entry,
f"Creating document from markdown content: {filename}",
{
"processing_stage": "creating_document",
"content_length": len(markdown_content),
},
)
# Process markdown directly through specialized function
result = await add_received_markdown_file_document(
session, filename, markdown_content, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed markdown file: {filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "markdown",
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Markdown file already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "markdown"},
)
# Check if the file is an audio file
elif filename.lower().endswith(
(".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
):
await task_logger.log_task_progress(
log_entry,
f"Processing audio file for transcription: {filename}",
{"file_type": "audio", "processing_stage": "starting_transcription"},
)
# Determine STT service type
stt_service_type = (
"local"
if app_config.STT_SERVICE
and app_config.STT_SERVICE.startswith("local/")
else "external"
)
# Check if using local STT service
if stt_service_type == "local":
# Use local Faster-Whisper for transcription
from app.services.stt_service import stt_service
try:
result = stt_service.transcribe_file(file_path)
transcribed_text = result.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
# Add metadata about the transcription
transcribed_text = (
f"# Transcription of {filename}\n\n{transcribed_text}"
)
except Exception as e:
raise HTTPException(
status_code=422,
detail=f"Failed to transcribe audio file {filename}: {e!s}",
) from e
await task_logger.log_task_progress(
log_entry,
f"Local STT transcription completed: {filename}",
{
"processing_stage": "local_transcription_complete",
"language": result.get("language"),
"confidence": result.get("language_probability"),
"duration": result.get("duration"),
},
)
else:
# Use LiteLLM for audio transcription
with open(file_path, "rb") as audio_file:
transcription_kwargs = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
transcription_kwargs["api_base"] = (
app_config.STT_SERVICE_API_BASE
)
transcription_response = await atranscription(
**transcription_kwargs
)
# Extract the transcribed text
transcribed_text = transcription_response.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
# Add metadata about the transcription
transcribed_text = (
f"# Transcription of {filename}\n\n{transcribed_text}"
)
await task_logger.log_task_progress(
log_entry,
f"Transcription completed, creating document: {filename}",
{
"processing_stage": "transcription_complete",
"transcript_length": len(transcribed_text),
},
)
# Clean up the temp file
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
# Process transcription as markdown document
result = await add_received_markdown_file_document(
session, filename, transcribed_text, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully transcribed and processed audio file: {filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "audio",
"transcript_length": len(transcribed_text),
"stt_service": stt_service_type,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Audio file transcript already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "audio"},
)
else:
if app_config.ETL_SERVICE == "UNSTRUCTURED":
await task_logger.log_task_progress(
log_entry,
f"Processing file with Unstructured ETL: {filename}",
{
"file_type": "document",
"etl_service": "UNSTRUCTURED",
"processing_stage": "loading",
},
)
from langchain_unstructured import UnstructuredLoader
# Process the file
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
await task_logger.log_task_progress(
log_entry,
f"Unstructured ETL completed, creating document: {filename}",
{"processing_stage": "etl_complete", "elements_count": len(docs)},
)
# Clean up the temp file
import os
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
# Pass the documents to the existing background task
result = await add_received_file_document_using_unstructured(
session, filename, docs, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with Unstructured: {filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "document",
"etl_service": "UNSTRUCTURED",
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "UNSTRUCTURED",
},
)
elif app_config.ETL_SERVICE == "LLAMACLOUD":
await task_logger.log_task_progress(
log_entry,
f"Processing file with LlamaCloud ETL: {filename}",
{
"file_type": "document",
"etl_service": "LLAMACLOUD",
"processing_stage": "parsing",
},
)
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
# Create LlamaParse parser instance
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1, # Use single worker for file processing
verbose=True,
language="en",
result_type=ResultType.MD,
)
# Parse the file asynchronously
result = await parser.aparse(file_path)
# Clean up the temp file
import os
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
# Get markdown documents from the result
markdown_documents = await result.aget_markdown_documents(
split_by_page=False
)
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud parsing completed, creating documents: {filename}",
{
"processing_stage": "parsing_complete",
"documents_count": len(markdown_documents),
},
)
for doc in markdown_documents:
# Extract text content from the markdown documents
markdown_content = doc.text
# Process the documents using our LlamaCloud background task
doc_result = await add_received_file_document_using_llamacloud(
session,
filename,
llamacloud_markdown_document=markdown_content,
search_space_id=search_space_id,
user_id=user_id,
)
if doc_result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with LlamaCloud: {filename}",
{
"document_id": doc_result.id,
"content_hash": doc_result.content_hash,
"file_type": "document",
"etl_service": "LLAMACLOUD",
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "LLAMACLOUD",
},
)
elif app_config.ETL_SERVICE == "DOCLING":
await task_logger.log_task_progress(
log_entry,
f"Processing file with Docling ETL: {filename}",
{
"file_type": "document",
"etl_service": "DOCLING",
"processing_stage": "parsing",
},
)
# Use Docling service for document processing
from app.services.docling_service import create_docling_service
# Create Docling service
docling_service = create_docling_service()
# Process the document
result = await docling_service.process_document(file_path, filename)
# Clean up the temp file
import os
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
await task_logger.log_task_progress(
log_entry,
f"Docling parsing completed, creating document: {filename}",
{
"processing_stage": "parsing_complete",
"content_length": len(result["content"]),
},
)
# Process the document using our Docling background task
doc_result = await add_received_file_document_using_docling(
session,
filename,
docling_markdown_document=result["content"],
search_space_id=search_space_id,
user_id=user_id,
)
if doc_result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with Docling: {filename}",
{
"document_id": doc_result.id,
"content_hash": doc_result.content_hash,
"file_type": "document",
"etl_service": "DOCLING",
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "DOCLING",
},
)
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to process file: {filename}",
str(e),
{"error_type": type(e).__name__, "filename": filename},
)
import logging
logging.error(f"Error processing file in background: {e!s}")
raise # Re-raise so the wrapper can also handle it

View file

@ -280,7 +280,7 @@ async def _process_file_upload(
file_path: str, filename: str, search_space_id: int, user_id: str file_path: str, filename: str, search_space_id: int, user_id: str
): ):
"""Process file upload with new session.""" """Process file upload with new session."""
from app.routes.documents_routes import process_file_in_background from app.tasks.document_processors.file_processors import process_file_in_background
async with get_celery_session_maker()() as session: async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id) task_logger = TaskLoggingService(session, search_space_id)

View file

@ -4,12 +4,16 @@ File document processors for different ETL services (Unstructured, LlamaCloud, D
import logging import logging
from fastapi import HTTPException
from langchain_core.documents import Document as LangChainDocument from langchain_core.documents import Document as LangChainDocument
from litellm import atranscription
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType from app.config import config as app_config
from app.db import Document, DocumentType, Log
from app.services.llm_service import get_user_long_context_llm from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import ( from app.utils.document_converters import (
convert_document_to_markdown, convert_document_to_markdown,
create_document_chunks, create_document_chunks,
@ -21,6 +25,7 @@ from app.utils.document_converters import (
from .base import ( from .base import (
check_document_by_unique_identifier, check_document_by_unique_identifier,
) )
from .markdown_processor import add_received_markdown_file_document
async def add_received_file_document_using_unstructured( async def add_received_file_document_using_unstructured(
@ -391,3 +396,418 @@ async def add_received_file_document_using_docling(
raise RuntimeError( raise RuntimeError(
f"Failed to process file document using Docling: {e!s}" f"Failed to process file document using Docling: {e!s}"
) from e ) from e
async def process_file_in_background(
file_path: str,
filename: str,
search_space_id: int,
user_id: str,
session: AsyncSession,
task_logger: TaskLoggingService,
log_entry: Log,
):
try:
# Check if the file is a markdown or text file
if filename.lower().endswith((".md", ".markdown", ".txt")):
await task_logger.log_task_progress(
log_entry,
f"Processing markdown/text file: {filename}",
{"file_type": "markdown", "processing_stage": "reading_file"},
)
# For markdown files, read the content directly
with open(file_path, encoding="utf-8") as f:
markdown_content = f.read()
# Clean up the temp file
import os
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
await task_logger.log_task_progress(
log_entry,
f"Creating document from markdown content: {filename}",
{
"processing_stage": "creating_document",
"content_length": len(markdown_content),
},
)
# Process markdown directly through specialized function
result = await add_received_markdown_file_document(
session, filename, markdown_content, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed markdown file: {filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "markdown",
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Markdown file already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "markdown"},
)
# Check if the file is an audio file
elif filename.lower().endswith(
(".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
):
await task_logger.log_task_progress(
log_entry,
f"Processing audio file for transcription: {filename}",
{"file_type": "audio", "processing_stage": "starting_transcription"},
)
# Determine STT service type
stt_service_type = (
"local"
if app_config.STT_SERVICE
and app_config.STT_SERVICE.startswith("local/")
else "external"
)
# Check if using local STT service
if stt_service_type == "local":
# Use local Faster-Whisper for transcription
from app.services.stt_service import stt_service
try:
result = stt_service.transcribe_file(file_path)
transcribed_text = result.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
# Add metadata about the transcription
transcribed_text = (
f"# Transcription of {filename}\n\n{transcribed_text}"
)
except Exception as e:
raise HTTPException(
status_code=422,
detail=f"Failed to transcribe audio file {filename}: {e!s}",
) from e
await task_logger.log_task_progress(
log_entry,
f"Local STT transcription completed: {filename}",
{
"processing_stage": "local_transcription_complete",
"language": result.get("language"),
"confidence": result.get("language_probability"),
"duration": result.get("duration"),
},
)
else:
# Use LiteLLM for audio transcription
with open(file_path, "rb") as audio_file:
transcription_kwargs = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
transcription_kwargs["api_base"] = (
app_config.STT_SERVICE_API_BASE
)
transcription_response = await atranscription(
**transcription_kwargs
)
# Extract the transcribed text
transcribed_text = transcription_response.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
# Add metadata about the transcription
transcribed_text = (
f"# Transcription of {filename}\n\n{transcribed_text}"
)
await task_logger.log_task_progress(
log_entry,
f"Transcription completed, creating document: {filename}",
{
"processing_stage": "transcription_complete",
"transcript_length": len(transcribed_text),
},
)
# Clean up the temp file
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
# Process transcription as markdown document
result = await add_received_markdown_file_document(
session, filename, transcribed_text, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully transcribed and processed audio file: {filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "audio",
"transcript_length": len(transcribed_text),
"stt_service": stt_service_type,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Audio file transcript already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "audio"},
)
else:
if app_config.ETL_SERVICE == "UNSTRUCTURED":
await task_logger.log_task_progress(
log_entry,
f"Processing file with Unstructured ETL: {filename}",
{
"file_type": "document",
"etl_service": "UNSTRUCTURED",
"processing_stage": "loading",
},
)
from langchain_unstructured import UnstructuredLoader
# Process the file
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
await task_logger.log_task_progress(
log_entry,
f"Unstructured ETL completed, creating document: {filename}",
{"processing_stage": "etl_complete", "elements_count": len(docs)},
)
# Clean up the temp file
import os
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
# Pass the documents to the existing background task
result = await add_received_file_document_using_unstructured(
session, filename, docs, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with Unstructured: {filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "document",
"etl_service": "UNSTRUCTURED",
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "UNSTRUCTURED",
},
)
elif app_config.ETL_SERVICE == "LLAMACLOUD":
await task_logger.log_task_progress(
log_entry,
f"Processing file with LlamaCloud ETL: {filename}",
{
"file_type": "document",
"etl_service": "LLAMACLOUD",
"processing_stage": "parsing",
},
)
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
# Create LlamaParse parser instance
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1, # Use single worker for file processing
verbose=True,
language="en",
result_type=ResultType.MD,
)
# Parse the file asynchronously
result = await parser.aparse(file_path)
# Clean up the temp file
import os
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
# Get markdown documents from the result
markdown_documents = await result.aget_markdown_documents(
split_by_page=False
)
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud parsing completed, creating documents: {filename}",
{
"processing_stage": "parsing_complete",
"documents_count": len(markdown_documents),
},
)
for doc in markdown_documents:
# Extract text content from the markdown documents
markdown_content = doc.text
# Process the documents using our LlamaCloud background task
doc_result = await add_received_file_document_using_llamacloud(
session,
filename,
llamacloud_markdown_document=markdown_content,
search_space_id=search_space_id,
user_id=user_id,
)
if doc_result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with LlamaCloud: {filename}",
{
"document_id": doc_result.id,
"content_hash": doc_result.content_hash,
"file_type": "document",
"etl_service": "LLAMACLOUD",
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "LLAMACLOUD",
},
)
elif app_config.ETL_SERVICE == "DOCLING":
await task_logger.log_task_progress(
log_entry,
f"Processing file with Docling ETL: {filename}",
{
"file_type": "document",
"etl_service": "DOCLING",
"processing_stage": "parsing",
},
)
# Use Docling service for document processing
from app.services.docling_service import create_docling_service
# Create Docling service
docling_service = create_docling_service()
# Process the document
result = await docling_service.process_document(file_path, filename)
# Clean up the temp file
import os
try:
os.unlink(file_path)
except Exception as e:
print("Error deleting temp file", e)
pass
await task_logger.log_task_progress(
log_entry,
f"Docling parsing completed, creating document: {filename}",
{
"processing_stage": "parsing_complete",
"content_length": len(result["content"]),
},
)
# Process the document using our Docling background task
doc_result = await add_received_file_document_using_docling(
session,
filename,
docling_markdown_document=result["content"],
search_space_id=search_space_id,
user_id=user_id,
)
if doc_result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with Docling: {filename}",
{
"document_id": doc_result.id,
"content_hash": doc_result.content_hash,
"file_type": "document",
"etl_service": "DOCLING",
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Document already exists (duplicate): {filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "DOCLING",
},
)
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to process file: {filename}",
str(e),
{"error_type": type(e).__name__, "filename": filename},
)
import logging
logging.error(f"Error processing file in background: {e!s}")
raise # Re-raise so the wrapper can also handle it

View file

@ -1,10 +1,10 @@
"use client"; "use client";
import { Eye, EyeOff } from "lucide-react";
import { AnimatePresence, motion } from "motion/react"; import { AnimatePresence, motion } from "motion/react";
import Link from "next/link"; import Link from "next/link";
import { useRouter } from "next/navigation"; import { useRouter } from "next/navigation";
import { useEffect, useState } from "react"; import { useEffect, useState } from "react";
import { toast } from "sonner"; import { toast } from "sonner";
import { Eye, EyeOff } from "lucide-react";
import { getAuthErrorDetails, isNetworkError, shouldRetry } from "@/lib/auth-errors"; import { getAuthErrorDetails, isNetworkError, shouldRetry } from "@/lib/auth-errors";
export function LocalLoginForm() { export function LocalLoginForm() {

View file

@ -26,7 +26,7 @@ import {
import { Input } from "@/components/ui/input"; import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label"; import { Label } from "@/components/ui/label";
import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover"; import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover";
import type { ColumnVisibility, Document } from "./types"; import type { ColumnVisibility } from "./types";
const fadeInScale: Variants = { const fadeInScale: Variants = {
hidden: { opacity: 0, scale: 0.95 }, hidden: { opacity: 0, scale: 0.95 },
@ -35,8 +35,7 @@ const fadeInScale: Variants = {
}; };
export function DocumentsFilters({ export function DocumentsFilters({
allDocuments, typeCounts: typeCountsRecord,
visibleDocuments: _visibleDocuments,
selectedIds, selectedIds,
onSearch, onSearch,
searchValue, searchValue,
@ -46,8 +45,7 @@ export function DocumentsFilters({
columnVisibility, columnVisibility,
onToggleColumn, onToggleColumn,
}: { }: {
allDocuments: Document[]; typeCounts: Record<string, number>;
visibleDocuments: Document[];
selectedIds: Set<number>; selectedIds: Set<number>;
onSearch: (v: string) => void; onSearch: (v: string) => void;
searchValue: string; searchValue: string;
@ -61,16 +59,16 @@ export function DocumentsFilters({
const inputRef = useRef<HTMLInputElement>(null); const inputRef = useRef<HTMLInputElement>(null);
const uniqueTypes = useMemo(() => { const uniqueTypes = useMemo(() => {
const set = new Set<string>(); return Object.keys(typeCountsRecord).sort();
for (const d of allDocuments) set.add(d.document_type); }, [typeCountsRecord]);
return Array.from(set).sort();
}, [allDocuments]);
const typeCounts = useMemo(() => { const typeCounts = useMemo(() => {
const map = new Map<string, number>(); const map = new Map<string, number>();
for (const d of allDocuments) map.set(d.document_type, (map.get(d.document_type) ?? 0) + 1); for (const [type, count] of Object.entries(typeCountsRecord)) {
map.set(type, count);
}
return map; return map;
}, [allDocuments]); }, [typeCountsRecord]);
return ( return (
<motion.div <motion.div

View file

@ -40,40 +40,59 @@ export default function DocumentsTable() {
const [sortKey, setSortKey] = useState<SortKey>("title"); const [sortKey, setSortKey] = useState<SortKey>("title");
const [sortDesc, setSortDesc] = useState(false); const [sortDesc, setSortDesc] = useState(false);
const [selectedIds, setSelectedIds] = useState<Set<number>>(new Set()); const [selectedIds, setSelectedIds] = useState<Set<number>>(new Set());
const [typeCounts, setTypeCounts] = useState<Record<string, number>>({});
// Use server-side pagination and search // Use server-side pagination, search, and filtering
const { documents, total, loading, error, fetchDocuments, searchDocuments, deleteDocument } = const {
useDocuments(searchSpaceId, { documents,
total,
loading,
error,
fetchDocuments,
searchDocuments,
deleteDocument,
getDocumentTypeCounts,
} = useDocuments(searchSpaceId, {
page: pageIndex, page: pageIndex,
pageSize: pageSize, pageSize: pageSize,
}); });
// Fetch document type counts on mount and when search space changes
useEffect(() => {
if (searchSpaceId && getDocumentTypeCounts) {
getDocumentTypeCounts().then(setTypeCounts);
}
}, [searchSpaceId, getDocumentTypeCounts]);
// Refetch when pagination changes or when search/filters change // Refetch when pagination changes or when search/filters change
useEffect(() => { useEffect(() => {
if (searchSpaceId) { if (searchSpaceId) {
if (debouncedSearch.trim()) { if (debouncedSearch.trim()) {
// Use search endpoint if there's a search query // Use search endpoint if there's a search query
searchDocuments?.(debouncedSearch, pageIndex, pageSize); searchDocuments?.(
debouncedSearch,
pageIndex,
pageSize,
activeTypes.length > 0 ? activeTypes : undefined
);
} else { } else {
// Use regular fetch if no search // Use regular fetch if no search
fetchDocuments?.(pageIndex, pageSize); fetchDocuments?.(pageIndex, pageSize, activeTypes.length > 0 ? activeTypes : undefined);
} }
} }
}, [pageIndex, pageSize, debouncedSearch, searchSpaceId, fetchDocuments, searchDocuments]); }, [
pageIndex,
pageSize,
debouncedSearch,
activeTypes,
searchSpaceId,
fetchDocuments,
searchDocuments,
]);
// Client-side filtering for document types only // Display server-filtered results directly
// Note: This could also be moved to the backend for better performance const displayDocs = documents || [];
const filtered = useMemo(() => { const displayTotal = total;
let result = documents || [];
if (activeTypes.length > 0) {
result = result.filter((d) => activeTypes.includes(d.document_type));
}
return result;
}, [documents, activeTypes]);
// Display filtered results
const displayDocs = filtered;
const displayTotal = activeTypes.length > 0 ? filtered.length : total;
const pageStart = pageIndex * pageSize; const pageStart = pageIndex * pageSize;
const pageEnd = Math.min(pageStart + pageSize, displayTotal); const pageEnd = Math.min(pageStart + pageSize, displayTotal);
@ -88,11 +107,16 @@ export default function DocumentsTable() {
const refreshCurrentView = useCallback(async () => { const refreshCurrentView = useCallback(async () => {
if (debouncedSearch.trim()) { if (debouncedSearch.trim()) {
await searchDocuments?.(debouncedSearch, pageIndex, pageSize); await searchDocuments?.(
debouncedSearch,
pageIndex,
pageSize,
activeTypes.length > 0 ? activeTypes : undefined
);
} else { } else {
await fetchDocuments?.(pageIndex, pageSize); await fetchDocuments?.(pageIndex, pageSize, activeTypes.length > 0 ? activeTypes : undefined);
} }
}, [debouncedSearch, pageIndex, pageSize, searchDocuments, fetchDocuments]); }, [debouncedSearch, pageIndex, pageSize, activeTypes, searchDocuments, fetchDocuments]);
const onBulkDelete = async () => { const onBulkDelete = async () => {
if (selectedIds.size === 0) { if (selectedIds.size === 0) {
@ -133,8 +157,7 @@ export default function DocumentsTable() {
className="w-full px-6 py-4" className="w-full px-6 py-4"
> >
<DocumentsFilters <DocumentsFilters
allDocuments={documents || []} typeCounts={typeCounts}
visibleDocuments={displayDocs}
selectedIds={selectedIds} selectedIds={selectedIds}
onSearch={setSearch} onSearch={setSearch}
searchValue={search} searchValue={search}

View file

@ -36,12 +36,13 @@ export interface UseDocumentsOptions {
page?: number; page?: number;
pageSize?: number; pageSize?: number;
lazy?: boolean; lazy?: boolean;
documentTypes?: string[];
} }
export function useDocuments(searchSpaceId: number, options?: UseDocumentsOptions | boolean) { export function useDocuments(searchSpaceId: number, options?: UseDocumentsOptions | boolean) {
// Support both old boolean API and new options API for backward compatibility // Support both old boolean API and new options API for backward compatibility
const opts = typeof options === "boolean" ? { lazy: options } : options || {}; const opts = typeof options === "boolean" ? { lazy: options } : options || {};
const { page, pageSize = 300, lazy = false } = opts; const { page, pageSize = 300, lazy = false, documentTypes } = opts;
const [documents, setDocuments] = useState<Document[]>([]); const [documents, setDocuments] = useState<Document[]>([]);
const [total, setTotal] = useState(0); const [total, setTotal] = useState(0);
@ -50,7 +51,7 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
const [isLoaded, setIsLoaded] = useState(false); // Memoization flag const [isLoaded, setIsLoaded] = useState(false); // Memoization flag
const fetchDocuments = useCallback( const fetchDocuments = useCallback(
async (fetchPage?: number, fetchPageSize?: number) => { async (fetchPage?: number, fetchPageSize?: number, fetchDocumentTypes?: string[]) => {
if (isLoaded && lazy) return; // Avoid redundant calls in lazy mode if (isLoaded && lazy) return; // Avoid redundant calls in lazy mode
try { try {
@ -64,6 +65,8 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
// Use passed parameters or fall back to state/options // Use passed parameters or fall back to state/options
const effectivePage = fetchPage !== undefined ? fetchPage : page; const effectivePage = fetchPage !== undefined ? fetchPage : page;
const effectivePageSize = fetchPageSize !== undefined ? fetchPageSize : pageSize; const effectivePageSize = fetchPageSize !== undefined ? fetchPageSize : pageSize;
const effectiveDocumentTypes =
fetchDocumentTypes !== undefined ? fetchDocumentTypes : documentTypes;
if (effectivePage !== undefined) { if (effectivePage !== undefined) {
params.append("page", effectivePage.toString()); params.append("page", effectivePage.toString());
@ -71,6 +74,9 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
if (effectivePageSize !== undefined) { if (effectivePageSize !== undefined) {
params.append("page_size", effectivePageSize.toString()); params.append("page_size", effectivePageSize.toString());
} }
if (effectiveDocumentTypes && effectiveDocumentTypes.length > 0) {
params.append("document_types", effectiveDocumentTypes.join(","));
}
const response = await fetch( const response = await fetch(
`${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents?${params.toString()}`, `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents?${params.toString()}`,
@ -100,7 +106,7 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
setLoading(false); setLoading(false);
} }
}, },
[searchSpaceId, page, pageSize, isLoaded, lazy] [searchSpaceId, page, pageSize, documentTypes, isLoaded, lazy]
); );
useEffect(() => { useEffect(() => {
@ -117,10 +123,15 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
// Function to search documents by title // Function to search documents by title
const searchDocuments = useCallback( const searchDocuments = useCallback(
async (searchQuery: string, fetchPage?: number, fetchPageSize?: number) => { async (
searchQuery: string,
fetchPage?: number,
fetchPageSize?: number,
fetchDocumentTypes?: string[]
) => {
if (!searchQuery.trim()) { if (!searchQuery.trim()) {
// If search is empty, fetch all documents // If search is empty, fetch all documents
return fetchDocuments(fetchPage, fetchPageSize); return fetchDocuments(fetchPage, fetchPageSize, fetchDocumentTypes);
} }
try { try {
@ -135,6 +146,8 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
// Use passed parameters or fall back to state/options // Use passed parameters or fall back to state/options
const effectivePage = fetchPage !== undefined ? fetchPage : page; const effectivePage = fetchPage !== undefined ? fetchPage : page;
const effectivePageSize = fetchPageSize !== undefined ? fetchPageSize : pageSize; const effectivePageSize = fetchPageSize !== undefined ? fetchPageSize : pageSize;
const effectiveDocumentTypes =
fetchDocumentTypes !== undefined ? fetchDocumentTypes : documentTypes;
if (effectivePage !== undefined) { if (effectivePage !== undefined) {
params.append("page", effectivePage.toString()); params.append("page", effectivePage.toString());
@ -142,6 +155,9 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
if (effectivePageSize !== undefined) { if (effectivePageSize !== undefined) {
params.append("page_size", effectivePageSize.toString()); params.append("page_size", effectivePageSize.toString());
} }
if (effectiveDocumentTypes && effectiveDocumentTypes.length > 0) {
params.append("document_types", effectiveDocumentTypes.join(","));
}
const response = await fetch( const response = await fetch(
`${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents/search/?${params.toString()}`, `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents/search/?${params.toString()}`,
@ -170,7 +186,7 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
setLoading(false); setLoading(false);
} }
}, },
[searchSpaceId, page, pageSize, fetchDocuments] [searchSpaceId, page, pageSize, documentTypes, fetchDocuments]
); );
// Function to delete a document // Function to delete a document
@ -205,6 +221,35 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
[documents] [documents]
); );
// Function to get document type counts
const getDocumentTypeCounts = useCallback(async () => {
try {
const params = new URLSearchParams({
search_space_id: searchSpaceId.toString(),
});
const response = await fetch(
`${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents/type-counts/?${params.toString()}`,
{
headers: {
Authorization: `Bearer ${localStorage.getItem("surfsense_bearer_token")}`,
},
method: "GET",
}
);
if (!response.ok) {
throw new Error("Failed to fetch document type counts");
}
const counts = await response.json();
return counts as Record<string, number>;
} catch (err: any) {
console.error("Error fetching document type counts:", err);
return {};
}
}, [searchSpaceId]);
return { return {
documents, documents,
total, total,
@ -215,5 +260,6 @@ export function useDocuments(searchSpaceId: number, options?: UseDocumentsOption
searchDocuments, // Search function searchDocuments, // Search function
refreshDocuments, refreshDocuments,
deleteDocument, deleteDocument,
getDocumentTypeCounts, // Get type counts function
}; };
} }