diff --git a/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py b/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py
index 2e1a803d8..b4438ce4d 100644
--- a/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py
+++ b/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py
@@ -80,7 +80,14 @@ class EtlPipelineService:
             request.filename,
         )
 
-        return await self._extract_document(request)
+        try:
+            return await self._extract_document(request)
+        except (EtlUnsupportedFileError, EtlServiceUnavailableError):
+            raise EtlUnsupportedFileError(
+                f"Cannot process image {request.filename}: vision LLM "
+                f"{'failed' if self._vision_llm else 'not configured'} and "
+                f"document parser does not support this format"
+            ) from None
 
     async def _extract_document(self, request: EtlRequest) -> EtlResult:
         from pathlib import PurePosixPath
diff --git a/surfsense_backend/app/etl_pipeline/parsers/vision_llm.py b/surfsense_backend/app/etl_pipeline/parsers/vision_llm.py
index d3b778801..c80fbca0a 100644
--- a/surfsense_backend/app/etl_pipeline/parsers/vision_llm.py
+++ b/surfsense_backend/app/etl_pipeline/parsers/vision_llm.py
@@ -1,3 +1,4 @@
+import asyncio
 import base64
 import os
 
@@ -13,6 +14,8 @@ _MAX_IMAGE_BYTES = (
     5 * 1024 * 1024
 )  # 5 MB (Anthropic Claude's limit, the most restrictive)
 
+_INVOKE_TIMEOUT_SECONDS = 120
+
 _EXT_TO_MIME: dict[str, str] = {
     ".png": "image/png",
     ".jpg": "image/jpeg",
@@ -52,7 +55,9 @@ async def parse_with_vision_llm(file_path: str, filename: str, llm) -> str:
             {"type": "image_url", "image_url": {"url": data_url}},
         ]
     )
-    response = await llm.ainvoke([message])
+    response = await asyncio.wait_for(
+        llm.ainvoke([message]), timeout=_INVOKE_TIMEOUT_SECONDS
+    )
     text = response.content if hasattr(response, "content") else str(response)
     if not text or not text.strip():
         raise ValueError(f"Vision LLM returned empty content for {filename}")
diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py
index 8093084f0..25841a107 100644
--- a/surfsense_backend/app/routes/documents_routes.py
+++ b/surfsense_backend/app/routes/documents_routes.py
@@ -2,7 +2,7 @@ import asyncio
 
 from fastapi import APIRouter, Depends, Form, HTTPException, Query, UploadFile
-from pydantic import BaseModel as PydanticBaseModel
+from pydantic import BaseModel as PydanticBaseModel, Field
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.future import select
 from sqlalchemy.orm import selectinload
@@ -1395,10 +1395,13 @@ class FolderMtimeCheckFile(PydanticBaseModel):
     mtime: float
 
 
+_MAX_MTIME_CHECK_FILES = 10_000
+
+
 class FolderMtimeCheckRequest(PydanticBaseModel):
     folder_name: str
     search_space_id: int
-    files: list[FolderMtimeCheckFile]
+    files: list[FolderMtimeCheckFile] = Field(max_length=_MAX_MTIME_CHECK_FILES)
 
 
 class FolderUnlinkRequest(PydanticBaseModel):
@@ -1531,6 +1534,24 @@ async def folder_upload(
                 f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.",
             )
 
+    from app.services.folder_service import MAX_FOLDER_DEPTH
+
+    max_subfolder_depth = max((p.count("/") for p in rel_paths if "/" in p), default=0)
+    if 1 + max_subfolder_depth > MAX_FOLDER_DEPTH:
+        raise HTTPException(
+            status_code=400,
+            detail=f"Folder structure too deep: {1 + max_subfolder_depth} levels "
+            f"exceeds the maximum of {MAX_FOLDER_DEPTH}.",
+        )
+
+    if root_folder_id:
+        root_folder = await session.get(Folder, root_folder_id)
+        if not root_folder or root_folder.search_space_id != search_space_id:
+            raise HTTPException(
+                status_code=404,
+                detail="Root folder not found in this search space",
+            )
+
     if not root_folder_id:
         watched_metadata = {
             "watched": True,
diff --git a/surfsense_backend/app/services/export_service.py b/surfsense_backend/app/services/export_service.py
index 2d36bfaab..97f952223 100644
--- a/surfsense_backend/app/services/export_service.py
+++ b/surfsense_backend/app/services/export_service.py
@@ -1,5 +1,6 @@
 """Service for exporting knowledge base content as a ZIP archive."""
 
+import asyncio
 import logging
 import os
 import tempfile
@@ -106,23 +107,38 @@ async def build_export_zip(
 
     folder_path_map = _build_folder_path_map(folders)
 
-    doc_query = select(Document).where(Document.search_space_id == search_space_id)
+    batch_size = 100
+
+    base_doc_query = select(Document).where(Document.search_space_id == search_space_id)
     if target_folder_ids is not None:
-        doc_query = doc_query.where(Document.folder_id.in_(target_folder_ids))
-    doc_result = await session.execute(doc_query)
-    documents = list(doc_result.scalars().all())
+        base_doc_query = base_doc_query.where(Document.folder_id.in_(target_folder_ids))
+    base_doc_query = base_doc_query.order_by(Document.id)
 
     fd, tmp_path = tempfile.mkstemp(suffix=".zip")
     os.close(fd)
 
-    try:
-        used_paths: dict[str, int] = {}
-        skipped_docs: list[str] = []
+    used_paths: dict[str, int] = {}
+    skipped_docs: list[str] = []
+    is_first_batch = True
+
+    try:
+        offset = 0
+        while True:
+            batch_query = base_doc_query.limit(batch_size).offset(offset)
+            batch_result = await session.execute(batch_query)
+            documents = list(batch_result.scalars().all())
+            if not documents:
+                break
+
+            entries: list[tuple[str, str]] = []
 
-        with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as zf:
             for doc in documents:
                 status = doc.status or {}
-                state = status.get("state", "ready") if isinstance(status, dict) else "ready"
+                state = (
+                    status.get("state", "ready")
+                    if isinstance(status, dict)
+                    else "ready"
+                )
                 if state in ("pending", "processing"):
                     skipped_docs.append(doc.title or "Untitled")
                     continue
@@ -137,7 +153,9 @@ async def build_export_zip(
                     dir_path = ""
 
                 base_name = _sanitize_filename(doc.title or "Untitled")
-                file_path = f"{dir_path}/{base_name}.md" if dir_path else f"{base_name}.md"
+                file_path = (
+                    f"{dir_path}/{base_name}.md" if dir_path else f"{base_name}.md"
+                )
 
                 if file_path in used_paths:
                     used_paths[file_path] += 1
@@ -149,7 +167,21 @@ async def build_export_zip(
                     )
                     used_paths[file_path] = used_paths.get(file_path, 0) + 1
 
-                zf.writestr(file_path, markdown)
+                entries.append((file_path, markdown))
+
+            if entries:
+                mode = "w" if is_first_batch else "a"
+                batch_entries = entries
+
+                def _write_batch(m: str = mode, e: list = batch_entries) -> None:
+                    with zipfile.ZipFile(tmp_path, m, zipfile.ZIP_DEFLATED) as zf:
+                        for path, content in e:
+                            zf.writestr(path, content)
+
+                await asyncio.to_thread(_write_batch)
+                is_first_batch = False
+
+            offset += batch_size
 
         export_name = "knowledge-base"
         if folder_id is not None and folder_id in folder_path_map:
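Note (reviewer sketch, not part of the patch): the export_service change above pages documents out of the database in batches of 100 and re-opens the archive in append mode once per batch, pushing the blocking zipfile work off the event loop with asyncio.to_thread. The following is a minimal, self-contained illustration of that pattern using only the standard library; the function name and batch layout are hypothetical, not from the codebase.

import asyncio
import os
import tempfile
import zipfile


async def write_zip_in_batches(batches: list[list[tuple[str, str]]]) -> str:
    # Create an empty temp file up front, as build_export_zip does.
    fd, tmp_path = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
    first = True
    for batch in batches:
        mode = "w" if first else "a"  # create on the first batch, then append

        # Bind mode and batch through default arguments so the closure
        # captures this iteration's values (avoids late binding in loops).
        def _write(m: str = mode, entries: list[tuple[str, str]] = batch) -> None:
            with zipfile.ZipFile(tmp_path, m, zipfile.ZIP_DEFLATED) as zf:
                for path, content in entries:
                    zf.writestr(path, content)

        # zipfile I/O is blocking; run it in a worker thread so the
        # event loop stays responsive between batches.
        await asyncio.to_thread(_write)
        first = False
    return tmp_path


if __name__ == "__main__":
    path = asyncio.run(
        write_zip_in_batches([[("a.md", "# A")], [("docs/b.md", "# B")]])
    )
    with zipfile.ZipFile(path) as zf:
        print(path, zf.namelist())

The upshot is that peak memory is bounded by one batch of rendered markdown rather than the whole knowledge base, at the cost of re-opening the archive (and rewriting its central directory) once per batch.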