From a9d393327d6751db8a2b88e10aa50d6a0afa8fbd Mon Sep 17 00:00:00 2001
From: CREDO23
Date: Wed, 28 Jan 2026 14:51:54 +0200
Subject: [PATCH] fix(backend): Add duplicate content_hash check to connector
 indexers

Prevent UniqueViolationError on ix_documents_content_hash constraint by
adding check_duplicate_document_by_hash() before inserting new documents
in 15 connector indexers that were missing this check.

Affected: clickup, luma, linear, jira, google_gmail, confluence,
bookstack, github, webcrawler, teams, slack, notion, discord, airtable,
obsidian indexers.
---
 .../connector_indexers/airtable_indexer.py     | 19 ++++++++++++++++
 .../connector_indexers/bookstack_indexer.py    | 17 ++++++++++++++
 .../connector_indexers/clickup_indexer.py      | 17 ++++++++++++++
 .../connector_indexers/confluence_indexer.py   | 17 ++++++++++++++
 .../connector_indexers/discord_indexer.py      | 19 ++++++++++++++++
 .../connector_indexers/github_indexer.py       | 16 ++++++++++++++
 .../google_gmail_indexer.py                    | 17 ++++++++++++++
 .../tasks/connector_indexers/jira_indexer.py   | 17 ++++++++++++++
 .../connector_indexers/linear_indexer.py       | 17 ++++++++++++++
 .../tasks/connector_indexers/luma_indexer.py   | 17 ++++++++++++++
 .../connector_indexers/notion_indexer.py       | 17 ++++++++++++++
 .../connector_indexers/obsidian_indexer.py     | 17 ++++++++++++++
 .../tasks/connector_indexers/slack_indexer.py  | 17 ++++++++++++++
 .../tasks/connector_indexers/teams_indexer.py  | 22 +++++++++++++++++++
 .../connector_indexers/webcrawler_indexer.py   | 17 ++++++++++++++
 15 files changed, 263 insertions(+)

diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py
index 4d5a33b79..6bb62d716 100644
--- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py
@@ -20,6 +20,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -317,6 +318,24 @@ async def index_airtable_records(
                 )
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = (
+                    await check_duplicate_document_by_hash(
+                        session, content_hash
+                    )
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Airtable record {record_id} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate document summary
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py
index a1067255d..e183ab333 100644
--- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py
@@ -22,6 +22,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -308,6 +309,22 @@ async def index_bookstack_pages(
                 logger.info(f"Successfully updated BookStack page {page_name}")
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"BookStack page {page_name} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py
index e459584f8..887c3e2e5 100644
--- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py
@@ -22,6 +22,7 @@ from app.utils.document_converters import (
 
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -302,6 +303,22 @@ async def index_clickup_tasks(
                 )
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"ClickUp task {task_name} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py
index ddbefafb9..5673839bb 100644
--- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py
@@ -23,6 +23,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -306,6 +307,22 @@ async def index_confluence_pages(
                 )
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Confluence page {page_title} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py
index 8f0c76e53..9e401b335 100644
--- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py
@@ -21,6 +21,7 @@ from app.utils.document_converters import (
 from .base import (
     build_document_metadata_markdown,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -454,6 +455,24 @@ async def index_discord_messages(
                     )
                     continue
 
+                # Document doesn't exist by unique_identifier_hash
+                # Check if a document with the same content_hash exists (from another connector)
+                with session.no_autoflush:
+                    duplicate_by_content = (
+                        await check_duplicate_document_by_hash(
+                            session, content_hash
+                        )
+                    )
+
+                if duplicate_by_content:
+                    logger.info(
+                        f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector "
+                        f"(existing document ID: {duplicate_by_content.id}, "
+                        f"type: {duplicate_by_content.document_type}). Skipping."
+                    )
+                    documents_skipped += 1
+                    continue
+
                 # Document doesn't exist - create new one
                 # Process chunks
                 chunks = await create_document_chunks(
diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py
index 4a8df4918..fb6989bb9 100644
--- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py
@@ -24,6 +24,7 @@ from app.utils.document_converters import (
 
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -319,6 +320,21 @@ async def _process_repository_digest(
         # Delete existing document to replace with new one
         await session.delete(existing_document)
         await session.flush()
+    else:
+        # Document doesn't exist by unique_identifier_hash
+        # Check if a document with the same content_hash exists (from another connector)
+        with session.no_autoflush:
+            duplicate_by_content = await check_duplicate_document_by_hash(
+                session, content_hash
+            )
+
+        if duplicate_by_content:
+            logger.info(
+                f"Repository {repo_full_name} already indexed by another connector "
+                f"(existing document ID: {duplicate_by_content.id}, "
+                f"type: {duplicate_by_content.document_type}). Skipping."
+            )
+            return 0
 
     # Generate summary using LLM (ONE call per repository!)
     user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
index 08d2904d6..e832997d0 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
@@ -25,6 +25,7 @@ from app.utils.document_converters import (
 
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -316,6 +317,22 @@ async def index_google_gmail_messages(
                 logger.info(f"Successfully updated Gmail message {subject}")
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Gmail message {subject} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py
index 4851a6466..d6095d20e 100644
--- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py
@@ -23,6 +23,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -284,6 +285,22 @@ async def index_jira_issues(
                 )
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Jira issue {issue_identifier} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py
index 7d8e0c30e..d00a39160 100644
--- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py
@@ -22,6 +22,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -315,6 +316,22 @@ async def index_linear_issues(
                 )
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Linear issue {issue_identifier} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py
index ead259a44..59890dbe4 100644
--- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py
@@ -21,6 +21,7 @@ from app.utils.document_converters import (
 
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -363,6 +364,22 @@ async def index_luma_events(
                 logger.info(f"Successfully updated Luma event {event_name}")
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Luma event {event_name} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
index 2d36351fa..70c4917da 100644
--- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
@@ -22,6 +22,7 @@ from .base import (
     build_document_metadata_string,
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -360,6 +361,22 @@ async def index_notion_pages(
 
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Notion page {page_title} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Get user's long context LLM
             user_llm = await get_user_long_context_llm(
diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py
index 4c4dab4c2..a603d3fba 100644
--- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py
@@ -28,6 +28,7 @@ from app.utils.document_converters import (
 from .base import (
     build_document_metadata_string,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -426,6 +427,22 @@ async def index_obsidian_vault(
             indexed_count += 1
 
         else:
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Obsidian note {title} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                skipped_count += 1
+                continue
+
             # Create new document
             logger.info(f"Indexing new note: {title}")
 
diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
index f6ed4f567..f244c97f8 100644
--- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
@@ -22,6 +22,7 @@ from .base import (
     build_document_metadata_markdown,
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -325,6 +326,22 @@ async def index_slack_messages(
                 logger.info(f"Successfully updated Slack message {msg_ts}")
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Process chunks
             chunks = await create_document_chunks(combined_document_string)
diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py
index b879ddfcb..66b709ddc 100644
--- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py
@@ -21,6 +21,7 @@ from .base import (
     build_document_metadata_markdown,
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -354,6 +355,27 @@ async def index_teams_messages(
                     )
                     continue
 
+                # Document doesn't exist by unique_identifier_hash
+                # Check if a document with the same content_hash exists (from another connector)
+                with session.no_autoflush:
+                    duplicate_by_content = (
+                        await check_duplicate_document_by_hash(
+                            session, content_hash
+                        )
+                    )
+
+                if duplicate_by_content:
+                    logger.info(
+                        "Teams message %s in channel %s already indexed by another connector "
+                        "(existing document ID: %s, type: %s). Skipping.",
+                        message_id,
+                        channel_name,
+                        duplicate_by_content.id,
+                        duplicate_by_content.document_type,
+                    )
+                    documents_skipped += 1
+                    continue
+
                 # Document doesn't exist - create new one
                 # Process chunks
                 chunks = await create_document_chunks(
diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py
index fb1aae5f2..6ae070c06 100644
--- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py
@@ -21,6 +21,7 @@ from app.utils.document_converters import (
 
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -281,6 +282,22 @@ async def index_crawled_urls(
                 logger.info(f"Successfully updated URL {url}")
                 continue
 
+            # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+
+            if duplicate_by_content:
+                logger.info(
+                    f"URL {url} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
+
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(
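
Note: check_duplicate_document_by_hash() is imported from .base in every
hunk above, but its implementation is not part of this diff. As a rough
sketch only, assuming an async SQLAlchemy session and a Document model
with a unique content_hash column (the model import path below is
hypothetical), the helper is expected to do something like:

    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession

    from app.db import Document  # assumed location of the Document model


    async def check_duplicate_document_by_hash(
        session: AsyncSession, content_hash: str
    ) -> Document | None:
        """Return the existing Document with this content_hash, or None.

        ix_documents_content_hash is unique, so at most one row can match;
        scalar_one_or_none() would raise if that invariant were violated.
        """
        result = await session.execute(
            select(Document).where(Document.content_hash == content_hash)
        )
        return result.scalar_one_or_none()

Callers wrap the lookup in session.no_autoflush, presumably so the
pending, not-yet-inserted document is not flushed by the SELECT itself,
which would trigger the very UniqueViolationError this patch prevents.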