mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-19 18:45:15 +02:00
feat: add source_markdown column to documents and implement migration logic for existing records using a pure-Python BlockNote JSON to Markdown converter
This commit is contained in:
parent
f2a2872995
commit
8b497da130
22 changed files with 632 additions and 920 deletions
|
|
@ -1,168 +0,0 @@
|
|||
"""Celery tasks for populating blocknote_document for existing documents."""
|
||||
|
||||
import logging
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import selectinload
|
||||
from sqlalchemy.pool import NullPool
|
||||
|
||||
from app.celery_app import celery_app
|
||||
from app.config import config
|
||||
from app.db import Document
|
||||
from app.utils.blocknote_converter import convert_markdown_to_blocknote
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_celery_session_maker():
|
||||
"""
|
||||
Create a new async session maker for Celery tasks.
|
||||
This is necessary because Celery tasks run in a new event loop,
|
||||
and the default session maker is bound to the main app's event loop.
|
||||
"""
|
||||
engine = create_async_engine(
|
||||
config.DATABASE_URL,
|
||||
poolclass=NullPool,
|
||||
echo=False,
|
||||
)
|
||||
return async_sessionmaker(engine, expire_on_commit=False)
|
||||
|
||||
|
||||
@celery_app.task(name="populate_blocknote_for_documents", bind=True)
|
||||
def populate_blocknote_for_documents_task(
|
||||
self, document_ids: list[int] | None = None, batch_size: int = 50
|
||||
):
|
||||
"""
|
||||
Celery task to populate blocknote_document for existing documents.
|
||||
|
||||
Args:
|
||||
document_ids: Optional list of specific document IDs to process.
|
||||
If None, processes all documents with blocknote_document IS NULL.
|
||||
batch_size: Number of documents to process in each batch (default: 50)
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
_populate_blocknote_for_documents(document_ids, batch_size)
|
||||
)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
async def _populate_blocknote_for_documents(
|
||||
document_ids: list[int] | None = None, batch_size: int = 50
|
||||
):
|
||||
"""
|
||||
Async function to populate blocknote_document for documents.
|
||||
|
||||
Args:
|
||||
document_ids: Optional list of specific document IDs to process
|
||||
batch_size: Number of documents to process per batch
|
||||
"""
|
||||
async with get_celery_session_maker()() as session:
|
||||
try:
|
||||
# Build query for documents that need blocknote_document populated
|
||||
query = select(Document).where(Document.blocknote_document.is_(None))
|
||||
|
||||
# If specific document IDs provided, filter by them
|
||||
if document_ids:
|
||||
query = query.where(Document.id.in_(document_ids))
|
||||
|
||||
# Load chunks relationship to avoid N+1 queries
|
||||
query = query.options(selectinload(Document.chunks))
|
||||
|
||||
# Execute query
|
||||
result = await session.execute(query)
|
||||
documents = result.scalars().all()
|
||||
|
||||
total_documents = len(documents)
|
||||
logger.info(f"Found {total_documents} documents to process")
|
||||
|
||||
if total_documents == 0:
|
||||
logger.info("No documents to process")
|
||||
return
|
||||
|
||||
# Process documents in batches
|
||||
processed = 0
|
||||
failed = 0
|
||||
|
||||
for i in range(0, total_documents, batch_size):
|
||||
batch = documents[i : i + batch_size]
|
||||
logger.info(
|
||||
f"Processing batch {i // batch_size + 1}: documents {i + 1}-{min(i + batch_size, total_documents)}"
|
||||
)
|
||||
|
||||
for document in batch:
|
||||
try:
|
||||
# Use preloaded chunks from selectinload - no need to query again
|
||||
chunks = sorted(document.chunks, key=lambda c: c.id)
|
||||
|
||||
if not chunks:
|
||||
logger.warning(
|
||||
f"Document {document.id} ({document.title}) has no chunks, skipping"
|
||||
)
|
||||
failed += 1
|
||||
continue
|
||||
|
||||
# Reconstruct markdown by concatenating chunk contents
|
||||
markdown_content = "\n\n".join(
|
||||
chunk.content for chunk in chunks
|
||||
)
|
||||
|
||||
if not markdown_content or not markdown_content.strip():
|
||||
logger.warning(
|
||||
f"Document {document.id} ({document.title}) has empty markdown content, skipping"
|
||||
)
|
||||
failed += 1
|
||||
continue
|
||||
|
||||
# Convert markdown to BlockNote JSON
|
||||
blocknote_json = await convert_markdown_to_blocknote(
|
||||
markdown_content
|
||||
)
|
||||
|
||||
if not blocknote_json:
|
||||
logger.warning(
|
||||
f"Failed to convert markdown to BlockNote for document {document.id} ({document.title})"
|
||||
)
|
||||
failed += 1
|
||||
continue
|
||||
|
||||
# Update document with blocknote_document (other fields already have correct defaults)
|
||||
document.blocknote_document = blocknote_json
|
||||
|
||||
processed += 1
|
||||
|
||||
# Commit every batch_size documents to avoid long transactions
|
||||
if processed % batch_size == 0:
|
||||
await session.commit()
|
||||
logger.info(
|
||||
f"Committed batch: {processed} documents processed so far"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing document {document.id} ({document.title}): {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
failed += 1
|
||||
# Continue with next document instead of failing entire batch
|
||||
continue
|
||||
|
||||
# Commit remaining changes in the batch
|
||||
await session.commit()
|
||||
logger.info(f"Completed batch {i // batch_size + 1}")
|
||||
|
||||
logger.info(
|
||||
f"Migration complete: {processed} documents processed, {failed} failed"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
logger.error(f"Error in blocknote migration task: {e}", exc_info=True)
|
||||
raise
|
||||
|
|
@ -13,7 +13,6 @@ from app.config import config
|
|||
from app.db import Document
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.blocknote_converter import convert_blocknote_to_markdown
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
generate_document_summary,
|
||||
|
|
@ -84,48 +83,37 @@ async def _reindex_document(document_id: int, user_id: str):
|
|||
)
|
||||
|
||||
try:
|
||||
if not document.blocknote_document:
|
||||
# Read markdown directly from source_markdown
|
||||
markdown_content = document.source_markdown
|
||||
|
||||
if not markdown_content:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Document {document_id} has no BlockNote content to reindex",
|
||||
"No BlockNote content",
|
||||
{"error_type": "NoBlockNoteContent"},
|
||||
f"Document {document_id} has no source_markdown to reindex",
|
||||
"No source_markdown content",
|
||||
{"error_type": "NoSourceMarkdown"},
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(f"Reindexing document {document_id} ({document.title})")
|
||||
|
||||
# 1. Convert BlockNote → Markdown
|
||||
markdown_content = await convert_blocknote_to_markdown(
|
||||
document.blocknote_document
|
||||
)
|
||||
|
||||
if not markdown_content:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Failed to convert document {document_id} to markdown",
|
||||
"Markdown conversion failed",
|
||||
{"error_type": "ConversionError"},
|
||||
)
|
||||
return
|
||||
|
||||
# 2. Delete old chunks explicitly
|
||||
# 1. Delete old chunks explicitly
|
||||
from app.db import Chunk
|
||||
|
||||
await session.execute(delete(Chunk).where(Chunk.document_id == document_id))
|
||||
await session.flush() # Ensure old chunks are deleted
|
||||
|
||||
# 3. Create new chunks
|
||||
# 2. Create new chunks from source_markdown
|
||||
new_chunks = await create_document_chunks(markdown_content)
|
||||
|
||||
# 4. Add new chunks to session
|
||||
# 3. Add new chunks to session
|
||||
for chunk in new_chunks:
|
||||
chunk.document_id = document_id
|
||||
session.add(chunk)
|
||||
|
||||
logger.info(f"Created {len(new_chunks)} chunks for document {document_id}")
|
||||
|
||||
# 5. Regenerate summary
|
||||
# 4. Regenerate summary
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, document.search_space_id
|
||||
)
|
||||
|
|
@ -139,7 +127,7 @@ async def _reindex_document(document_id: int, user_id: str):
|
|||
markdown_content, user_llm, document_metadata
|
||||
)
|
||||
|
||||
# 6. Update document
|
||||
# 5. Update document
|
||||
document.content = summary_content
|
||||
document.embedding = summary_embedding
|
||||
document.content_needs_reindexing = False
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue