Merge pull request #404 from MODSetter/dev

feat: add unique identifier hash for documents to prevent duplicates across various connectors
This commit is contained in:
Rohan Verma 2025-10-14 21:19:53 -07:00 committed by GitHub
commit d868bae134
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 1631 additions and 356 deletions

View file

@ -0,0 +1,54 @@
"""Add unique_identifier_hash column to documents table
Revision ID: 29
Revises: 28
"""
from collections.abc import Sequence
import sqlalchemy as sa
from sqlalchemy import inspect
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "29"
down_revision: str | None = "28"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
bind = op.get_bind()
inspector = inspect(bind)
columns = [col["name"] for col in inspector.get_columns("documents")]
# Only add the column if it doesn't already exist
if "unique_identifier_hash" not in columns:
op.add_column(
"documents",
sa.Column("unique_identifier_hash", sa.String(), nullable=True),
)
op.create_index(
op.f("ix_documents_unique_identifier_hash"),
"documents",
["unique_identifier_hash"],
unique=False,
)
op.create_unique_constraint(
op.f("uq_documents_unique_identifier_hash"),
"documents",
["unique_identifier_hash"],
)
else:
print(
"Column 'unique_identifier_hash' already exists. Skipping column creation."
)
def downgrade() -> None:
op.drop_constraint(
op.f("uq_documents_unique_identifier_hash"), "documents", type_="unique"
)
op.drop_index(op.f("ix_documents_unique_identifier_hash"), table_name="documents")
op.drop_column("documents", "unique_identifier_hash")

View file

@ -174,6 +174,7 @@ class Document(BaseModel, TimestampMixin):
content = Column(Text, nullable=False)
content_hash = Column(String, nullable=False, index=True, unique=True)
unique_identifier_hash = Column(String, nullable=True, index=True, unique=True)
embedding = Column(Vector(config.embedding_model_instance.dimension))
search_space_id = Column(

View file

@ -16,11 +16,12 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
calculate_date_range,
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -240,25 +241,100 @@ async def index_airtable_records(
documents_skipped += 1
continue
record_id = record.get("id", "Unknown")
# Generate unique identifier hash for this Airtable record
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.AIRTABLE_CONNECTOR,
record_id,
search_space_id,
)
# Generate content hash
content_hash = generate_content_hash(
markdown_content, search_space_id
)
# Check if document already exists
existing_document_by_hash = (
await check_duplicate_document_by_hash(
session, content_hash
# Check if document with this unique identifier already exists
existing_document = (
await check_document_by_unique_identifier(
session, unique_identifier_hash
)
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for message {record.get('id')}. Skipping processing."
)
documents_skipped += 1
continue
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Airtable record {record_id} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Airtable record {record_id}. Updating document."
)
# Generate document summary
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"record_id": record_id,
"created_time": record.get(
"CREATED_TIME()", ""
),
"document_type": "Airtable Record",
"connector_type": "Airtable",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content,
user_llm,
document_metadata,
)
else:
summary_content = (
f"Airtable Record: {record_id}\n\n"
)
summary_embedding = (
config.embedding_model_instance.embed(
summary_content
)
)
# Process chunks
chunks = await create_document_chunks(
markdown_content
)
# Update existing document
existing_document.title = (
f"Airtable Record: {record_id}"
)
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"record_id": record_id,
"created_time": record.get(
"CREATED_TIME()", ""
),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(
f"Successfully updated Airtable record {record_id}"
)
continue
# Document doesn't exist - create new one
# Generate document summary
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
@ -266,7 +342,7 @@ async def index_airtable_records(
if user_llm:
document_metadata = {
"record_id": record.get("id", "Unknown"),
"record_id": record_id,
"created_time": record.get("CREATED_TIME()", ""),
"document_type": "Airtable Record",
"connector_type": "Airtable",
@ -279,7 +355,7 @@ async def index_airtable_records(
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Airtable Record: {record.get('id', 'Unknown')}\n\n"
summary_content = f"Airtable Record: {record_id}\n\n"
summary_embedding = (
config.embedding_model_instance.embed(
summary_content
@ -291,18 +367,19 @@ async def index_airtable_records(
# Create and store new document
logger.info(
f"Creating new document for Airtable record: {record.get('id', 'Unknown')}"
f"Creating new document for Airtable record: {record_id}"
)
document = Document(
search_space_id=search_space_id,
title=f"Airtable Record: {record.get('id', 'Unknown')}",
title=f"Airtable Record: {record_id}",
document_type=DocumentType.AIRTABLE_CONNECTOR,
document_metadata={
"record_id": record.get("id", "Unknown"),
"record_id": record_id,
"created_time": record.get("CREATED_TIME()", ""),
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -37,6 +37,30 @@ async def check_duplicate_document_by_hash(
return existing_doc_result.scalars().first()
async def check_document_by_unique_identifier(
session: AsyncSession, unique_identifier_hash: str
) -> Document | None:
"""
Check if a document with the given unique identifier hash already exists.
Eagerly loads chunks to avoid lazy loading issues during updates.
Args:
session: Database session
unique_identifier_hash: Hash of the unique identifier from the source system
Returns:
Existing document if found, None otherwise
"""
from sqlalchemy.orm import selectinload
existing_doc_result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
.where(Document.unique_identifier_hash == unique_identifier_hash)
)
return existing_doc_result.scalars().first()
async def get_connector_by_id(
session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType
) -> SearchSourceConnector | None:

View file

@ -16,10 +16,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -209,18 +210,92 @@ async def index_clickup_tasks(
documents_skipped += 1
continue
# Hash for duplicates
content_hash = generate_content_hash(task_content, search_space_id)
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Generate unique identifier hash for this ClickUp task
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CLICKUP_CONNECTOR, task_id, search_space_id
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for task {task_name}. Skipping processing."
)
documents_skipped += 1
continue
# Generate content hash
content_hash = generate_content_hash(task_content, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for ClickUp task {task_name} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for ClickUp task {task_name}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"task_id": task_id,
"task_name": task_name,
"task_status": task_status,
"task_priority": task_priority,
"task_list": task_list_name,
"task_space": task_space_name,
"assignees": len(task_assignees),
"document_type": "ClickUp Task",
"connector_type": "ClickUp",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
task_content, user_llm, document_metadata
)
else:
summary_content = task_content
summary_embedding = (
config.embedding_model_instance.embed(task_content)
)
# Process chunks
chunks = await create_document_chunks(task_content)
# Update existing document
existing_document.title = f"Task - {task_name}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"task_id": task_id,
"task_name": task_name,
"task_status": task_status,
"task_priority": task_priority,
"task_assignees": task_assignees,
"task_due_date": task_due_date,
"task_created": task_created,
"task_updated": task_updated,
"indexed_at": datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(
f"Successfully updated ClickUp task {task_name}"
)
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
@ -270,6 +345,7 @@ async def index_clickup_tasks(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -16,11 +16,12 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
calculate_date_range,
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -217,26 +218,97 @@ async def index_confluence_pages(
documents_skipped += 1
continue
# Generate unique identifier hash for this Confluence page
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CONFLUENCE_CONNECTOR, page_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(full_content, search_space_id)
# Check if document already exists
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing."
)
documents_skipped += 1
continue
comment_count = len(comments)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Confluence page {page_title} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Confluence page {page_title}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"page_title": page_title,
"page_id": page_id,
"space_id": space_id,
"comment_count": comment_count,
"document_type": "Confluence Page",
"connector_type": "Confluence",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
full_content, user_llm, document_metadata
)
else:
summary_content = f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n"
if page_content:
content_preview = page_content[:1000]
if len(page_content) > 1000:
content_preview += "..."
summary_content += (
f"Content Preview: {content_preview}\n\n"
)
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(full_content)
# Update existing document
existing_document.title = f"Confluence - {page_title}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"page_id": page_id,
"page_title": page_title,
"space_id": space_id,
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(
f"Successfully updated Confluence page {page_title}"
)
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
comment_count = len(comments)
if user_llm:
document_metadata = {
@ -287,6 +359,7 @@ async def index_confluence_pages(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -16,11 +16,12 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
build_document_metadata_string,
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -307,23 +308,98 @@ async def index_discord_messages(
combined_document_string = build_document_metadata_string(
metadata_sections
)
# Generate unique identifier hash for this Discord channel
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.DISCORD_CONNECTOR, channel_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(
combined_document_string, search_space_id
)
# Skip duplicates by hash
existing_document_by_hash = (
await check_duplicate_document_by_hash(
session, content_hash
)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for channel {guild_name}#{channel_name}. Skipping processing."
)
documents_skipped += 1
continue
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Discord channel {guild_name}#{channel_name} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Discord channel {guild_name}#{channel_name}. Updating document."
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if not user_llm:
logger.error(
f"No long context LLM configured for user {user_id}"
)
skipped_channels.append(
f"{guild_name}#{channel_name} (no LLM configured)"
)
documents_skipped += 1
continue
# Generate summary with metadata
document_metadata = {
"guild_name": guild_name,
"channel_name": channel_name,
"message_count": len(formatted_messages),
"document_type": "Discord Channel Messages",
"connector_type": "Discord",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
combined_document_string,
user_llm,
document_metadata,
)
# Chunks from channel content
chunks = await create_document_chunks(channel_content)
# Update existing document
existing_document.title = (
f"Discord - {guild_name}#{channel_name}"
)
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"guild_name": guild_name,
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_count": len(formatted_messages),
"start_date": start_date_iso,
"end_date": end_date_iso,
"indexed_at": datetime.now(UTC).strftime(
"%Y-%m-%d %H:%M:%S"
),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(
f"Successfully updated Discord channel {guild_name}#{channel_name}"
)
continue
# Document doesn't exist - create new one
# Get user's long context LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
@ -375,6 +451,7 @@ async def index_discord_messages(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -16,10 +16,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
)
@ -199,19 +200,101 @@ async def index_github_repos(
)
continue # Skip if content fetch failed
content_hash = generate_content_hash(file_content, search_space_id)
# Check if document with this content hash already exists
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Generate unique identifier hash for this GitHub file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GITHUB_CONNECTOR, file_sha, search_space_id
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing."
)
continue
# Generate content hash
content_hash = generate_content_hash(file_content, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for GitHub file {full_path_key} unchanged. Skipping."
)
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for GitHub file {full_path_key}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
file_extension = (
file_path.split(".")[-1]
if "." in file_path
else None
)
document_metadata = {
"file_path": full_path_key,
"repository": repo_full_name,
"file_type": file_extension or "unknown",
"document_type": "GitHub Repository File",
"connector_type": "GitHub",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
file_content, user_llm, document_metadata
)
else:
summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..."
summary_embedding = (
config.embedding_model_instance.embed(
summary_content
)
)
# Chunk the content
try:
if hasattr(config, "code_chunker_instance"):
chunks_data = [
await create_document_chunks(file_content)
][0]
else:
chunks_data = await create_document_chunks(
file_content
)
except Exception as chunk_err:
logger.error(
f"Failed to chunk file {full_path_key}: {chunk_err}"
)
continue
# Update existing document
existing_document.title = f"GitHub - {full_path_key}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"file_path": file_path,
"file_sha": file_sha,
"file_url": file_url,
"repository": repo_full_name,
"indexed_at": datetime.now(UTC).strftime(
"%Y-%m-%d %H:%M:%S"
),
}
existing_document.chunks = chunks_data
logger.info(
f"Successfully updated GitHub file {full_path_key}"
)
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
@ -290,6 +373,7 @@ async def index_github_repos(
document_metadata=doc_metadata,
content=summary_content, # Store summary
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
search_space_id=search_space_id,
chunks=chunks_data, # Associate chunks directly

View file

@ -17,9 +17,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -248,23 +250,99 @@ async def index_google_calendar_events(
location = event.get("location", "")
description = event.get("description", "")
# Generate unique identifier hash for this Google Calendar event
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_CALENDAR_CONNECTOR, event_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(event_markdown, search_space_id)
# Duplicate check via simple query using helper in base
from .base import (
check_duplicate_document_by_hash, # local import to avoid circular at module import
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for event {event_summary}. Skipping processing."
)
documents_skipped += 1
continue
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Google Calendar event {event_summary} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Google Calendar event {event_summary}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location or "No location",
"document_type": "Google Calendar Event",
"connector_type": "Google Calendar",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
event_markdown, user_llm, document_metadata
)
else:
summary_content = (
f"Google Calendar Event: {event_summary}\n\n"
)
summary_content += f"Calendar: {calendar_id}\n"
summary_content += f"Start: {start_time}\n"
summary_content += f"End: {end_time}\n"
if location:
summary_content += f"Location: {location}\n"
if description:
desc_preview = description[:1000]
if len(description) > 1000:
desc_preview += "..."
summary_content += f"Description: {desc_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(event_markdown)
# Update existing document
existing_document.title = f"Calendar Event - {event_summary}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(
f"Successfully updated Google Calendar event {event_summary}"
)
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
@ -320,6 +398,7 @@ async def index_google_calendar_events(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -21,10 +21,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -194,21 +195,85 @@ async def index_google_gmail_messages(
documents_skipped += 1
continue
# Generate unique identifier hash for this Gmail message
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_GMAIL_CONNECTOR, message_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(markdown_content, search_space_id)
# Check if document already exists
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for message {message_id}. Skipping processing."
)
documents_skipped += 1
continue
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Gmail message {subject} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Gmail message {subject}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"document_type": "Gmail Message",
"connector_type": "Google Gmail",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Google Gmail Message: {subject}\n\n"
summary_content += f"Sender: {sender}\n"
summary_content += f"Date: {date_str}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(markdown_content)
# Update existing document
existing_document.title = f"Gmail: {subject}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"connector_id": connector_id,
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(f"Successfully updated Gmail message {subject}")
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
@ -258,6 +323,7 @@ async def index_google_gmail_messages(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -16,11 +16,12 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
calculate_date_range,
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -200,26 +201,96 @@ async def index_jira_issues(
documents_skipped += 1
continue
# Generate unique identifier hash for this Jira issue
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.JIRA_CONNECTOR, issue_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(issue_content, search_space_id)
# Check if document already exists
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing."
)
documents_skipped += 1
continue
comment_count = len(formatted_issue.get("comments", []))
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Jira issue {issue_identifier} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Jira issue {issue_identifier}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"issue_key": issue_identifier,
"issue_title": issue_title,
"status": formatted_issue.get("status", "Unknown"),
"priority": formatted_issue.get("priority", "Unknown"),
"comment_count": comment_count,
"document_type": "Jira Issue",
"connector_type": "Jira",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
issue_content, user_llm, document_metadata
)
else:
summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n"
if formatted_issue.get("description"):
summary_content += f"Description: {formatted_issue.get('description')}\n\n"
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(issue_content)
# Update existing document
existing_document.title = (
f"Jira - {issue_identifier}: {issue_title}"
)
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": formatted_issue.get("status", "Unknown"),
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(
f"Successfully updated Jira issue {issue_identifier}"
)
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
comment_count = len(formatted_issue.get("comments", []))
if user_llm:
document_metadata = {
@ -270,6 +341,7 @@ async def index_jira_issues(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -16,11 +16,12 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
calculate_date_range,
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -213,27 +214,101 @@ async def index_linear_issues(
documents_skipped += 1
continue
content_hash = generate_content_hash(issue_content, search_space_id)
# Check if document with this content hash already exists
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Generate unique identifier hash for this Linear issue
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.LINEAR_CONNECTOR, issue_id, search_space_id
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing."
)
documents_skipped += 1
continue
# Generate content hash
content_hash = generate_content_hash(issue_content, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
state = formatted_issue.get("state", "Unknown")
description = formatted_issue.get("description", "")
comment_count = len(formatted_issue.get("comments", []))
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Linear issue {issue_identifier} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Linear issue {issue_identifier}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"issue_id": issue_identifier,
"issue_title": issue_title,
"state": state,
"priority": formatted_issue.get("priority", "Unknown"),
"comment_count": comment_count,
"document_type": "Linear Issue",
"connector_type": "Linear",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
issue_content, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
if description and len(description) > 1000:
description = description[:997] + "..."
summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
if description:
summary_content += f"Description: {description}\n\n"
summary_content += f"Comments: {comment_count}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(issue_content)
# Update existing document
existing_document.title = (
f"Linear - {issue_identifier}: {issue_title}"
)
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(
f"Successfully updated Linear issue {issue_identifier}"
)
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
state = formatted_issue.get("state", "Unknown")
description = formatted_issue.get("description", "")
comment_count = len(formatted_issue.get("comments", []))
if user_llm:
document_metadata = {
@ -285,6 +360,7 @@ async def index_linear_issues(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -16,9 +16,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -254,21 +256,108 @@ async def index_luma_events(
description = event_data.get("description", "")
cover_url = event_data.get("cover_url", "")
# Generate unique identifier hash for this Luma event
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.LUMA_CONNECTOR, event_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(event_markdown, search_space_id)
# Duplicate check via simple query using helper in base
from .base import check_duplicate_document_by_hash
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for event {event_name}. Skipping processing."
)
documents_skipped += 1
continue
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Luma event {event_name} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Luma event {event_name}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"event_id": event_id,
"event_name": event_name,
"event_url": event_url,
"start_at": start_at,
"end_at": end_at,
"timezone": timezone,
"location": location or "No location",
"city": city,
"hosts": host_names,
"document_type": "Luma Event",
"connector_type": "Luma",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
event_markdown, user_llm, document_metadata
)
else:
summary_content = f"Luma Event: {event_name}\n\n"
if event_url:
summary_content += f"URL: {event_url}\n"
summary_content += f"Start: {start_at}\n"
summary_content += f"End: {end_at}\n"
if timezone:
summary_content += f"Timezone: {timezone}\n"
if location:
summary_content += f"Location: {location}\n"
if city:
summary_content += f"City: {city}\n"
if host_names:
summary_content += f"Hosts: {host_names}\n"
if description:
desc_preview = description[:1000]
if len(description) > 1000:
desc_preview += "..."
summary_content += f"Description: {desc_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(event_markdown)
# Update existing document
existing_document.title = f"Luma Event - {event_name}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"event_id": event_id,
"event_name": event_name,
"event_url": event_url,
"start_at": start_at,
"end_at": end_at,
"timezone": timezone,
"location": location,
"city": city,
"hosts": host_names,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(f"Successfully updated Luma event {event_name}")
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
@ -340,6 +429,7 @@ async def index_luma_events(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -15,11 +15,12 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
build_document_metadata_string,
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -282,22 +283,82 @@ async def index_notion_pages(
combined_document_string = build_document_metadata_string(
metadata_sections
)
# Generate unique identifier hash for this Notion page
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.NOTION_CONNECTOR, page_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(
combined_document_string, search_space_id
)
# Check if document with this content hash already exists
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing."
)
documents_skipped += 1
continue
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Notion page {page_title} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Notion page {page_title}. Updating document."
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if not user_llm:
logger.error(
f"No long context LLM configured for user {user_id}"
)
skipped_pages.append(f"{page_title} (no LLM configured)")
documents_skipped += 1
continue
# Generate summary with metadata
document_metadata = {
"page_title": page_title,
"page_id": page_id,
"document_type": "Notion Page",
"connector_type": "Notion",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
# Process chunks
chunks = await create_document_chunks(markdown_content)
# Update existing document
existing_document.title = f"Notion - {page_title}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"page_title": page_title,
"page_id": page_id,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
documents_indexed += 1
logger.info(f"Successfully updated Notion page: {page_title}")
continue
# Document doesn't exist - create new one
# Get user's long context LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
@ -336,6 +397,7 @@ async def index_notion_pages(
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)

View file

@ -15,12 +15,13 @@ from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_unique_identifier_hash,
)
from .base import (
build_document_metadata_markdown,
calculate_date_range,
check_duplicate_document_by_hash,
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
@ -235,6 +236,7 @@ async def index_slack_messages(
for msg in formatted_messages:
timestamp = msg.get("datetime", "Unknown Time")
msg_ts = msg.get("ts", timestamp) # Get original Slack timestamp
msg_user_name = msg.get("user_name", "Unknown User")
msg_user_email = msg.get("user_email", "Unknown Email")
msg_text = msg.get("text", "")
@ -261,22 +263,68 @@ async def index_slack_messages(
combined_document_string = build_document_metadata_markdown(
metadata_sections
)
# Generate unique identifier hash for this Slack message
unique_identifier = f"{channel_id}_{msg_ts}"
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.SLACK_CONNECTOR, unique_identifier, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(
combined_document_string, search_space_id
)
# Check if document with this content hash already exists
existing_document_by_hash = await check_duplicate_document_by_hash(
session, content_hash
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing."
)
documents_skipped += 1
continue
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Slack message {msg_ts} in channel {channel_name}. Updating document."
)
# Update chunks and embedding
chunks = await create_document_chunks(
combined_document_string
)
doc_embedding = config.embedding_model_instance.embed(
combined_document_string
)
# Update existing document
existing_document.content = combined_document_string
existing_document.content_hash = content_hash
existing_document.embedding = doc_embedding
existing_document.document_metadata = {
"channel_name": channel_name,
"channel_id": channel_id,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
"indexed_at": datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
),
}
# Delete old chunks and add new ones
existing_document.chunks = chunks
documents_indexed += 1
logger.info(f"Successfully updated Slack message {msg_ts}")
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(combined_document_string)
doc_embedding = config.embedding_model_instance.embed(
@ -300,6 +348,7 @@ async def index_slack_messages(
embedding=doc_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)

View file

@ -29,3 +29,27 @@ async def check_duplicate_document(
select(Document).where(Document.content_hash == content_hash)
)
return existing_doc_result.scalars().first()
async def check_document_by_unique_identifier(
session: AsyncSession, unique_identifier_hash: str
) -> Document | None:
"""
Check if a document with the given unique identifier hash already exists.
Eagerly loads chunks to avoid lazy loading issues during updates.
Args:
session: Database session
unique_identifier_hash: Hash of the unique identifier from the source
Returns:
Existing document if found, None otherwise
"""
from sqlalchemy.orm import selectinload
existing_doc_result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
.where(Document.unique_identifier_hash == unique_identifier_hash)
)
return existing_doc_result.scalars().first()

View file

@ -15,10 +15,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
)
@ -85,25 +86,42 @@ async def add_extension_received_document(
document_parts.append("</DOCUMENT>")
combined_document_string = "\n".join(document_parts)
# Generate unique identifier hash for this extension document (using URL)
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.EXTENSION, content.metadata.VisitedWebPageURL, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists: {content.metadata.VisitedWebPageTitle}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get user's long context LLM
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"Extension document unchanged: {content.metadata.VisitedWebPageTitle}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document for URL {content.metadata.VisitedWebPageURL} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for URL {content.metadata.VisitedWebPageURL}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -127,21 +145,36 @@ async def add_extension_received_document(
# Process chunks
chunks = await create_document_chunks(content.pageContent)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=content.metadata.VisitedWebPageTitle,
document_type=DocumentType.EXTENSION,
document_metadata=content.metadata.model_dump(),
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
# Update or create document
if existing_document:
# Update existing document
existing_document.title = content.metadata.VisitedWebPageTitle
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = content.metadata.model_dump()
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=content.metadata.VisitedWebPageTitle,
document_type=DocumentType.EXTENSION,
document_metadata=content.metadata.model_dump(),
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(

View file

@ -15,10 +15,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
)
@ -47,19 +48,31 @@ async def add_received_file_document_using_unstructured(
unstructured_processed_elements
)
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# TODO: Check if file_markdown exceeds token limit of embedding model
# Get user's long context LLM
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -79,24 +92,42 @@ async def add_received_file_document_using_unstructured(
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"FILE_NAME": file_name,
"ETL_SERVICE": "UNSTRUCTURED",
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "UNSTRUCTURED",
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
return document
except SQLAlchemyError as db_error:
@ -131,17 +162,31 @@ async def add_received_file_document_using_llamacloud(
# Combine all markdown documents into one
file_in_markdown = llamacloud_markdown_document
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get user's long context LLM
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -161,24 +206,42 @@ async def add_received_file_document_using_llamacloud(
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"FILE_NAME": file_name,
"ETL_SERVICE": "LLAMACLOUD",
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "LLAMACLOUD",
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
return document
except SQLAlchemyError as db_error:
@ -214,17 +277,31 @@ async def add_received_file_document_using_docling(
try:
file_in_markdown = docling_markdown_document
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get user's long context LLM
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -268,20 +345,38 @@ async def add_received_file_document_using_docling(
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = enhanced_summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"FILE_NAME": file_name,
"ETL_SERVICE": "DOCLING",
},
content=enhanced_summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "DOCLING",
},
content=enhanced_summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()

View file

@ -14,10 +14,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
)
@ -56,25 +57,41 @@ async def add_received_markdown_file_document(
)
try:
# Generate unique identifier hash for this markdown file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Markdown file document already exists: {file_name}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get user's long context LLM
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"Markdown file document unchanged: {file_name}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document for markdown file {file_name} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for markdown file {file_name}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -93,23 +110,40 @@ async def add_received_markdown_file_document(
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"FILE_NAME": file_name,
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file_name,
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(

View file

@ -17,10 +17,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
md,
)
@ -129,31 +130,49 @@ async def add_crawled_url_document(
document_parts.append("</DOCUMENT>")
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check for duplicates
await task_logger.log_task_progress(
log_entry,
f"Checking for duplicate content: {url}",
{"stage": "duplicate_check", "content_hash": content_hash},
# Generate unique identifier hash for this URL
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CRAWLED_URL, url, search_space_id
)
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Document already exists for URL: {url}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Generate content hash
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Get LLM for summary generation
# Check if document with this unique identifier already exists
await task_logger.log_task_progress(
log_entry,
f"Checking for existing URL: {url}",
{"stage": "duplicate_check", "url": url},
)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"URL document unchanged: {url}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(f"Document for URL {url} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(f"Content changed for URL {url}. Updating document.")
await task_logger.log_task_progress(
log_entry,
f"Updating URL document: {url}",
{"stage": "document_update", "url": url},
)
# Get LLM for summary generation (needed for both create and update)
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {url}",
@ -194,27 +213,50 @@ async def add_crawled_url_document(
chunks = await create_document_chunks(content_in_markdown)
# Create and store document
await task_logger.log_task_progress(
log_entry,
f"Creating document in database for URL: {url}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
# Update or create document
if existing_document:
# Update existing document
await task_logger.log_task_progress(
log_entry,
f"Updating document in database for URL: {url}",
{"stage": "document_update", "chunks_count": len(chunks)},
)
document = Document(
search_space_id=search_space_id,
title=url_crawled[0].metadata["title"]
if isinstance(crawl_loader, FireCrawlLoader)
else url_crawled[0].metadata["source"],
document_type=DocumentType.CRAWLED_URL,
document_metadata=url_crawled[0].metadata,
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
existing_document.title = (
url_crawled[0].metadata["title"]
if isinstance(crawl_loader, FireCrawlLoader)
else url_crawled[0].metadata["source"]
)
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = url_crawled[0].metadata
existing_document.chunks = chunks
session.add(document)
document = existing_document
else:
# Create new document
await task_logger.log_task_progress(
log_entry,
f"Creating document in database for URL: {url}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
document = Document(
search_space_id=search_space_id,
title=url_crawled[0].metadata["title"]
if isinstance(crawl_loader, FireCrawlLoader)
else url_crawled[0].metadata["source"],
document_type=DocumentType.CRAWLED_URL,
document_metadata=url_crawled[0].metadata,
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)

View file

@ -17,10 +17,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
)
@ -201,32 +202,54 @@ async def add_youtube_video_document(
document_parts.append("</DOCUMENT>")
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check for duplicates
await task_logger.log_task_progress(
log_entry,
f"Checking for duplicate video content: {video_id}",
{"stage": "duplicate_check", "content_hash": content_hash},
# Generate unique identifier hash for this YouTube video
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.YOUTUBE_VIDEO, video_id, search_space_id
)
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists: {video_data.get('title', 'YouTube Video')}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
"video_id": video_id,
},
)
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Generate content hash
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Get LLM for summary generation
# Check if document with this unique identifier already exists
await task_logger.log_task_progress(
log_entry,
f"Checking for existing video: {video_id}",
{"stage": "duplicate_check", "video_id": video_id},
)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"YouTube video document unchanged: {video_data.get('title', 'YouTube Video')}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
"video_id": video_id,
},
)
logging.info(
f"Document for YouTube video {video_id} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for YouTube video {video_id}. Updating document."
)
await task_logger.log_task_progress(
log_entry,
f"Updating YouTube video document: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_update", "video_id": video_id},
)
# Get LLM for summary generation (needed for both create and update)
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}",
@ -270,33 +293,60 @@ async def add_youtube_video_document(
chunks = await create_document_chunks(combined_document_string)
# Create document
await task_logger.log_task_progress(
log_entry,
f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
# Update or create document
if existing_document:
# Update existing document
await task_logger.log_task_progress(
log_entry,
f"Updating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_update", "chunks_count": len(chunks)},
)
document = Document(
title=video_data.get("title", "YouTube Video"),
document_type=DocumentType.YOUTUBE_VIDEO,
document_metadata={
existing_document.title = video_data.get("title", "YouTube Video")
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"url": url,
"video_id": video_id,
"video_title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
search_space_id=search_space_id,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
await task_logger.log_task_progress(
log_entry,
f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
document = Document(
title=video_data.get("title", "YouTube Video"),
document_type=DocumentType.YOUTUBE_VIDEO,
document_metadata={
"url": url,
"video_id": video_id,
"video_title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
search_space_id=search_space_id,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(

View file

@ -3,7 +3,7 @@ import hashlib
from litellm import get_model_info, token_counter
from app.config import config
from app.db import Chunk
from app.db import Chunk, DocumentType
from app.prompts import SUMMARY_PROMPT_TEMPLATE
@ -308,3 +308,40 @@ def generate_content_hash(content: str, search_space_id: int) -> str:
"""Generate SHA-256 hash for the given content combined with search space ID."""
combined_data = f"{search_space_id}:{content}"
return hashlib.sha256(combined_data.encode("utf-8")).hexdigest()
def generate_unique_identifier_hash(
document_type: DocumentType,
unique_identifier: str | int | float,
search_space_id: int,
) -> str:
"""
Generate SHA-256 hash for a unique document identifier from connector sources.
This function creates a consistent hash based on the document type, its unique
identifier from the source system, and the search space ID. This helps prevent
duplicate documents when syncing from various connectors like Slack, Notion, Jira, etc.
Args:
document_type: The type of document (e.g., SLACK_CONNECTOR, NOTION_CONNECTOR)
unique_identifier: The unique ID from the source system (e.g., message ID, page ID)
search_space_id: The search space this document belongs to
Returns:
str: SHA-256 hash string representing the unique document identifier
Example:
>>> generate_unique_identifier_hash(
... DocumentType.SLACK_CONNECTOR,
... "1234567890.123456",
... 42
... )
'a1b2c3d4e5f6...'
"""
# Convert unique_identifier to string to handle different types
identifier_str = str(unique_identifier)
# Combine document type value, unique identifier, and search space ID
combined_data = f"{document_type.value}:{identifier_str}:{search_space_id}"
return hashlib.sha256(combined_data.encode("utf-8")).hexdigest()