feat: Implement document saving with reindexing

- Updated the document saving endpoint to trigger reindexing after saving.
- Introduced a new Celery task for reindexing documents.
- Refactored the editor page to reflect the changes in the API endpoint and method.
This commit is contained in:
Anish Sarkar 2025-11-30 04:08:12 +05:30
parent 91bc344b56
commit f8e4926969
4 changed files with 156 additions and 83 deletions

View file

@ -64,6 +64,7 @@ celery_app = Celery(
"app.tasks.celery_tasks.connector_tasks",
"app.tasks.celery_tasks.schedule_checker_task",
"app.tasks.celery_tasks.blocknote_migration_tasks",
"app.tasks.celery_tasks.document_reindex_tasks",
],
)

View file

@ -99,103 +99,47 @@ async def get_editor_content(
}
@router.put("/documents/{document_id}/blocknote-content")
async def update_blocknote_content(
@router.post("/documents/{document_id}/save")
async def save_document(
document_id: int,
data: dict[str, Any],
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""
Auto-save BlockNote document during editing.
Only updates blocknote_document field, not content.
Save BlockNote document and trigger reindexing.
Called when user clicks 'Save & Exit'.
"""
from app.tasks.celery_tasks.document_reindex_tasks import reindex_document_task
# Verify ownership
result = await session.execute(
select(Document)
.join(SearchSpace)
.filter(Document.id == document_id, SearchSpace.user_id == user.id)
)
document = result.scalars().first()
if not document:
raise HTTPException(status_code=404, detail="Document not found")
blocknote_document = data.get("blocknote_document")
if not blocknote_document:
raise HTTPException(status_code=400, detail="blocknote_document is required")
# Update only blocknote_document and last_edited_at
# Save BlockNote document
document.blocknote_document = blocknote_document
document.last_edited_at = datetime.now(UTC)
document.content_needs_reindexing = True
await session.commit()
await session.refresh(document)
return {"status": "saved", "last_edited_at": document.last_edited_at.isoformat()}
# did not implement reindexing (for now)
# @router.post("/documents/{document_id}/finalize-edit")
# async def finalize_edit(
# document_id: int,
# session: AsyncSession = Depends(get_async_session),
# user: User = Depends(current_active_user),
# ):
# """
# Finalize document editing: convert BlockNote to markdown,
# update content (summary), and trigger reindexing.
# """
# result = await session.execute(
# select(Document)
# .join(SearchSpace)
# .filter(Document.id == document_id, SearchSpace.user_id == user.id)
# )
# document = result.scalars().first()
# if not document:
# raise HTTPException(status_code=404, detail="Document not found")
# if not document.blocknote_document:
# raise HTTPException(
# status_code=400,
# detail="Document has no BlockNote content to finalize"
# )
# # 1. Convert BlockNote JSON → Markdown
# full_markdown = await convert_blocknote_to_markdown(document.blocknote_document)
# if not full_markdown:
# raise HTTPException(
# status_code=500,
# detail="Failed to convert BlockNote document to markdown"
# )
# # 2. Generate new summary from full markdown
# from app.services.llm_service import get_user_long_context_llm
# from app.utils.document_converters import generate_document_summary
# user_llm = await get_user_long_context_llm(session, str(user.id), document.search_space_id)
# if not user_llm:
# raise HTTPException(
# status_code=500,
# detail="No LLM configured for summary generation"
# )
# document_metadata = document.document_metadata or {}
# summary_content, summary_embedding = await generate_document_summary(
# full_markdown, user_llm, document_metadata
# )
# # 3. Update document fields
# document.content = summary_content
# document.embedding = summary_embedding
# document.content_needs_reindexing = True # Trigger chunk regeneration
# document.last_edited_at = datetime.now(UTC)
# await session.commit()
# return {
# "status": "finalized",
# "message": "Document saved. Summary and chunks will be regenerated in the background.",
# "content_needs_reindexing": True,
# }
# Queue reindex task
reindex_document_task.delay(document_id, str(user.id))
return {
"status": "saved",
"document_id": document_id,
"message": "Document saved and will be reindexed in the background",
"last_edited_at": document.last_edited_at.isoformat()
}

View file

@ -0,0 +1,128 @@
"""Celery tasks for reindexing edited documents."""
import logging
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from sqlalchemy import delete
from sqlalchemy.orm import selectinload
from app.celery_app import celery_app
from app.config import config
from app.db import Document
from app.utils.blocknote_converter import convert_blocknote_to_markdown
from app.utils.document_converters import (
create_document_chunks,
generate_document_summary,
)
from app.services.llm_service import get_user_long_context_llm
logger = logging.getLogger(__name__)
def get_celery_session_maker():
"""Create async session maker for Celery tasks."""
engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool,
echo=False,
)
return async_sessionmaker(engine, expire_on_commit=False)
@celery_app.task(name="reindex_document", bind=True)
def reindex_document_task(self, document_id: int, user_id: str):
"""
Celery task to reindex a document after editing.
Args:
document_id: ID of document to reindex
user_id: ID of user who edited the document
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_reindex_document(document_id, user_id))
finally:
loop.close()
async def _reindex_document(document_id: int, user_id: str):
"""Async function to reindex a document."""
async with get_celery_session_maker()() as session:
try:
# Get document
result = await session.execute(
select(Document)
.options(selectinload(Document.chunks)) # Eagerly load chunks
.where(Document.id == document_id)
)
document = result.scalars().first()
if not document:
logger.error(f"Document {document_id} not found")
return
if not document.blocknote_document:
logger.warning(f"Document {document_id} has no BlockNote content")
return
logger.info(f"Reindexing document {document_id} ({document.title})")
# 1. Convert BlockNote → Markdown
markdown_content = await convert_blocknote_to_markdown(
document.blocknote_document
)
if not markdown_content:
logger.error(f"Failed to convert document {document_id} to markdown")
return
# 2. Delete old chunks explicitly
from app.db import Chunk
await session.execute(
delete(Chunk).where(Chunk.document_id == document_id)
)
await session.flush() # Ensure old chunks are deleted
# 3. Create new chunks
new_chunks = await create_document_chunks(markdown_content)
# 4. Add new chunks to session
for chunk in new_chunks:
chunk.document_id = document_id
session.add(chunk)
logger.info(f"Created {len(new_chunks)} chunks for document {document_id}")
# 5. Regenerate summary
user_llm = await get_user_long_context_llm(
session, user_id, document.search_space_id
)
document_metadata = {
"title": document.title,
"document_type": document.document_type.value,
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
# 6. Update document
document.content = summary_content
document.embedding = summary_embedding
document.content_needs_reindexing = False
await session.commit()
logger.info(f"Successfully reindexed document {document_id}")
except Exception as e:
await session.rollback()
logger.error(f"Error reindexing document {document_id}: {e}", exc_info=True)
raise

View file

@ -112,11 +112,11 @@ export default function EditorPage() {
setSaving(true);
try {
// Save blocknote_document to database (without finalizing/reindexing)
// Save blocknote_document and trigger reindexing in background
const response = await fetch(
`${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents/${documentId}/blocknote-content`,
`${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents/${documentId}/save`,
{
method: "PUT",
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${token}`,
@ -133,7 +133,7 @@ export default function EditorPage() {
}
setHasUnsavedChanges(false);
toast.success("Document saved successfully");
toast.success("Document saved! Reindexing in background...");
// Small delay before redirect to show success message
setTimeout(() => {