feat: update migration to skip blocknote conversion and update transaction handling in chat routes

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-02-20 22:43:25 -08:00
parent 4c8a70ca4d
commit 08c75127f1
3 changed files with 26 additions and 104 deletions

View file

@ -4,18 +4,12 @@ Revision ID: 101
Revises: 100
Create Date: 2026-02-17
Adds source_markdown column and converts only documents that have
blocknote_document data. Uses a pure-Python BlockNote JSON Markdown
converter without external dependencies.
Documents without blocknote_document keep source_markdown = NULL and
get populated lazily by the editor route when a user first opens them.
Adds source_markdown column to documents. All existing rows start as NULL
and get populated lazily by the editor route when a user first opens them.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Sequence
import sqlalchemy as sa
@ -28,114 +22,19 @@ down_revision: str | None = "100"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
logger = logging.getLogger("alembic.migration.101")
def upgrade() -> None:
    """Add the nullable ``source_markdown`` column and backfill it.

    Idempotent with respect to the column itself: if an earlier, partially
    applied run already created ``source_markdown``, the ADD COLUMN step is
    skipped and only the backfill runs.
    """
    conn = op.get_bind()
    inspector = sa.inspect(conn)
    column_names = {col["name"] for col in inspector.get_columns("documents")}

    # 1. Add the column (skip if a prior partial run already added it).
    if "source_markdown" not in column_names:
        op.add_column(
            "documents",
            sa.Column("source_markdown", sa.Text(), nullable=True),
        )

    # 2. Convert only documents that have blocknote_document data.
    _populate_source_markdown(conn)
def _populate_source_markdown(conn, batch_size: int = 500) -> None:
    """Populate source_markdown only for documents that have blocknote_document.

    Processes rows in batches of *batch_size* to avoid long-running
    transactions and high memory usage. Conversion failures are logged and
    skipped (best-effort): a single bad document must not abort the whole
    migration — the editor route backfills lazily later.

    Args:
        conn: Active SQLAlchemy connection (from ``op.get_bind()``).
        batch_size: Maximum number of rows fetched per round trip.
    """
    from app.utils.blocknote_to_markdown import blocknote_to_markdown

    # Count up front so progress reporting has a stable denominator.
    total = conn.execute(
        sa.text("""
            SELECT count(*)
            FROM documents
            WHERE source_markdown IS NULL
              AND blocknote_document IS NOT NULL
        """)
    ).scalar()

    if total == 0:
        print("No documents with blocknote_document need migration")
        return

    print(
        f" Migrating {total} documents (with blocknote_document) to source_markdown..."
    )

    migrated = 0
    failed = 0
    processed = 0

    # Keyset pagination on id. LIMIT/OFFSET is wrong here: the WHERE clause
    # filters on `source_markdown IS NULL`, which this loop's own UPDATEs
    # mutate — every batch shrinks the filtered set, so a growing OFFSET
    # silently skips unconverted rows. Paginating with `id > last_id` is
    # immune to that, and (unlike re-querying at OFFSET 0) cannot spin
    # forever on rows that keep failing conversion, because failed rows are
    # left behind the advancing cursor.
    last_id = None
    while True:
        query = """
            SELECT id, title, blocknote_document
            FROM documents
            WHERE source_markdown IS NULL
              AND blocknote_document IS NOT NULL
        """
        params = {"limit": batch_size}
        if last_id is not None:
            query += " AND id > :last_id"
            params["last_id"] = last_id
        query += " ORDER BY id LIMIT :limit"

        rows = conn.execute(sa.text(query), params).fetchall()
        if not rows:
            break

        for doc_id, doc_title, blocknote_doc in rows:
            try:
                # Some drivers return JSON columns as text; normalize first.
                if isinstance(blocknote_doc, str):
                    blocknote_doc = json.loads(blocknote_doc)
                markdown = blocknote_to_markdown(blocknote_doc)
                if markdown:
                    conn.execute(
                        sa.text("""
                            UPDATE documents SET source_markdown = :md WHERE id = :doc_id
                        """),
                        {"md": markdown, "doc_id": doc_id},
                    )
                    migrated += 1
                else:
                    logger.warning(
                        f" Doc {doc_id} ({doc_title}): blocknote conversion produced empty result"
                    )
                    failed += 1
            except Exception as e:
                logger.warning(
                    f" Doc {doc_id} ({doc_title}): blocknote conversion failed ({e})"
                )
                failed += 1

        # Advance the cursor past everything seen this batch, including
        # failed/empty rows, so progress is guaranteed each iteration.
        last_id = rows[-1][0]
        processed += len(rows)
        print(f" Batch complete: processed {processed}/{total}")

    print(
        f"source_markdown migration complete: {migrated} migrated, "
        f"{failed} failed out of {total} total"
    )
def downgrade() -> None:
    """Remove source_markdown column.

    Any markdown generated during upgrade is discarded; re-running upgrade
    regenerates it from blocknote_document where that data still exists.
    """
    op.drop_column("documents", "source_markdown")

View file

@ -1042,7 +1042,11 @@ async def handle_new_chat(
search_space.agent_llm_id if search_space.agent_llm_id is not None else -1
)
# Return streaming response
# Release the read-transaction so we don't hold ACCESS SHARE locks
# on searchspaces/documents for the entire duration of the stream.
# expire_on_commit=False keeps loaded ORM attrs usable.
await session.commit()
return StreamingResponse(
stream_new_chat(
user_query=request.user_query,
@ -1269,6 +1273,11 @@ async def regenerate_response(
search_space.agent_llm_id if search_space.agent_llm_id is not None else -1
)
# Release the read-transaction so we don't hold ACCESS SHARE locks
# on searchspaces/documents for the entire duration of the stream.
# expire_on_commit=False keeps loaded ORM attrs (including messages_to_delete PKs) usable.
await session.commit()
# Create a wrapper generator that deletes messages only AFTER streaming succeeds
# This prevents data loss if streaming fails (network error, LLM error, etc.)
async def stream_with_cleanup():
@ -1382,6 +1391,10 @@ async def resume_chat(
decisions = [d.model_dump() for d in request.decisions]
# Release the read-transaction so we don't hold ACCESS SHARE locks
# on searchspaces/documents for the entire duration of the stream.
await session.commit()
return StreamingResponse(
stream_resume_chat(
chat_id=thread_id,

View file

@ -1110,6 +1110,13 @@ async def stream_new_chat(
"search_space_id": search_space_id,
}
# All pre-streaming DB reads are done. Commit to release the
# transaction and its ACCESS SHARE locks so we don't block DDL
# (e.g. migrations) for the entire duration of LLM streaming.
# Tools that need DB access during streaming will start their own
# short-lived transactions (or use isolated sessions).
await session.commit()
# Configure LangGraph with thread_id for memory
# If checkpoint_id is provided, fork from that checkpoint (for edit/reload)
configurable = {"thread_id": str(chat_id)}
@ -1345,6 +1352,9 @@ async def stream_resume_chat(
thread_visibility=visibility,
)
# Release the transaction before streaming (same rationale as stream_new_chat).
await session.commit()
from langgraph.types import Command
config = {