mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-25 00:36:31 +02:00
200 lines
6.5 KiB
Python
200 lines
6.5 KiB
Python
"""Service for exporting knowledge base content as a ZIP archive."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
import zipfile
|
|
from dataclasses import dataclass, field
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.future import select
|
|
|
|
from app.db import Chunk, Document, Folder
|
|
from app.services.folder_service import get_folder_subtree_ids
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _sanitize_filename(title: str) -> str:
|
|
safe = "".join(c if c.isalnum() or c in " -_." else "_" for c in title).strip()
|
|
return safe[:80] or "document"
|
|
|
|
|
|
def _build_folder_path_map(folders: list[Folder]) -> dict[int, str]:
    """Build a mapping of folder_id -> full path string (e.g. 'Research/AI').

    Each path component is the folder name passed through
    ``_sanitize_filename``. A folder whose ``parent_id`` is ``None`` or refers
    to a folder not present in *folders* is treated as a root.

    Paths are resolved iteratively, so deep hierarchies do not hit the
    recursion limit. If the ``parent_id`` links form a cycle, the walk stops
    when it revisits a folder and the last folder reached is treated as the
    root of that chain instead of recursing forever.
    """
    id_to_folder = {f.id: f for f in folders}
    cache: dict[int, str] = {}

    for folder in folders:
        # Walk upwards collecting unresolved ancestors, nearest first.
        chain: list[tuple[int, str]] = []
        visiting: set[int] = set()
        prefix: str | None = None
        current_id = folder.id

        while True:
            if current_id in cache:
                # An ancestor is already resolved; build on top of its path.
                prefix = cache[current_id]
                break
            if current_id in visiting:
                # Cyclic parent links: stop here, the chain starts at the
                # last folder collected.
                break
            visiting.add(current_id)
            node = id_to_folder[current_id]
            chain.append((current_id, _sanitize_filename(node.name)))
            parent_id = node.parent_id
            if parent_id is None or parent_id not in id_to_folder:
                break
            current_id = parent_id

        # Unwind from the top-most ancestor down to the starting folder.
        for node_id, safe_name in reversed(chain):
            prefix = safe_name if prefix is None else f"{prefix}/{safe_name}"
            cache[node_id] = prefix

    return cache
|
|
|
|
|
|
async def _get_document_markdown(
    session: AsyncSession, document: Document
) -> str | None:
    """Return the markdown for *document*, or ``None`` if none can be produced.

    Sources are tried in order and the first usable one wins:

    1. ``document.source_markdown`` when it is not ``None`` (returned as-is,
       even if empty).
    2. ``document.blocknote_document`` converted via ``blocknote_to_markdown``,
       when the conversion yields a truthy result.
    3. The document's chunk contents, ordered by chunk id and joined with
       blank lines.
    """
    stored_markdown = document.source_markdown
    if stored_markdown is not None:
        return stored_markdown

    blocknote = document.blocknote_document
    if blocknote:
        # Imported lazily, only when a conversion is actually needed.
        from app.utils.blocknote_to_markdown import blocknote_to_markdown

        converted = blocknote_to_markdown(blocknote)
        if converted:
            return converted

    chunk_query = (
        select(Chunk.content)
        .filter(Chunk.document_id == document.id)
        .order_by(Chunk.id)
    )
    contents = (await session.execute(chunk_query)).scalars().all()
    if not contents:
        return None
    return "\n\n".join(contents)
|
|
|
|
|
|
@dataclass
class ExportResult:
    """Outcome of a knowledge-base export, as returned by ``build_export_zip``."""

    # Filesystem path of the generated temporary ZIP file.
    zip_path: str
    # Name of the export: "knowledge-base" or the sanitized target folder name.
    export_name: str
    # Size of the file at ``zip_path`` in bytes.
    zip_size: int
    # Titles of documents left out because they were still pending/processing.
    skipped_docs: list[str] = field(default_factory=list)
|
|
|
|
|
|
def _claim_unique_path(
    dir_path: str, base_name: str, used_paths: dict[str, int]
) -> str:
    """Reserve and return an archive path for a document that no earlier entry uses."""
    prefix = f"{dir_path}/" if dir_path else ""
    file_path = f"{prefix}{base_name}.md"

    if file_path in used_paths:
        # used_paths[file_path] is the last numeric suffix handed out for this
        # name. Keep counting until the suffixed path is free too: a document
        # literally titled "<name>_2" may already occupy it.
        suffix = used_paths[file_path]
        while True:
            suffix += 1
            candidate = f"{prefix}{base_name}_{suffix}.md"
            if candidate not in used_paths:
                break
        used_paths[file_path] = suffix
        file_path = candidate

    used_paths[file_path] = 1
    return file_path


def _write_zip_entries(
    zip_path: str, mode: str, entries: list[tuple[str, str]]
) -> None:
    """Write (archive path, text) pairs into the ZIP at *zip_path* (blocking)."""
    with zipfile.ZipFile(zip_path, mode, zipfile.ZIP_DEFLATED) as zf:
        for path, content in entries:
            zf.writestr(path, content)


async def build_export_zip(
    session: AsyncSession,
    search_space_id: int,
    folder_id: int | None = None,
) -> ExportResult:
    """Build a ZIP archive of markdown documents preserving folder structure.

    Documents of the search space (or only those inside the subtree of
    *folder_id*, when given) are loaded in batches of 100, converted to
    markdown and written as ``<folder path>/<title>.md``. Documents whose
    status state is ``"pending"`` or ``"processing"`` are skipped and their
    titles reported; documents without any markdown content are silently
    omitted. Name clashes inside a directory get a numeric ``_N`` suffix.

    Returns an ExportResult with the path to the temp ZIP file.
    The caller is responsible for streaming and cleaning up the file.

    Raises ValueError if folder_id is provided but not found (or belongs to
    a different search space). On any failure, including task cancellation,
    the temp file is removed before the exception propagates.
    """
    if folder_id is not None:
        folder = await session.get(Folder, folder_id)
        if not folder or folder.search_space_id != search_space_id:
            raise ValueError("Folder not found")
        target_folder_ids = set(await get_folder_subtree_ids(session, folder_id))
    else:
        target_folder_ids = None

    folder_query = select(Folder).where(Folder.search_space_id == search_space_id)
    if target_folder_ids is not None:
        folder_query = folder_query.where(Folder.id.in_(target_folder_ids))
    folder_result = await session.execute(folder_query)
    folders = list(folder_result.scalars().all())

    folder_path_map = _build_folder_path_map(folders)

    batch_size = 100

    base_doc_query = select(Document).where(Document.search_space_id == search_space_id)
    if target_folder_ids is not None:
        base_doc_query = base_doc_query.where(Document.folder_id.in_(target_folder_ids))
    # Stable ordering is required for limit/offset pagination.
    base_doc_query = base_doc_query.order_by(Document.id)

    # Only the path is needed; zipfile reopens the file itself per batch.
    fd, tmp_path = tempfile.mkstemp(suffix=".zip")
    os.close(fd)

    used_paths: dict[str, int] = {}
    skipped_docs: list[str] = []
    is_first_batch = True

    try:
        offset = 0
        while True:
            batch_query = base_doc_query.limit(batch_size).offset(offset)
            batch_result = await session.execute(batch_query)
            documents = list(batch_result.scalars().all())
            if not documents:
                break

            entries: list[tuple[str, str]] = []

            for doc in documents:
                status = doc.status or {}
                # A missing or non-dict status is treated as "ready".
                state = (
                    status.get("state", "ready")
                    if isinstance(status, dict)
                    else "ready"
                )
                if state in ("pending", "processing"):
                    skipped_docs.append(doc.title or "Untitled")
                    continue

                markdown = await _get_document_markdown(session, doc)
                if not markdown or not markdown.strip():
                    continue

                # Documents without a known folder land at the archive root.
                if doc.folder_id and doc.folder_id in folder_path_map:
                    dir_path = folder_path_map[doc.folder_id]
                else:
                    dir_path = ""

                base_name = _sanitize_filename(doc.title or "Untitled")
                file_path = _claim_unique_path(dir_path, base_name, used_paths)
                entries.append((file_path, markdown))

            if entries:
                # First write truncates the empty temp file; later ones append.
                mode = "w" if is_first_batch else "a"
                # Compression is blocking work; keep it off the event loop.
                await asyncio.to_thread(_write_zip_entries, tmp_path, mode, entries)
                is_first_batch = False

            offset += batch_size

        # NOTE(review): if no document produced content, the temp file is never
        # opened by zipfile and stays a zero-byte file (zip_size == 0), which is
        # not a valid ZIP archive - confirm callers handle that case.

        export_name = "knowledge-base"
        if folder_id is not None and folder_id in folder_path_map:
            export_name = _sanitize_filename(folder_path_map[folder_id].split("/")[0])

        return ExportResult(
            zip_path=tmp_path,
            export_name=export_name,
            zip_size=os.path.getsize(tmp_path),
            skipped_docs=skipped_docs,
        )

    except BaseException:
        # BaseException rather than Exception: asyncio.CancelledError does not
        # derive from Exception, and a cancelled export must not leak the file.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
|