fix(backend): Add duplicate content_hash check to connector indexers

Prevent UniqueViolationError on the ix_documents_content_hash constraint
by calling check_duplicate_document_by_hash() before inserting a new
document in the 15 connector indexers that were missing this check.
When a document with the same content_hash already exists (indexed by
another connector), the item is now skipped and logged instead of
failing on insert.

Affected indexers: clickup, luma, linear, jira, google_gmail, confluence,
bookstack, github, webcrawler, teams, slack, notion, discord, airtable,
and obsidian.
Author: CREDO23
Date:   2026-01-28 14:51:54 +02:00
Commit: a9d393327d (parent 5eca07f24f)
15 changed files with 263 additions and 0 deletions
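
For context, the check_duplicate_document_by_hash() helper lives in the
connectors' .base module and is not part of this diff. Below is a minimal
sketch of what it presumably does, assuming an async SQLAlchemy session and a
Document model whose content_hash column backs the ix_documents_content_hash
unique index; the model import path is an assumption, not taken from this
commit:

# Hypothetical sketch only; the real helper in .base is not shown in this diff.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import Document  # assumed location of the Document model


async def check_duplicate_document_by_hash(
    session: AsyncSession, content_hash: str
) -> Document | None:
    """Return an existing document with the same content_hash, or None."""
    result = await session.execute(
        select(Document).where(Document.content_hash == content_hash)
    )
    return result.scalars().first()

Each lookup in the diffs below runs inside "with session.no_autoflush:" so
that objects pending in the session (for example documents created in earlier
loop iterations) are not autoflushed, and the unique constraint not tripped,
in the middle of the duplicate query.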

View file

@@ -20,6 +20,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -317,6 +318,24 @@ async def index_airtable_records(
                     )
                     continue
                 # Document doesn't exist by unique_identifier_hash
+                # Check if a document with the same content_hash exists (from another connector)
+                with session.no_autoflush:
+                    duplicate_by_content = (
+                        await check_duplicate_document_by_hash(
+                            session, content_hash
+                        )
+                    )
+                if duplicate_by_content:
+                    logger.info(
+                        f"Airtable record {record_id} already indexed by another connector "
+                        f"(existing document ID: {duplicate_by_content.id}, "
+                        f"type: {duplicate_by_content.document_type}). Skipping."
+                    )
+                    documents_skipped += 1
+                    continue
                 # Document doesn't exist - create new one
                 # Generate document summary
                 user_llm = await get_user_long_context_llm(

View file

@@ -22,6 +22,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -308,6 +309,22 @@ async def index_bookstack_pages(
                 logger.info(f"Successfully updated BookStack page {page_name}")
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"BookStack page {page_name} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(

View file

@@ -22,6 +22,7 @@ from app.utils.document_converters import (
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -302,6 +303,22 @@ async def index_clickup_tasks(
                 )
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"ClickUp task {task_name} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(

View file

@@ -23,6 +23,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -306,6 +307,22 @@ async def index_confluence_pages(
                 )
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"Confluence page {page_title} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(

View file

@@ -21,6 +21,7 @@ from app.utils.document_converters import (
 from .base import (
     build_document_metadata_markdown,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -454,6 +455,24 @@ async def index_discord_messages(
                     )
                     continue
                 # Document doesn't exist by unique_identifier_hash
+                # Check if a document with the same content_hash exists (from another connector)
+                with session.no_autoflush:
+                    duplicate_by_content = (
+                        await check_duplicate_document_by_hash(
+                            session, content_hash
+                        )
+                    )
+                if duplicate_by_content:
+                    logger.info(
+                        f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector "
+                        f"(existing document ID: {duplicate_by_content.id}, "
+                        f"type: {duplicate_by_content.document_type}). Skipping."
+                    )
+                    documents_skipped += 1
+                    continue
                 # Document doesn't exist - create new one
                 # Process chunks
                 chunks = await create_document_chunks(

View file

@@ -24,6 +24,7 @@ from app.utils.document_converters import (
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -319,6 +320,21 @@ async def _process_repository_digest(
         # Delete existing document to replace with new one
         await session.delete(existing_document)
         await session.flush()
+    else:
+        # Document doesn't exist by unique_identifier_hash
+        # Check if a document with the same content_hash exists (from another connector)
+        with session.no_autoflush:
+            duplicate_by_content = await check_duplicate_document_by_hash(
+                session, content_hash
+            )
+        if duplicate_by_content:
+            logger.info(
+                f"Repository {repo_full_name} already indexed by another connector "
+                f"(existing document ID: {duplicate_by_content.id}, "
+                f"type: {duplicate_by_content.document_type}). Skipping."
+            )
+            return 0
     # Generate summary using LLM (ONE call per repository!)
     user_llm = await get_user_long_context_llm(session, user_id, search_space_id)

View file

@@ -25,6 +25,7 @@ from app.utils.document_converters import (
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -316,6 +317,22 @@ async def index_google_gmail_messages(
                 logger.info(f"Successfully updated Gmail message {subject}")
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"Gmail message {subject} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(

View file

@@ -23,6 +23,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -284,6 +285,22 @@ async def index_jira_issues(
                 )
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"Jira issue {issue_identifier} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(

View file

@@ -22,6 +22,7 @@ from app.utils.document_converters import (
 from .base import (
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -315,6 +316,22 @@ async def index_linear_issues(
                 )
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"Linear issue {issue_identifier} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(

View file

@ -21,6 +21,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -363,6 +364,22 @@ async def index_luma_events(
logger.info(f"Successfully updated Luma event {event_name}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Luma event {event_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@@ -22,6 +22,7 @@ from .base import (
     build_document_metadata_string,
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -360,6 +361,22 @@ async def index_notion_pages(
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"Notion page {page_title} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Get user's long context LLM
             user_llm = await get_user_long_context_llm(

View file

@@ -28,6 +28,7 @@ from app.utils.document_converters import (
 from .base import (
     build_document_metadata_string,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -426,6 +427,22 @@ async def index_obsidian_vault(
             indexed_count += 1
         else:
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"Obsidian note {title} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                skipped_count += 1
+                continue
             # Create new document
             logger.info(f"Indexing new note: {title}")

View file

@@ -22,6 +22,7 @@ from .base import (
     build_document_metadata_markdown,
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -325,6 +326,22 @@ async def index_slack_messages(
                 logger.info(f"Successfully updated Slack message {msg_ts}")
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Process chunks
             chunks = await create_document_chunks(combined_document_string)

View file

@@ -21,6 +21,7 @@ from .base import (
     build_document_metadata_markdown,
     calculate_date_range,
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -354,6 +355,27 @@ async def index_teams_messages(
                     )
                     continue
                 # Document doesn't exist by unique_identifier_hash
+                # Check if a document with the same content_hash exists (from another connector)
+                with session.no_autoflush:
+                    duplicate_by_content = (
+                        await check_duplicate_document_by_hash(
+                            session, content_hash
+                        )
+                    )
+                if duplicate_by_content:
+                    logger.info(
+                        "Teams message %s in channel %s already indexed by another connector "
+                        "(existing document ID: %s, type: %s). Skipping.",
+                        message_id,
+                        channel_name,
+                        duplicate_by_content.id,
+                        duplicate_by_content.document_type,
+                    )
+                    documents_skipped += 1
+                    continue
                 # Document doesn't exist - create new one
                 # Process chunks
                 chunks = await create_document_chunks(

View file

@@ -21,6 +21,7 @@ from app.utils.document_converters import (
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document_by_hash,
     get_connector_by_id,
     get_current_timestamp,
     logger,
@@ -281,6 +282,22 @@ async def index_crawled_urls(
                 logger.info(f"Successfully updated URL {url}")
                 continue
             # Document doesn't exist by unique_identifier_hash
+            # Check if a document with the same content_hash exists (from another connector)
+            with session.no_autoflush:
+                duplicate_by_content = await check_duplicate_document_by_hash(
+                    session, content_hash
+                )
+            if duplicate_by_content:
+                logger.info(
+                    f"URL {url} already indexed by another connector "
+                    f"(existing document ID: {duplicate_by_content.id}, "
+                    f"type: {duplicate_by_content.document_type}). Skipping."
+                )
+                documents_skipped += 1
+                continue
             # Document doesn't exist - create new one
             # Generate summary with metadata
             user_llm = await get_user_long_context_llm(