SurfSense/surfsense_backend/app/file_storage/service.py

129 lines
3.9 KiB
Python

"""Application service: persist, locate, and remove a document's stored files.
Coordinates the storage backend (bytes) with the ``document_files`` table
(metadata). Callers own the surrounding DB transaction/commit.
"""
from __future__ import annotations
import hashlib
import logging
from collections.abc import AsyncIterator, Sequence
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.file_storage.backends.base import StorageBackend
from app.file_storage.factory import get_storage_backend
from app.file_storage.keys import build_document_file_key
from app.file_storage.persistence.enums import DocumentFileKind
from app.file_storage.persistence.models import DocumentFile
logger = logging.getLogger(__name__)
async def store_document_file(
    session: AsyncSession,
    *,
    document_id: int,
    search_space_id: int,
    data: bytes,
    filename: str,
    mime_type: str | None = None,
    kind: DocumentFileKind = DocumentFileKind.ORIGINAL,
    created_by_id: str | UUID | None = None,
    backend: StorageBackend | None = None,
) -> DocumentFile:
    """Upload ``data`` to the storage backend and stage its metadata row.

    The bytes are written to the backend first; the new ``DocumentFile`` is
    then only added to ``session`` (not flushed or committed), leaving the
    transaction to the caller.

    Args:
        session: Session the new ``DocumentFile`` row is added to.
        document_id: Document that owns the file.
        search_space_id: Search space id, recorded on the row and used to
            build the storage key.
        data: Raw file bytes; their length and SHA-256 hex digest are
            recorded on the row.
        filename: Original filename, recorded as-is and used to build the key.
        mime_type: Optional content type, passed to the backend and recorded.
        kind: File kind; defaults to ``DocumentFileKind.ORIGINAL``.
        created_by_id: Optional creator id recorded on the row.
        backend: Storage backend to write to; when not supplied (falsy),
            ``get_storage_backend()`` provides one.

    Returns:
        The pending ``DocumentFile`` instance that was added to ``session``.
    """
    storage = backend or get_storage_backend()
    storage_key = build_document_file_key(
        search_space_id=search_space_id,
        document_id=document_id,
        kind=kind,
        filename=filename,
    )
    await storage.put(storage_key, data, content_type=mime_type)

    # NOTE(review): the blob already exists at this point; if the caller's
    # transaction later rolls back, nothing here removes it. Whether a second
    # upload with the same filename/kind reuses the same key depends on
    # build_document_file_key — confirm.
    digest = hashlib.sha256(data).hexdigest()
    document_file = DocumentFile(
        document_id=document_id,
        search_space_id=search_space_id,
        kind=kind,
        storage_backend=storage.backend_name,
        storage_key=storage_key,
        original_filename=filename,
        mime_type=mime_type,
        size_bytes=len(data),
        checksum_sha256=digest,
        created_by_id=created_by_id,
    )
    session.add(document_file)
    return document_file
async def list_document_files(
    session: AsyncSession, *, document_id: int
) -> list[DocumentFile]:
    """Fetch every stored ``DocumentFile`` row for ``document_id``.

    Rows are ordered by ``created_at`` descending (newest first). No secondary
    sort key is applied, so rows sharing a ``created_at`` value come back in
    whatever order the database chooses.

    Args:
        session: Session used to run the query.
        document_id: Document whose files are listed.

    Returns:
        A new list of ``DocumentFile`` instances (empty if none exist).
    """
    statement = (
        select(DocumentFile)
        .where(DocumentFile.document_id == document_id)
        .order_by(DocumentFile.created_at.desc())
    )
    rows = await session.execute(statement)
    document_files = rows.scalars().all()
    return list(document_files)
async def get_document_file(
    session: AsyncSession,
    *,
    document_id: int,
    kind: DocumentFileKind = DocumentFileKind.ORIGINAL,
) -> DocumentFile | None:
    """Return the most recent stored file of ``kind`` for a document.

    Args:
        session: Session used to run the query.
        document_id: Document whose file is looked up.
        kind: File kind to match; defaults to ``DocumentFileKind.ORIGINAL``.

    Returns:
        The matching ``DocumentFile`` with the latest ``created_at``, or
        ``None`` when the document has no stored file of that kind.
    """
    result = await session.execute(
        select(DocumentFile)
        .where(
            DocumentFile.document_id == document_id,
            DocumentFile.kind == kind,
        )
        .order_by(DocumentFile.created_at.desc())
        # Only the first row is used, so let the database stop there instead
        # of sending back every matching row for first() to discard.
        .limit(1)
    )
    return result.scalars().first()
def open_document_file_stream(
    record: DocumentFile, *, backend: StorageBackend | None = None
) -> AsyncIterator[bytes]:
    """Hand back the backend's chunked byte stream for ``record``'s blob.

    This is a plain (non-async) function: it awaits nothing itself and simply
    returns whatever ``open_stream`` yields for ``record.storage_key``.

    Args:
        record: Stored-file row whose ``storage_key`` identifies the blob.
        backend: Storage backend to read from; when not supplied (falsy),
            ``get_storage_backend()`` provides one.

    Returns:
        The object returned by the backend's ``open_stream``.
    """
    storage = backend if backend else get_storage_backend()
    return storage.open_stream(record.storage_key)
async def purge_document_blobs(
    session: AsyncSession,
    *,
    document_ids: Sequence[int],
    backend: StorageBackend | None = None,
) -> None:
    """Best-effort removal of the stored blobs belonging to ``document_ids``.

    Every ``DocumentFile.storage_key`` for the given documents is looked up
    and passed to the backend's ``delete``. Run this while the
    ``document_files`` rows still exist (they cascade with the document);
    once they are gone there are no keys left to find. A delete that raises
    an ``Exception`` is logged as a warning and skipped, so an orphaned blob
    never blocks document deletion.

    Args:
        session: Session used to look up the storage keys.
        document_ids: Documents whose blobs should be removed; an empty
            sequence is a no-op.
        backend: Storage backend to delete from; when not supplied (falsy),
            ``get_storage_backend()`` provides one.
    """
    if not document_ids:
        return

    storage = backend or get_storage_backend()
    key_query = select(DocumentFile.storage_key).where(
        DocumentFile.document_id.in_(document_ids)
    )
    storage_keys = (await session.execute(key_query)).scalars().all()

    for key in storage_keys:
        try:
            await storage.delete(key)
        except Exception as exc:  # deliberate best-effort: log and move on
            logger.warning("Failed to delete stored blob %s: %s", key, exc)