From a4b31929d0a71aa36554bfae20686ff1901407f6 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 2 Jun 2026 16:10:50 +0200 Subject: [PATCH] feat(documents): persist original upload bytes on file upload --- .../app/routes/documents_routes.py | 46 ++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index 96c5d2344..4501f2111 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -44,10 +44,12 @@ except RuntimeError as e: print("Error setting event loop policy", e) pass +import logging import os os.environ["UNSTRUCTURED_HAS_PATCHED_LOOP"] = "1" +logger = logging.getLogger(__name__) router = APIRouter() @@ -142,9 +144,11 @@ async def create_documents_file_upload( import os import tempfile from datetime import datetime + from pathlib import Path from app.db import DocumentStatus from app.etl_pipeline.etl_document import ProcessingMode + from app.file_storage.service import store_document_file from app.tasks.document_processors.base import ( check_document_by_unique_identifier, get_current_timestamp, @@ -175,11 +179,12 @@ async def create_documents_file_upload( ) # ===== Read all files concurrently to avoid blocking the event loop ===== - async def _read_and_save(file: UploadFile) -> tuple[str, str, int]: + async def _read_and_save(file: UploadFile) -> tuple[str, str, int, str | None]: """Read upload content and write to temp file off the event loop.""" content = await file.read() file_size = len(content) filename = file.filename or "unknown" + content_type = file.content_type if file_size > MAX_FILE_SIZE_BYTES: raise HTTPException( @@ -196,17 +201,18 @@ async def create_documents_file_upload( return tmp.name temp_path = await asyncio.to_thread(_write_temp) - return temp_path, filename, file_size + return temp_path, filename, file_size, content_type saved_files = await asyncio.gather(*(_read_and_save(f) for f in files)) # ===== PHASE 1: Create pending documents for all files ===== created_documents: list[Document] = [] - files_to_process: list[tuple[Document, str, str]] = [] + # (document, temp_path, filename, content_type) + files_to_process: list[tuple[Document, str, str, str | None]] = [] skipped_duplicates = 0 duplicate_document_ids: list[int] = [] - for temp_path, filename, file_size in saved_files: + for temp_path, filename, file_size, content_type in saved_files: try: unique_identifier_hash = generate_unique_identifier_hash( DocumentType.FILE, filename, search_space_id @@ -231,7 +237,9 @@ async def create_documents_file_upload( } existing.updated_at = get_current_timestamp() created_documents.append(existing) - files_to_process.append((existing, temp_path, filename)) + files_to_process.append( + (existing, temp_path, filename, content_type) + ) continue document = Document( @@ -253,7 +261,7 @@ async def create_documents_file_upload( ) session.add(document) created_documents.append(document) - files_to_process.append((document, temp_path, filename)) + files_to_process.append((document, temp_path, filename, content_type)) except HTTPException: raise @@ -269,8 +277,32 @@ async def create_documents_file_upload( for doc in created_documents: await session.refresh(doc) + # ===== PHASE 1.5: Persist the original uploads to durable storage ===== + # Best-effort: a storage failure must not block parsing or the response. + for document, temp_path, filename, content_type in files_to_process: + try: + original_bytes = await asyncio.to_thread( + lambda p=temp_path: Path(p).read_bytes() + ) + await store_document_file( + session, + document_id=document.id, + search_space_id=search_space_id, + data=original_bytes, + filename=filename, + mime_type=content_type, + created_by_id=str(user.id), + ) + except Exception as storage_error: + logger.warning( + "Failed to store original upload for document %s: %s", + document.id, + storage_error, + ) + await session.commit() + # ===== PHASE 2: Dispatch tasks for each file ===== - for document, temp_path, filename in files_to_process: + for document, temp_path, filename, _content_type in files_to_process: await dispatcher.dispatch_file_processing( document_id=document.id, temp_path=temp_path,