refactor: update safe_set_chunks function to be asynchronous and modify all connector and document processor files to use the new async implementation

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-03-15 00:44:27 -07:00
parent 49d8f41b09
commit 2b33dfe728
30 changed files with 102 additions and 106 deletions

View file

@ -432,7 +432,7 @@ async def index_airtable_records(
"table_name": item["table_name"],
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -28,45 +28,37 @@ def get_current_timestamp() -> datetime:
return datetime.now(UTC)
def safe_set_chunks(document: Document, chunks: list) -> None:
async def safe_set_chunks(
session: "AsyncSession", document: Document, chunks: list
) -> None:
"""
Safely assign chunks to a document without triggering lazy loading.
Delete old chunks and assign new ones to a document.
ALWAYS use this instead of `document.chunks = chunks` to avoid
SQLAlchemy async errors (MissingGreenlet / greenlet_spawn).
Why this is needed:
- Direct assignment `document.chunks = chunks` triggers SQLAlchemy to
load the OLD chunks first (for comparison/orphan detection)
- This lazy loading fails in async context with asyncpg driver
- set_committed_value bypasses this by setting the value directly
This function is safe regardless of how the document was loaded
(with or without selectinload).
This replaces direct ``document.chunks = chunks`` which triggers lazy
loading (and MissingGreenlet errors in async contexts). It also
explicitly deletes pre-existing chunks so they don't accumulate across
repeated re-indexes ``set_committed_value`` bypasses SQLAlchemy's
delete-orphan cascade.
Args:
document: The Document object to update
chunks: List of Chunk objects to assign
Example:
# Instead of: document.chunks = chunks (DANGEROUS!)
safe_set_chunks(document, chunks) # Always safe
session: The current async database session.
document: The Document object to update.
chunks: List of Chunk objects to assign.
"""
from sqlalchemy.orm import object_session
from sqlalchemy import delete
from sqlalchemy.orm.attributes import set_committed_value
# Keep relationship assignment lazy-load-safe.
set_committed_value(document, "chunks", chunks)
from app.db import Chunk
# Ensure chunk rows are actually persisted.
# set_committed_value bypasses normal unit-of-work tracking, so we need to
# explicitly attach chunk objects to the current session.
session = object_session(document)
if session is not None:
if document.id is not None:
for chunk in chunks:
chunk.document_id = document.id
session.add_all(chunks)
if document.id is not None:
await session.execute(
delete(Chunk).where(Chunk.document_id == document.id)
)
for chunk in chunks:
chunk.document_id = document.id
set_committed_value(document, "chunks", chunks)
session.add_all(chunks)
def parse_date_flexible(date_str: str) -> datetime:

View file

@ -430,7 +430,7 @@ async def index_bookstack_pages(
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = doc_metadata
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -439,7 +439,7 @@ async def index_clickup_tasks(
"connector_id": connector_id,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -413,7 +413,7 @@ async def index_confluence_pages(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -690,7 +690,7 @@ async def index_discord_messages(
"indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -386,7 +386,7 @@ async def index_elasticsearch_documents(
document.content_hash = item["content_hash"]
document.unique_identifier_hash = item["unique_identifier_hash"]
document.document_metadata = metadata
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -415,7 +415,7 @@ async def index_github_repos(
document.content_hash = item["content_hash"]
document.embedding = summary_embedding
document.document_metadata = doc_metadata
safe_set_chunks(document, chunks_data)
await safe_set_chunks(session, document, chunks_data)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -528,7 +528,7 @@ async def index_google_calendar_events(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -451,7 +451,7 @@ async def index_google_gmail_messages(
"date": item["date_str"],
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -393,7 +393,7 @@ async def index_jira_issues(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -431,7 +431,7 @@ async def index_linear_issues(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -488,7 +488,7 @@ async def index_luma_events(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -479,7 +479,7 @@ async def index_notion_pages(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -571,7 +571,7 @@ async def index_obsidian_vault(
document.content_hash = content_hash
document.embedding = embedding
document.document_metadata = document_metadata
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -564,7 +564,7 @@ async def index_slack_messages(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -603,7 +603,7 @@ async def index_teams_messages(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()

View file

@ -410,7 +410,7 @@ async def index_crawled_urls(
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.status = DocumentStatus.ready() # READY status
document.updated_at = get_current_timestamp()

View file

@ -14,45 +14,37 @@ from app.db import Document
md = MarkdownifyTransformer()
def safe_set_chunks(document: Document, chunks: list) -> None:
async def safe_set_chunks(
session: "AsyncSession", document: Document, chunks: list
) -> None:
"""
Safely assign chunks to a document without triggering lazy loading.
Delete old chunks and assign new ones to a document.
ALWAYS use this instead of `document.chunks = chunks` to avoid
SQLAlchemy async errors (MissingGreenlet / greenlet_spawn).
Why this is needed:
- Direct assignment `document.chunks = chunks` triggers SQLAlchemy to
load the OLD chunks first (for comparison/orphan detection)
- This lazy loading fails in async context with asyncpg driver
- set_committed_value bypasses this by setting the value directly
This function is safe regardless of how the document was loaded
(with or without selectinload).
This replaces direct ``document.chunks = chunks`` which triggers lazy
loading (and MissingGreenlet errors in async contexts). It also
explicitly deletes pre-existing chunks so they don't accumulate across
repeated re-indexes ``set_committed_value`` bypasses SQLAlchemy's
delete-orphan cascade.
Args:
document: The Document object to update
chunks: List of Chunk objects to assign
Example:
# Instead of: document.chunks = chunks (DANGEROUS!)
safe_set_chunks(document, chunks) # Always safe
session: The current async database session.
document: The Document object to update.
chunks: List of Chunk objects to assign.
"""
from sqlalchemy.orm import object_session
from sqlalchemy import delete
from sqlalchemy.orm.attributes import set_committed_value
# Keep relationship assignment lazy-load-safe.
set_committed_value(document, "chunks", chunks)
from app.db import Chunk
# Ensure chunk rows are actually persisted.
# set_committed_value bypasses normal unit-of-work tracking, so we need to
# explicitly attach chunk objects to the current session.
session = object_session(document)
if session is not None:
if document.id is not None:
for chunk in chunks:
chunk.document_id = document.id
session.add_all(chunks)
if document.id is not None:
await session.execute(
delete(Chunk).where(Chunk.document_id == document.id)
)
for chunk in chunks:
chunk.document_id = document.id
set_committed_value(document, "chunks", chunks)
session.add_all(chunks)
def get_current_timestamp() -> datetime:

View file

@ -227,7 +227,7 @@ async def add_circleback_meeting_document(
if summary_embedding is not None:
document.embedding = summary_embedding
document.document_metadata = document_metadata
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.source_markdown = markdown_content
document.content_needs_reindexing = False
document.updated_at = get_current_timestamp()

View file

@ -21,6 +21,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
get_current_timestamp,
safe_set_chunks,
)
@ -154,7 +155,7 @@ async def add_extension_received_document(
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = content.metadata.model_dump()
existing_document.chunks = chunks
await safe_set_chunks(session, existing_document, chunks)
existing_document.source_markdown = combined_document_string
existing_document.updated_at = get_current_timestamp()

View file

@ -35,6 +35,7 @@ from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
get_current_timestamp,
safe_set_chunks,
)
from .markdown_processor import add_received_markdown_file_document
@ -488,7 +489,7 @@ async def add_received_file_document_using_unstructured(
"FILE_NAME": file_name,
"ETL_SERVICE": "UNSTRUCTURED",
}
existing_document.chunks = chunks
await safe_set_chunks(session, existing_document, chunks)
existing_document.source_markdown = file_in_markdown
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
@ -622,7 +623,7 @@ async def add_received_file_document_using_llamacloud(
"FILE_NAME": file_name,
"ETL_SERVICE": "LLAMACLOUD",
}
existing_document.chunks = chunks
await safe_set_chunks(session, existing_document, chunks)
existing_document.source_markdown = file_in_markdown
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
@ -777,7 +778,7 @@ async def add_received_file_document_using_docling(
"FILE_NAME": file_name,
"ETL_SERVICE": "DOCLING",
}
existing_document.chunks = chunks
await safe_set_chunks(session, existing_document, chunks)
existing_document.source_markdown = file_in_markdown
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()

View file

@ -21,6 +21,7 @@ from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
get_current_timestamp,
safe_set_chunks,
)
@ -258,7 +259,7 @@ async def add_received_markdown_file_document(
existing_document.document_metadata = {
"FILE_NAME": file_name,
}
existing_document.chunks = chunks
await safe_set_chunks(session, existing_document, chunks)
existing_document.source_markdown = file_in_markdown
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready

View file

@ -419,7 +419,7 @@ async def add_youtube_video_document(
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
}
safe_set_chunks(document, chunks)
await safe_set_chunks(session, document, chunks)
document.source_markdown = combined_document_string
document.status = DocumentStatus.ready() # READY status - fully processed
document.updated_at = get_current_timestamp()

View file

@ -13,12 +13,32 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy import delete as sa_delete
from sqlalchemy.orm.attributes import set_committed_value
from app.config import config
from app.db import SurfsenseDocsChunk, SurfsenseDocsDocument, async_session_maker
from app.utils.document_converters import embed_text
logger = logging.getLogger(__name__)
async def _safe_set_docs_chunks(
session: AsyncSession, document: SurfsenseDocsDocument, chunks: list
) -> None:
"""safe_set_chunks variant for the SurfsenseDocsDocument/Chunk models."""
if document.id is not None:
await session.execute(
sa_delete(SurfsenseDocsChunk).where(
SurfsenseDocsChunk.document_id == document.id
)
)
for chunk in chunks:
chunk.document_id = document.id
set_committed_value(document, "chunks", chunks)
session.add_all(chunks)
# Path to docs relative to project root
DOCS_DIR = (
Path(__file__).resolve().parent.parent.parent.parent
@ -156,7 +176,7 @@ async def index_surfsense_docs(session: AsyncSession) -> tuple[int, int, int, in
existing_doc.content = content
existing_doc.content_hash = content_hash
existing_doc.embedding = embed_text(content)
existing_doc.chunks = chunks
await _safe_set_docs_chunks(session, existing_doc, chunks)
existing_doc.updated_at = datetime.now(UTC)
updated += 1