feat(documents): persist original upload bytes on file upload

This commit is contained in:
CREDO23 2026-06-02 16:10:50 +02:00
parent 7dad9ec3a9
commit a4b31929d0

View file

@ -44,10 +44,12 @@ except RuntimeError as e:
print("Error setting event loop policy", e) print("Error setting event loop policy", e)
pass pass
import logging
import os import os
os.environ["UNSTRUCTURED_HAS_PATCHED_LOOP"] = "1" os.environ["UNSTRUCTURED_HAS_PATCHED_LOOP"] = "1"
logger = logging.getLogger(__name__)
router = APIRouter() router = APIRouter()
@ -142,9 +144,11 @@ async def create_documents_file_upload(
import os import os
import tempfile import tempfile
from datetime import datetime from datetime import datetime
from pathlib import Path
from app.db import DocumentStatus from app.db import DocumentStatus
from app.etl_pipeline.etl_document import ProcessingMode from app.etl_pipeline.etl_document import ProcessingMode
from app.file_storage.service import store_document_file
from app.tasks.document_processors.base import ( from app.tasks.document_processors.base import (
check_document_by_unique_identifier, check_document_by_unique_identifier,
get_current_timestamp, get_current_timestamp,
@ -175,11 +179,12 @@ async def create_documents_file_upload(
) )
# ===== Read all files concurrently to avoid blocking the event loop ===== # ===== Read all files concurrently to avoid blocking the event loop =====
async def _read_and_save(file: UploadFile) -> tuple[str, str, int]: async def _read_and_save(file: UploadFile) -> tuple[str, str, int, str | None]:
"""Read upload content and write to temp file off the event loop.""" """Read upload content and write to temp file off the event loop."""
content = await file.read() content = await file.read()
file_size = len(content) file_size = len(content)
filename = file.filename or "unknown" filename = file.filename or "unknown"
content_type = file.content_type
if file_size > MAX_FILE_SIZE_BYTES: if file_size > MAX_FILE_SIZE_BYTES:
raise HTTPException( raise HTTPException(
@ -196,17 +201,18 @@ async def create_documents_file_upload(
return tmp.name return tmp.name
temp_path = await asyncio.to_thread(_write_temp) temp_path = await asyncio.to_thread(_write_temp)
return temp_path, filename, file_size return temp_path, filename, file_size, content_type
saved_files = await asyncio.gather(*(_read_and_save(f) for f in files)) saved_files = await asyncio.gather(*(_read_and_save(f) for f in files))
# ===== PHASE 1: Create pending documents for all files ===== # ===== PHASE 1: Create pending documents for all files =====
created_documents: list[Document] = [] created_documents: list[Document] = []
files_to_process: list[tuple[Document, str, str]] = [] # (document, temp_path, filename, content_type)
files_to_process: list[tuple[Document, str, str, str | None]] = []
skipped_duplicates = 0 skipped_duplicates = 0
duplicate_document_ids: list[int] = [] duplicate_document_ids: list[int] = []
for temp_path, filename, file_size in saved_files: for temp_path, filename, file_size, content_type in saved_files:
try: try:
unique_identifier_hash = generate_unique_identifier_hash( unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, filename, search_space_id DocumentType.FILE, filename, search_space_id
@ -231,7 +237,9 @@ async def create_documents_file_upload(
} }
existing.updated_at = get_current_timestamp() existing.updated_at = get_current_timestamp()
created_documents.append(existing) created_documents.append(existing)
files_to_process.append((existing, temp_path, filename)) files_to_process.append(
(existing, temp_path, filename, content_type)
)
continue continue
document = Document( document = Document(
@ -253,7 +261,7 @@ async def create_documents_file_upload(
) )
session.add(document) session.add(document)
created_documents.append(document) created_documents.append(document)
files_to_process.append((document, temp_path, filename)) files_to_process.append((document, temp_path, filename, content_type))
except HTTPException: except HTTPException:
raise raise
@ -269,8 +277,32 @@ async def create_documents_file_upload(
for doc in created_documents: for doc in created_documents:
await session.refresh(doc) await session.refresh(doc)
# ===== PHASE 1.5: Persist the original uploads to durable storage =====
# Best-effort: a storage failure must not block parsing or the response, so each per-file failure is caught and logged as a warning and the loop continues.
for document, temp_path, filename, content_type in files_to_process:
try:
original_bytes = await asyncio.to_thread(
lambda p=temp_path: Path(p).read_bytes()
)
await store_document_file(
session,
document_id=document.id,
search_space_id=search_space_id,
data=original_bytes,
filename=filename,
mime_type=content_type,
created_by_id=str(user.id),
)
except Exception as storage_error:
logger.warning(
"Failed to store original upload for document %s: %s",
document.id,
storage_error,
)
await session.commit()
# ===== PHASE 2: Dispatch tasks for each file ===== # ===== PHASE 2: Dispatch tasks for each file =====
for document, temp_path, filename in files_to_process: for document, temp_path, filename, _content_type in files_to_process:
await dispatcher.dispatch_file_processing( await dispatcher.dispatch_file_processing(
document_id=document.id, document_id=document.id,
temp_path=temp_path, temp_path=temp_path,