# SurfSense/surfsense_backend/app/services/export_service.py
"""Service for exporting knowledge base content as a ZIP archive."""
import asyncio
import logging
import os
import tempfile
import zipfile
from dataclasses import dataclass, field
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.db import Chunk, Document, Folder
from app.services.folder_service import get_folder_subtree_ids
logger = logging.getLogger(__name__)
def _sanitize_filename(title: str) -> str:
safe = "".join(c if c.isalnum() or c in " -_." else "_" for c in title).strip()
return safe[:80] or "document"
def _build_folder_path_map(folders: list[Folder]) -> dict[int, str]:
    """Build a mapping of folder_id -> full path string (e.g. 'Research/AI').

    Each path segment is sanitized with _sanitize_filename. A folder whose
    parent is None or absent from *folders* is treated as a root. A cycle in
    the parent chain (corrupt data) is broken by treating the revisited
    folder as a root instead of recursing forever (the original code would
    hit RecursionError on such data).
    """
    id_to_folder = {f.id: f for f in folders}
    cache: dict[int, str] = {}
    # Folder ids currently on the resolution stack; used to detect cycles.
    in_progress: set[int] = set()

    def resolve(folder_id: int) -> str:
        if folder_id in cache:
            return cache[folder_id]
        folder = id_to_folder[folder_id]
        safe_name = _sanitize_filename(folder.name)
        parent_id = folder.parent_id
        if (
            parent_id is None
            or parent_id not in id_to_folder
            or folder_id in in_progress  # cycle guard: already being resolved
        ):
            cache[folder_id] = safe_name
        else:
            in_progress.add(folder_id)
            try:
                cache[folder_id] = f"{resolve(parent_id)}/{safe_name}"
            finally:
                in_progress.discard(folder_id)
        return cache[folder_id]

    for f in folders:
        resolve(f.id)
    return cache
async def _get_document_markdown(
    session: AsyncSession, document: Document
) -> str | None:
    """Resolve markdown content using the 3-tier fallback:
    1. source_markdown 2. blocknote_document conversion 3. chunk concatenation
    """
    # Tier 1: stored markdown wins outright (even an empty string passes,
    # since only None falls through to the next tier).
    if document.source_markdown is not None:
        return document.source_markdown

    # Tier 2: convert the BlockNote editor document, if one exists.
    if document.blocknote_document:
        from app.utils.blocknote_to_markdown import blocknote_to_markdown

        converted = blocknote_to_markdown(document.blocknote_document)
        if converted:
            return converted

    # Tier 3: stitch the document back together from its stored chunks.
    result = await session.execute(
        select(Chunk.content)
        .filter(Chunk.document_id == document.id)
        .order_by(Chunk.id)
    )
    chunk_texts = result.scalars().all()
    return "\n\n".join(chunk_texts) if chunk_texts else None
@dataclass
class ExportResult:
    """Result of building an export ZIP; the caller owns (and must delete) zip_path."""

    zip_path: str  # path to the temp ZIP file on disk
    export_name: str  # suggested base name for the downloaded archive
    zip_size: int  # size of the ZIP file in bytes
    skipped_docs: list[str] = field(default_factory=list)  # titles of docs skipped (still pending/processing)
def _write_zip_entries(
    zip_path: str, mode: str, entries: list[tuple[str, str]]
) -> None:
    """Blocking helper: write (archive_path, content) pairs into the ZIP at zip_path."""
    with zipfile.ZipFile(zip_path, mode, zipfile.ZIP_DEFLATED) as zf:
        for arc_path, content in entries:
            zf.writestr(arc_path, content)


def _unique_archive_path(
    dir_path: str, base_name: str, used_paths: dict[str, int]
) -> str:
    """Return a ZIP member path not yet in used_paths, recording it as used.

    Appends an incrementing numeric suffix on collision and re-checks each
    suffixed candidate too, so a document explicitly titled 'X_2' cannot
    collide with an auto-suffixed duplicate of 'X'.
    """

    def join(name: str) -> str:
        return f"{dir_path}/{name}.md" if dir_path else f"{name}.md"

    candidate = join(base_name)
    suffix = 1
    while candidate in used_paths:
        suffix += 1
        candidate = join(f"{base_name}_{suffix}")
    used_paths[candidate] = 1
    return candidate


async def build_export_zip(
    session: AsyncSession,
    search_space_id: int,
    folder_id: int | None = None,
) -> ExportResult:
    """Build a ZIP archive of markdown documents preserving folder structure.

    Documents are fetched in batches of 100 to bound memory use; documents
    still pending/processing are skipped and reported via skipped_docs, and
    documents with no resolvable markdown are silently omitted.

    Returns an ExportResult with the path to the temp ZIP file.
    The caller is responsible for streaming and cleaning up the file.
    Raises ValueError if folder_id is provided but not found.
    """
    # Scope the export to a folder subtree when requested.
    if folder_id is not None:
        folder = await session.get(Folder, folder_id)
        if not folder or folder.search_space_id != search_space_id:
            raise ValueError("Folder not found")
        target_folder_ids = set(await get_folder_subtree_ids(session, folder_id))
    else:
        target_folder_ids = None

    folder_query = select(Folder).where(Folder.search_space_id == search_space_id)
    if target_folder_ids is not None:
        folder_query = folder_query.where(Folder.id.in_(target_folder_ids))
    folder_result = await session.execute(folder_query)
    folders = list(folder_result.scalars().all())
    folder_path_map = _build_folder_path_map(folders)

    batch_size = 100
    base_doc_query = select(Document).where(Document.search_space_id == search_space_id)
    if target_folder_ids is not None:
        base_doc_query = base_doc_query.where(
            Document.folder_id.in_(target_folder_ids)
        )
    base_doc_query = base_doc_query.order_by(Document.id)

    fd, tmp_path = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
    used_paths: dict[str, int] = {}
    skipped_docs: list[str] = []
    is_first_batch = True
    try:
        offset = 0
        while True:
            batch_result = await session.execute(
                base_doc_query.limit(batch_size).offset(offset)
            )
            documents = list(batch_result.scalars().all())
            if not documents:
                break
            entries: list[tuple[str, str]] = []
            for doc in documents:
                status = doc.status or {}
                state = (
                    status.get("state", "ready")
                    if isinstance(status, dict)
                    else "ready"
                )
                # Skip documents whose content is not finalized yet.
                if state in ("pending", "processing"):
                    skipped_docs.append(doc.title or "Untitled")
                    continue
                markdown = await _get_document_markdown(session, doc)
                if not markdown or not markdown.strip():
                    continue
                if doc.folder_id and doc.folder_id in folder_path_map:
                    dir_path = folder_path_map[doc.folder_id]
                else:
                    dir_path = ""
                base_name = _sanitize_filename(doc.title or "Untitled")
                file_path = _unique_archive_path(dir_path, base_name, used_paths)
                entries.append((file_path, markdown))
            if entries:
                mode = "w" if is_first_batch else "a"
                # ZipFile does blocking file I/O; keep it off the event loop.
                await asyncio.to_thread(_write_zip_entries, tmp_path, mode, entries)
                is_first_batch = False
            offset += batch_size
        if is_first_batch:
            # Nothing was written: emit a valid empty ZIP rather than handing
            # the caller the 0-byte mkstemp file, which is not a parseable
            # archive.
            await asyncio.to_thread(_write_zip_entries, tmp_path, "w", [])
        export_name = "knowledge-base"
        if folder_id is not None and folder_id in folder_path_map:
            # The target folder is the root of the restricted subtree, so the
            # first path segment is its own sanitized name.
            export_name = _sanitize_filename(
                folder_path_map[folder_id].split("/")[0]
            )
        return ExportResult(
            zip_path=tmp_path,
            export_name=export_name,
            zip_size=os.path.getsize(tmp_path),
            skipped_docs=skipped_docs,
        )
    except Exception:
        # Don't leak the temp file on failure; the caller never saw its path.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise