Merge remote-tracking branch 'upstream/dev' into feat/local-folder-sync

This commit is contained in:
Anish Sarkar 2026-04-03 11:42:43 +05:30
commit 62b44889d1
66 changed files with 3359 additions and 2626 deletions

View file

@ -1,7 +1,7 @@
# Force asyncio to use standard event loop before unstructured imports
import asyncio
from fastapi import APIRouter, Depends, Form, HTTPException, UploadFile
from fastapi import APIRouter, Depends, Form, HTTPException, Query, UploadFile
from pydantic import BaseModel as PydanticBaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
@ -20,6 +20,7 @@ from app.db import (
get_async_session,
)
from app.schemas import (
ChunkRead,
DocumentRead,
DocumentsCreate,
DocumentStatusBatchResponse,
@ -49,9 +50,7 @@ os.environ["UNSTRUCTURED_HAS_PATCHED_LOOP"] = "1"
router = APIRouter()
MAX_FILES_PER_UPLOAD = 10
MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024 # 50 MB per file
MAX_TOTAL_SIZE_BYTES = 200 * 1024 * 1024 # 200 MB total
MAX_FILE_SIZE_BYTES = 500 * 1024 * 1024 # 500 MB per file
@router.post("/documents")
@ -160,13 +159,6 @@ async def create_documents_file_upload(
if not files:
raise HTTPException(status_code=400, detail="No files provided")
if len(files) > MAX_FILES_PER_UPLOAD:
raise HTTPException(
status_code=413,
detail=f"Too many files. Maximum {MAX_FILES_PER_UPLOAD} files per upload.",
)
total_size = 0
for file in files:
file_size = file.size or 0
if file_size > MAX_FILE_SIZE_BYTES:
@ -175,14 +167,6 @@ async def create_documents_file_upload(
detail=f"File '{file.filename}' ({file_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.",
)
total_size += file_size
if total_size > MAX_TOTAL_SIZE_BYTES:
raise HTTPException(
status_code=413,
detail=f"Total upload size ({total_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_TOTAL_SIZE_BYTES // (1024 * 1024)} MB limit.",
)
# ===== Read all files concurrently to avoid blocking the event loop =====
async def _read_and_save(file: UploadFile) -> tuple[str, str, int]:
@ -210,16 +194,6 @@ async def create_documents_file_upload(
saved_files = await asyncio.gather(*(_read_and_save(f) for f in files))
actual_total_size = sum(size for _, _, size in saved_files)
if actual_total_size > MAX_TOTAL_SIZE_BYTES:
for temp_path, _, _ in saved_files:
os.unlink(temp_path)
raise HTTPException(
status_code=413,
detail=f"Total upload size ({actual_total_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_TOTAL_SIZE_BYTES // (1024 * 1024)} MB limit.",
)
# ===== PHASE 1: Create pending documents for all files =====
created_documents: list[Document] = []
files_to_process: list[tuple[Document, str, str]] = []
@ -455,13 +429,15 @@ async def read_documents(
reason=doc.status.get("reason"),
)
raw_content = doc.content or ""
api_documents.append(
DocumentRead(
id=doc.id,
title=doc.title,
document_type=doc.document_type,
document_metadata=doc.document_metadata,
content=doc.content,
content="",
content_preview=raw_content[:300],
content_hash=doc.content_hash,
unique_identifier_hash=doc.unique_identifier_hash,
created_at=doc.created_at,
@ -613,13 +589,15 @@ async def search_documents(
reason=doc.status.get("reason"),
)
raw_content = doc.content or ""
api_documents.append(
DocumentRead(
id=doc.id,
title=doc.title,
document_type=doc.document_type,
document_metadata=doc.document_metadata,
content=doc.content,
content="",
content_preview=raw_content[:300],
content_hash=doc.content_hash,
unique_identifier_hash=doc.unique_identifier_hash,
created_at=doc.created_at,
@ -888,16 +866,19 @@ async def get_document_type_counts(
@router.get("/documents/by-chunk/{chunk_id}", response_model=DocumentWithChunksRead)
async def get_document_by_chunk_id(
chunk_id: int,
chunk_window: int = Query(
5, ge=0, description="Number of chunks before/after the cited chunk to include"
),
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""
Retrieves a document based on a chunk ID, including all its chunks ordered by creation time.
Requires DOCUMENTS_READ permission for the search space.
The document's embedding and chunk embeddings are excluded from the response.
Retrieves a document based on a chunk ID, including a window of chunks around the cited one.
Uses SQL-level pagination to avoid loading all chunks into memory.
"""
try:
# First, get the chunk and verify it exists
from sqlalchemy import and_, func, or_
chunk_result = await session.execute(select(Chunk).filter(Chunk.id == chunk_id))
chunk = chunk_result.scalars().first()
@ -906,11 +887,8 @@ async def get_document_by_chunk_id(
status_code=404, detail=f"Chunk with id {chunk_id} not found"
)
# Get the associated document
document_result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
.filter(Document.id == chunk.document_id)
select(Document).filter(Document.id == chunk.document_id)
)
document = document_result.scalars().first()
@ -920,7 +898,6 @@ async def get_document_by_chunk_id(
detail="Document not found",
)
# Check permission for the search space
await check_permission(
session,
user,
@ -929,10 +906,38 @@ async def get_document_by_chunk_id(
"You don't have permission to read documents in this search space",
)
# Sort chunks by creation time
sorted_chunks = sorted(document.chunks, key=lambda x: x.created_at)
total_result = await session.execute(
select(func.count())
.select_from(Chunk)
.filter(Chunk.document_id == document.id)
)
total_chunks = total_result.scalar() or 0
cited_idx_result = await session.execute(
select(func.count())
.select_from(Chunk)
.filter(
Chunk.document_id == document.id,
or_(
Chunk.created_at < chunk.created_at,
and_(Chunk.created_at == chunk.created_at, Chunk.id < chunk.id),
),
)
)
cited_idx = cited_idx_result.scalar() or 0
start = max(0, cited_idx - chunk_window)
end = min(total_chunks, cited_idx + chunk_window + 1)
windowed_result = await session.execute(
select(Chunk)
.filter(Chunk.document_id == document.id)
.order_by(Chunk.created_at, Chunk.id)
.offset(start)
.limit(end - start)
)
windowed_chunks = windowed_result.scalars().all()
# Return the document with its chunks
return DocumentWithChunksRead(
id=document.id,
title=document.title,
@ -944,7 +949,9 @@ async def get_document_by_chunk_id(
created_at=document.created_at,
updated_at=document.updated_at,
search_space_id=document.search_space_id,
chunks=sorted_chunks,
chunks=windowed_chunks,
total_chunks=total_chunks,
chunk_start_index=start,
)
except HTTPException:
raise
@ -983,6 +990,75 @@ async def get_watched_folders(
return folders
@router.get(
"/documents/{document_id}/chunks",
response_model=PaginatedResponse[ChunkRead],
)
async def get_document_chunks_paginated(
document_id: int,
page: int = Query(0, ge=0),
page_size: int = Query(20, ge=1, le=100),
start_offset: int | None = Query(
None, ge=0, description="Direct offset; overrides page * page_size"
),
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""
Paginated chunk loading for a document.
Supports both page-based and offset-based access.
"""
try:
from sqlalchemy import func
doc_result = await session.execute(
select(Document).filter(Document.id == document_id)
)
document = doc_result.scalars().first()
if not document:
raise HTTPException(status_code=404, detail="Document not found")
await check_permission(
session,
user,
document.search_space_id,
Permission.DOCUMENTS_READ.value,
"You don't have permission to read documents in this search space",
)
total_result = await session.execute(
select(func.count())
.select_from(Chunk)
.filter(Chunk.document_id == document_id)
)
total = total_result.scalar() or 0
offset = start_offset if start_offset is not None else page * page_size
chunks_result = await session.execute(
select(Chunk)
.filter(Chunk.document_id == document_id)
.order_by(Chunk.created_at, Chunk.id)
.offset(offset)
.limit(page_size)
)
chunks = chunks_result.scalars().all()
return PaginatedResponse(
items=chunks,
total=total,
page=offset // page_size if page_size else page,
page_size=page_size,
has_more=(offset + len(chunks)) < total,
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to fetch chunks: {e!s}"
) from e
@router.get("/documents/{document_id}", response_model=DocumentRead)
async def read_document(
document_id: int,
@ -1013,13 +1089,14 @@ async def read_document(
"You don't have permission to read documents in this search space",
)
# Convert database object to API-friendly format
raw_content = document.content or ""
return DocumentRead(
id=document.id,
title=document.title,
document_type=document.document_type,
document_metadata=document.document_metadata,
content=document.content,
content=raw_content,
content_preview=raw_content[:300],
content_hash=document.content_hash,
unique_identifier_hash=document.unique_identifier_hash,
created_at=document.created_at,