feat: Add celery tasks to populate blocknote_document for existing documents

This commit is contained in:
Anish Sarkar 2025-11-30 03:49:43 +05:30
parent e33b42f9a5
commit 91bc344b56
6 changed files with 231 additions and 12 deletions

View file

@ -20,8 +20,9 @@ depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Upgrade schema - Add BlockNote fields only."""
"""Upgrade schema - Add BlockNote fields and trigger population task."""
# Add the columns
op.add_column(
"documents",
sa.Column(
@ -42,6 +43,21 @@ def upgrade() -> None:
sa.Column("last_edited_at", sa.TIMESTAMP(timezone=True), nullable=True),
)
# Trigger the Celery task to populate blocknote_document for existing documents
try:
from app.tasks.celery_tasks.blocknote_migration_tasks import (
populate_blocknote_for_documents_task,
)
# Queue the task to run asynchronously
populate_blocknote_for_documents_task.apply_async()
print("✓ Queued Celery task to populate blocknote_document for existing documents")
except Exception as e:
# If Celery is not available or task queueing fails, log but don't fail the migration
print(f"⚠ Warning: Could not queue blocknote population task: {e}")
print(" You can manually trigger it later with:")
print(" celery -A app.celery_app call app.tasks.celery_tasks.blocknote_migration_tasks.populate_blocknote_for_documents_task")
def downgrade() -> None:
"""Downgrade schema - Remove BlockNote fields."""

View file

@ -63,6 +63,7 @@ celery_app = Celery(
"app.tasks.celery_tasks.podcast_tasks",
"app.tasks.celery_tasks.connector_tasks",
"app.tasks.celery_tasks.schedule_checker_task",
"app.tasks.celery_tasks.blocknote_migration_tasks",
],
)

View file

@ -30,11 +30,13 @@ async def get_editor_content(
Get document content for editing.
Returns BlockNote JSON document. If blocknote_document is NULL,
attempts to convert from `content` - though this won't work well
for old documents that only have summaries.
attempts to generate it from chunks (lazy migration).
"""
from sqlalchemy.orm import selectinload
result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
.join(SearchSpace)
.filter(Document.id == document_id, SearchSpace.user_id == user.id)
)
@ -54,12 +56,47 @@ async def get_editor_content(
else None,
}
# For old documents without blocknote_document, return error
# (Can't convert summary back to full document)
raise HTTPException(
status_code=400,
detail="This document was uploaded before editing was enabled. Please re-upload to enable editing.",
)
# Lazy migration: Try to generate blocknote_document from chunks
from app.utils.blocknote_converter import convert_markdown_to_blocknote
chunks = sorted(document.chunks, key=lambda c: c.id)
if not chunks:
raise HTTPException(
status_code=400,
detail="This document has no chunks and cannot be edited. Please re-upload to enable editing.",
)
# Reconstruct markdown from chunks
markdown_content = "\n\n".join(chunk.content for chunk in chunks)
if not markdown_content.strip():
raise HTTPException(
status_code=400,
detail="This document has empty content and cannot be edited.",
)
# Convert to BlockNote
blocknote_json = await convert_markdown_to_blocknote(markdown_content)
if not blocknote_json:
raise HTTPException(
status_code=500,
detail="Failed to convert document to editable format. Please try again later.",
)
# Save the generated blocknote_document (lazy migration)
document.blocknote_document = blocknote_json
document.content_needs_reindexing = False
document.last_edited_at = None
await session.commit()
return {
"document_id": document.id,
"title": document.title,
"blocknote_document": blocknote_json,
"last_edited_at": None,
}
@router.put("/documents/{document_id}/blocknote-content")

View file

@ -0,0 +1,161 @@
"""Celery tasks for populating blocknote_document for existing documents."""
import logging
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import selectinload
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.db import Chunk, Document
from app.utils.blocknote_converter import convert_markdown_to_blocknote
logger = logging.getLogger(__name__)
def get_celery_session_maker():
    """
    Build a fresh async session maker for use inside Celery tasks.

    Celery workers drive their own event loop, so the application's default
    session maker (bound to the main app's loop) must not be reused here.
    NullPool ensures no pooled connections are carried across event loops.

    Returns:
        An ``async_sessionmaker`` producing sessions with
        ``expire_on_commit=False`` (objects stay usable after commit).
    """
    task_engine = create_async_engine(
        config.DATABASE_URL,
        echo=False,
        poolclass=NullPool,
    )
    return async_sessionmaker(task_engine, expire_on_commit=False)
@celery_app.task(name="populate_blocknote_for_documents", bind=True)
def populate_blocknote_for_documents_task(
    self, document_ids: list[int] | None = None, batch_size: int = 50
):
    """
    Celery task to populate blocknote_document for existing documents.

    Args:
        self: Bound Celery task instance (``bind=True``; unused here).
        document_ids: Optional list of specific document IDs to process.
            If None, processes all documents with blocknote_document IS NULL.
        batch_size: Number of documents to process in each batch (default: 50)
    """
    import asyncio

    # asyncio.run() creates a fresh event loop for this worker thread, runs
    # the coroutine, then cancels leftover tasks, shuts down async generators,
    # and closes the loop. The previous manual new_event_loop()/
    # run_until_complete()/close() sequence skipped that cleanup and left a
    # stale loop registered via set_event_loop().
    asyncio.run(_populate_blocknote_for_documents(document_ids, batch_size))
async def _populate_blocknote_for_documents(
    document_ids: list[int] | None = None, batch_size: int = 50
) -> None:
    """
    Async worker that populates ``blocknote_document`` for documents.

    Reconstructs each document's markdown by concatenating its chunks in id
    order, converts it to BlockNote JSON, and stores the result. Failures on
    individual documents are logged and counted but do not abort the run;
    commits happen per batch so a crash mid-run loses at most one batch.

    Args:
        document_ids: Optional list of specific document IDs to process
        batch_size: Number of documents to process per batch
    """
    # get_celery_session_maker() returns a sessionmaker; the second call
    # opens the actual AsyncSession for this task run.
    async with get_celery_session_maker()() as session:
        try:
            # Build query for documents that need blocknote_document populated
            query = select(Document).where(Document.blocknote_document.is_(None))
            # If specific document IDs provided, filter by them
            if document_ids:
                query = query.where(Document.id.in_(document_ids))
            # Load chunks relationship to avoid N+1 queries
            query = query.options(selectinload(Document.chunks))
            # Execute query
            result = await session.execute(query)
            documents = result.scalars().all()
            total_documents = len(documents)
            logger.info(f"Found {total_documents} documents to process")
            if total_documents == 0:
                logger.info("No documents to process")
                return
            # Process documents in batches
            processed = 0  # documents successfully converted
            failed = 0     # documents skipped or errored
            for i in range(0, total_documents, batch_size):
                batch = documents[i : i + batch_size]
                logger.info(f"Processing batch {i // batch_size + 1}: documents {i+1}-{min(i+batch_size, total_documents)}")
                for document in batch:
                    try:
                        # Use preloaded chunks from selectinload - no need to query again.
                        # Sorting by chunk id assumes ids reflect original document
                        # order — TODO confirm against the ingestion pipeline.
                        chunks = sorted(document.chunks, key=lambda c: c.id)
                        if not chunks:
                            logger.warning(
                                f"Document {document.id} ({document.title}) has no chunks, skipping"
                            )
                            failed += 1
                            continue
                        # Reconstruct markdown by concatenating chunk contents
                        markdown_content = "\n\n".join(chunk.content for chunk in chunks)
                        if not markdown_content or not markdown_content.strip():
                            logger.warning(
                                f"Document {document.id} ({document.title}) has empty markdown content, skipping"
                            )
                            failed += 1
                            continue
                        # Convert markdown to BlockNote JSON
                        blocknote_json = await convert_markdown_to_blocknote(markdown_content)
                        if not blocknote_json:
                            logger.warning(
                                f"Failed to convert markdown to BlockNote for document {document.id} ({document.title})"
                            )
                            failed += 1
                            continue
                        # Update document with blocknote_document (other fields already have correct defaults)
                        document.blocknote_document = blocknote_json
                        processed += 1
                        # Commit every batch_size documents to avoid long transactions.
                        # NOTE(review): keyed on the success count, so with skips the
                        # commit point drifts relative to batch boundaries; the
                        # end-of-batch commit below still bounds transaction size.
                        if processed % batch_size == 0:
                            await session.commit()
                            logger.info(f"Committed batch: {processed} documents processed so far")
                    except Exception as e:
                        logger.error(
                            f"Error processing document {document.id} ({document.title}): {e}",
                            exc_info=True,
                        )
                        failed += 1
                        # Continue with next document instead of failing entire batch
                        continue
                # Commit remaining changes in the batch
                await session.commit()
                logger.info(f"Completed batch {i // batch_size + 1}")
            logger.info(
                f"Migration complete: {processed} documents processed, {failed} failed"
            )
        except Exception as e:
            # Unexpected failure outside the per-document handler (e.g. the
            # initial query or a commit): roll back and re-raise so Celery
            # records the task as failed.
            await session.rollback()
            logger.error(f"Error in blocknote migration task: {e}", exc_info=True)
            raise

View file

@ -396,7 +396,9 @@ async def add_received_file_document_using_docling(
"ETL_SERVICE": "DOCLING",
}
existing_document.chunks = chunks
existing_document.blocknote_document = blocknote_json
existing_document.blocknote_document = None
existing_document.content_needs_reindexing = False
existing_document.last_edited_at = None
await session.commit()
await session.refresh(existing_document)
@ -416,7 +418,9 @@ async def add_received_file_document_using_docling(
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
blocknote_document=blocknote_json,
blocknote_document=None,
content_needs_reindexing=False,
last_edited_at=None,
)
session.add(document)

View file

@ -96,7 +96,7 @@ export default function EditorPage() {
}
}, [editorContent, document]);
// TODO: Auto-save every 30 seconds - DIRECT CALL TO FASTAPI
// TODO: Maybe add Auto-save every 30 seconds - DIRECT CALL TO FASTAPI
// Save and exit - DIRECT CALL TO FASTAPI
const handleSave = async () => {