diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py index 2a382f3b8..e675085db 100644 --- a/surfsense_backend/app/connectors/composio_gmail_connector.py +++ b/surfsense_backend/app/connectors/composio_gmail_connector.py @@ -463,7 +463,7 @@ async def _process_gmail_messages_phase2( "connector_id": connector_id, "source": "composio", } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py index 63bade873..6344f9f38 100644 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ -477,7 +477,7 @@ async def index_composio_google_calendar( "connector_id": connector_id, "source": "composio", } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index c10edb7e9..30ce4a77b 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -1112,7 +1112,7 @@ async def _index_composio_drive_delta_sync( "connector_id": connector_id, "source": "composio", } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() @@ -1520,7 +1520,7 @@ async def _index_composio_drive_full_scan( "connector_id": connector_id, "source": "composio", } - 
safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/services/linear/kb_sync_service.py b/surfsense_backend/app/services/linear/kb_sync_service.py index 8d1bc47c7..4f97a3dd0 100644 --- a/surfsense_backend/app/services/linear/kb_sync_service.py +++ b/surfsense_backend/app/services/linear/kb_sync_service.py @@ -1,11 +1,10 @@ import logging from datetime import datetime -from sqlalchemy import delete from sqlalchemy.ext.asyncio import AsyncSession from app.connectors.linear_connector import LinearConnector -from app.db import Chunk, Document +from app.db import Document from app.services.llm_service import get_user_long_context_llm from app.utils.document_converters import ( create_document_chunks, @@ -105,10 +104,6 @@ class LinearKBSyncService: ) summary_embedding = embed_text(summary_content) - await self.db_session.execute( - delete(Chunk).where(Chunk.document_id == document.id) - ) - chunks = await create_document_chunks(issue_content) document.title = f"{issue_identifier}: {issue_title}" @@ -131,7 +126,7 @@ class LinearKBSyncService: "connector_id": connector_id, } flag_modified(document, "document_metadata") - safe_set_chunks(document, chunks) + await safe_set_chunks(self.db_session, document, chunks) document.updated_at = get_current_timestamp() await self.db_session.commit() diff --git a/surfsense_backend/app/services/notion/kb_sync_service.py b/surfsense_backend/app/services/notion/kb_sync_service.py index ce31e0d35..d6c64897f 100644 --- a/surfsense_backend/app/services/notion/kb_sync_service.py +++ b/surfsense_backend/app/services/notion/kb_sync_service.py @@ -1,10 +1,9 @@ import logging from datetime import datetime -from sqlalchemy import delete from sqlalchemy.ext.asyncio import AsyncSession -from app.db import Chunk, Document +from app.db import Document from app.services.llm_service import 
async def safe_set_chunks(
    session: "AsyncSession", document: Document, chunks: list
) -> None:
    """
    Replace a document's chunks: delete the old rows, attach the new ones.

    ALWAYS use this instead of ``document.chunks = chunks`` for documents
    loaded from the database: direct assignment triggers a lazy load of the
    old chunks, which fails in async contexts with asyncpg
    (MissingGreenlet / greenlet_spawn). ``set_committed_value`` avoids the
    lazy load but also bypasses SQLAlchemy's delete-orphan cascade, so the
    pre-existing chunk rows must be deleted explicitly or they accumulate
    across repeated re-indexes.

    Args:
        session: The async database session the document belongs to.
        document: The Document object whose chunks are being replaced.
        chunks: List of new Chunk objects to persist for this document.
    """
    from sqlalchemy import delete
    from sqlalchemy.orm.attributes import set_committed_value

    from app.db import Chunk

    if document.id is None:
        # Pending (never-flushed) document: there are no old rows to delete,
        # and an object without an identity cannot emit a lazy load, so plain
        # relationship assignment is both safe and necessary — it lets the
        # save-update cascade populate chunk.document_id at flush time.
        # (set_committed_value would mark the chunks as "already loaded", so
        # the unit of work would insert them with a NULL document_id.)
        document.chunks = chunks
        return

    # Existing document: remove the previous chunk rows in bulk, then wire
    # the new chunks to the known primary key ourselves.
    await session.execute(delete(Chunk).where(Chunk.document_id == document.id))
    for chunk in chunks:
        chunk.document_id = document.id

    # Set the relationship without touching the lazy loader, and register the
    # new chunk objects with the session explicitly — bypassing the normal
    # relationship machinery means nothing else will add them.
    set_committed_value(document, "chunks", chunks)
    session.add_all(chunks)
a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -413,7 +413,7 @@ async def index_confluence_pages( "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 0421352ff..e8e80a646 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -690,7 +690,7 @@ async def index_discord_messages( "indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 212afff39..f07c6c580 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -386,7 +386,7 @@ async def index_elasticsearch_documents( document.content_hash = item["content_hash"] document.unique_identifier_hash = item["unique_identifier_hash"] document.document_metadata = metadata - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 
fc6634024..61607dda3 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -415,7 +415,7 @@ async def index_github_repos( document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = doc_metadata - safe_set_chunks(document, chunks_data) + await safe_set_chunks(session, document, chunks_data) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 1407d98dd..24e822060 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -528,7 +528,7 @@ async def index_google_calendar_events( "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 1a8b2b176..6e2408cbd 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -451,7 +451,7 @@ async def index_google_gmail_messages( "date": item["date_str"], "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 
dec37428a..1765a592e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -393,7 +393,7 @@ async def index_jira_issues( "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 1a2254c5b..bacafccc7 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -431,7 +431,7 @@ async def index_linear_issues( "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 2d86b09c1..83cf54f4e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -488,7 +488,7 @@ async def index_luma_events( "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index b0c49dea5..85daff94c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py 
+++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -479,7 +479,7 @@ async def index_notion_pages( "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index c0eef84d5..d53baa3b0 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -571,7 +571,7 @@ async def index_obsidian_vault( document.content_hash = content_hash document.embedding = embedding document.document_metadata = document_metadata - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index f83b171bc..1f2693844 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -564,7 +564,7 @@ async def index_slack_messages( "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.updated_at = get_current_timestamp() document.status = DocumentStatus.ready() diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index ad34e8696..d04a98177 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ 
async def safe_set_chunks(
    session: "AsyncSession", document: Document, chunks: list
) -> None:
    """
    Replace a document's chunks: delete the old rows, attach the new ones.

    ALWAYS use this instead of ``document.chunks = chunks`` for documents
    loaded from the database: direct assignment triggers a lazy load of the
    old chunks, which fails in async contexts with asyncpg
    (MissingGreenlet / greenlet_spawn). ``set_committed_value`` avoids the
    lazy load but also bypasses SQLAlchemy's delete-orphan cascade, so the
    pre-existing chunk rows must be deleted explicitly or they accumulate
    across repeated re-indexes.

    Args:
        session: The async database session the document belongs to.
        document: The Document object whose chunks are being replaced.
        chunks: List of new Chunk objects to persist for this document.
    """
    from sqlalchemy import delete
    from sqlalchemy.orm.attributes import set_committed_value

    from app.db import Chunk

    if document.id is None:
        # Pending (never-flushed) document: there are no old rows to delete,
        # and an object without an identity cannot emit a lazy load, so plain
        # relationship assignment is both safe and necessary — it lets the
        # save-update cascade populate chunk.document_id at flush time.
        # (set_committed_value would mark the chunks as "already loaded", so
        # the unit of work would insert them with a NULL document_id.)
        document.chunks = chunks
        return

    # Existing document: remove the previous chunk rows in bulk, then wire
    # the new chunks to the known primary key ourselves.
    await session.execute(delete(Chunk).where(Chunk.document_id == document.id))
    for chunk in chunks:
        chunk.document_id = document.id

    # Set the relationship without touching the lazy loader, and register the
    # new chunk objects with the session explicitly — bypassing the normal
    # relationship machinery means nothing else will add them.
    set_committed_value(document, "chunks", chunks)
    session.add_all(chunks)
safe_set_chunks(session, existing_document, chunks) existing_document.source_markdown = combined_document_string existing_document.updated_at = get_current_timestamp() diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 5e97951bd..647435213 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -35,6 +35,7 @@ from .base import ( check_document_by_unique_identifier, check_duplicate_document, get_current_timestamp, + safe_set_chunks, ) from .markdown_processor import add_received_markdown_file_document @@ -488,7 +489,7 @@ async def add_received_file_document_using_unstructured( "FILE_NAME": file_name, "ETL_SERVICE": "UNSTRUCTURED", } - existing_document.chunks = chunks + await safe_set_chunks(session, existing_document, chunks) existing_document.source_markdown = file_in_markdown existing_document.content_needs_reindexing = False existing_document.updated_at = get_current_timestamp() @@ -622,7 +623,7 @@ async def add_received_file_document_using_llamacloud( "FILE_NAME": file_name, "ETL_SERVICE": "LLAMACLOUD", } - existing_document.chunks = chunks + await safe_set_chunks(session, existing_document, chunks) existing_document.source_markdown = file_in_markdown existing_document.content_needs_reindexing = False existing_document.updated_at = get_current_timestamp() @@ -777,7 +778,7 @@ async def add_received_file_document_using_docling( "FILE_NAME": file_name, "ETL_SERVICE": "DOCLING", } - existing_document.chunks = chunks + await safe_set_chunks(session, existing_document, chunks) existing_document.source_markdown = file_in_markdown existing_document.content_needs_reindexing = False existing_document.updated_at = get_current_timestamp() diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py 
b/surfsense_backend/app/tasks/document_processors/markdown_processor.py index a8d20c062..d598bf9dd 100644 --- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py +++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py @@ -21,6 +21,7 @@ from .base import ( check_document_by_unique_identifier, check_duplicate_document, get_current_timestamp, + safe_set_chunks, ) @@ -258,7 +259,7 @@ async def add_received_markdown_file_document( existing_document.document_metadata = { "FILE_NAME": file_name, } - existing_document.chunks = chunks + await safe_set_chunks(session, existing_document, chunks) existing_document.source_markdown = file_in_markdown existing_document.updated_at = get_current_timestamp() existing_document.status = DocumentStatus.ready() # Mark as ready diff --git a/surfsense_backend/app/tasks/document_processors/youtube_processor.py b/surfsense_backend/app/tasks/document_processors/youtube_processor.py index 13b969fb6..0ed2e57d2 100644 --- a/surfsense_backend/app/tasks/document_processors/youtube_processor.py +++ b/surfsense_backend/app/tasks/document_processors/youtube_processor.py @@ -419,7 +419,7 @@ async def add_youtube_video_document( "author": video_data.get("author_name", "Unknown"), "thumbnail": video_data.get("thumbnail_url", ""), } - safe_set_chunks(document, chunks) + await safe_set_chunks(session, document, chunks) document.source_markdown = combined_document_string document.status = DocumentStatus.ready() # READY status - fully processed document.updated_at = get_current_timestamp() diff --git a/surfsense_backend/app/tasks/surfsense_docs_indexer.py b/surfsense_backend/app/tasks/surfsense_docs_indexer.py index ca4a83de3..7d449c0ab 100644 --- a/surfsense_backend/app/tasks/surfsense_docs_indexer.py +++ b/surfsense_backend/app/tasks/surfsense_docs_indexer.py @@ -13,12 +13,32 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload +from sqlalchemy 
async def _safe_set_docs_chunks(
    session: AsyncSession, document: SurfsenseDocsDocument, chunks: list
) -> None:
    """
    safe_set_chunks variant for the SurfsenseDocsDocument/Chunk models.

    Deletes the document's pre-existing SurfsenseDocsChunk rows (which
    ``set_committed_value`` would otherwise orphan, since it bypasses the
    delete-orphan cascade), then attaches the new chunks without triggering
    a lazy load of the old collection.

    Args:
        session: The async database session the document belongs to.
        document: The SurfsenseDocsDocument whose chunks are being replaced.
        chunks: List of new SurfsenseDocsChunk objects to persist.
    """
    if document.id is None:
        # Pending (never-flushed) document: no old rows exist and no lazy
        # load can fire for an object without an identity, so plain
        # relationship assignment is safe and lets the save-update cascade
        # populate chunk.document_id at flush. (set_committed_value would
        # mark the collection as loaded and the chunks would be inserted
        # with a NULL document_id.)
        document.chunks = chunks
        return

    # Existing document: bulk-delete the previous chunk rows, then wire the
    # new chunks to the known primary key ourselves.
    await session.execute(
        sa_delete(SurfsenseDocsChunk).where(
            SurfsenseDocsChunk.document_id == document.id
        )
    )
    for chunk in chunks:
        chunk.document_id = document.id

    # Assign the relationship without touching the lazy loader, and register
    # the chunks with the session explicitly — bypassing the relationship
    # machinery means nothing else will add them.
    set_committed_value(document, "chunks", chunks)
    session.add_all(chunks)