feat: add unique identifier hash for documents to prevent duplicates across various connectors

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2025-10-14 21:09:11 -07:00
parent 673bf6f3c1
commit c99cd710ea
22 changed files with 1631 additions and 356 deletions

View file

@ -29,3 +29,27 @@ async def check_duplicate_document(
select(Document).where(Document.content_hash == content_hash)
)
return existing_doc_result.scalars().first()
async def check_document_by_unique_identifier(
session: AsyncSession, unique_identifier_hash: str
) -> Document | None:
"""
Check if a document with the given unique identifier hash already exists.
Eagerly loads chunks to avoid lazy loading issues during updates.
Args:
session: Database session
unique_identifier_hash: Hash of the unique identifier from the source
Returns:
Existing document if found, None otherwise
"""
from sqlalchemy.orm import selectinload
existing_doc_result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
.where(Document.unique_identifier_hash == unique_identifier_hash)
)
return existing_doc_result.scalars().first()

View file

@ -15,10 +15,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
)
@ -85,25 +86,42 @@ async def add_extension_received_document(
document_parts.append("</DOCUMENT>")
combined_document_string = "\n".join(document_parts)
# Generate unique identifier hash for this extension document (using URL)
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.EXTENSION, content.metadata.VisitedWebPageURL, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists: {content.metadata.VisitedWebPageTitle}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get user's long context LLM
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"Extension document unchanged: {content.metadata.VisitedWebPageTitle}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document for URL {content.metadata.VisitedWebPageURL} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for URL {content.metadata.VisitedWebPageURL}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -127,21 +145,36 @@ async def add_extension_received_document(
# Process chunks
chunks = await create_document_chunks(content.pageContent)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=content.metadata.VisitedWebPageTitle,
document_type=DocumentType.EXTENSION,
document_metadata=content.metadata.model_dump(),
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
# Update or create document
if existing_document:
# Update existing document
existing_document.title = content.metadata.VisitedWebPageTitle
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = content.metadata.model_dump()
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=content.metadata.VisitedWebPageTitle,
document_type=DocumentType.EXTENSION,
document_metadata=content.metadata.model_dump(),
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(

View file

@ -15,10 +15,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
)
@ -47,19 +48,31 @@ async def add_received_file_document_using_unstructured(
unstructured_processed_elements
)
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# TODO: Check if file_markdown exceeds token limit of embedding model
# Get user's long context LLM
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -79,24 +92,42 @@ async def add_received_file_document_using_unstructured(
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"FILE_NAME": file_name,
"ETL_SERVICE": "UNSTRUCTURED",
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "UNSTRUCTURED",
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
return document
except SQLAlchemyError as db_error:
@ -131,17 +162,31 @@ async def add_received_file_document_using_llamacloud(
# Combine all markdown documents into one
file_in_markdown = llamacloud_markdown_document
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get user's long context LLM
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -161,24 +206,42 @@ async def add_received_file_document_using_llamacloud(
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"FILE_NAME": file_name,
"ETL_SERVICE": "LLAMACLOUD",
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "LLAMACLOUD",
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
return document
except SQLAlchemyError as db_error:
@ -214,17 +277,31 @@ async def add_received_file_document_using_docling(
try:
file_in_markdown = docling_markdown_document
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get user's long context LLM
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -268,20 +345,38 @@ async def add_received_file_document_using_docling(
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = enhanced_summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"FILE_NAME": file_name,
"ETL_SERVICE": "DOCLING",
},
content=enhanced_summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "DOCLING",
},
content=enhanced_summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()

View file

@ -14,10 +14,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
)
@ -56,25 +57,41 @@ async def add_received_markdown_file_document(
)
try:
# Generate unique identifier hash for this markdown file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this content hash already exists
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Markdown file document already exists: {file_name}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get user's long context LLM
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"Markdown file document unchanged: {file_name}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document for markdown file {file_name} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for markdown file {file_name}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
@ -93,23 +110,40 @@ async def add_received_markdown_file_document(
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Create and store document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"FILE_NAME": file_name,
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file_name,
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(

View file

@ -17,10 +17,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
md,
)
@ -129,31 +130,49 @@ async def add_crawled_url_document(
document_parts.append("</DOCUMENT>")
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check for duplicates
await task_logger.log_task_progress(
log_entry,
f"Checking for duplicate content: {url}",
{"stage": "duplicate_check", "content_hash": content_hash},
# Generate unique identifier hash for this URL
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CRAWLED_URL, url, search_space_id
)
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
await task_logger.log_task_success(
log_entry,
f"Document already exists for URL: {url}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Generate content hash
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Get LLM for summary generation
# Check if document with this unique identifier already exists
await task_logger.log_task_progress(
log_entry,
f"Checking for existing URL: {url}",
{"stage": "duplicate_check", "url": url},
)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"URL document unchanged: {url}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(f"Document for URL {url} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(f"Content changed for URL {url}. Updating document.")
await task_logger.log_task_progress(
log_entry,
f"Updating URL document: {url}",
{"stage": "document_update", "url": url},
)
# Get LLM for summary generation (needed for both create and update)
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {url}",
@ -194,27 +213,50 @@ async def add_crawled_url_document(
chunks = await create_document_chunks(content_in_markdown)
# Create and store document
await task_logger.log_task_progress(
log_entry,
f"Creating document in database for URL: {url}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
# Update or create document
if existing_document:
# Update existing document
await task_logger.log_task_progress(
log_entry,
f"Updating document in database for URL: {url}",
{"stage": "document_update", "chunks_count": len(chunks)},
)
document = Document(
search_space_id=search_space_id,
title=url_crawled[0].metadata["title"]
if isinstance(crawl_loader, FireCrawlLoader)
else url_crawled[0].metadata["source"],
document_type=DocumentType.CRAWLED_URL,
document_metadata=url_crawled[0].metadata,
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
)
existing_document.title = (
url_crawled[0].metadata["title"]
if isinstance(crawl_loader, FireCrawlLoader)
else url_crawled[0].metadata["source"]
)
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = url_crawled[0].metadata
existing_document.chunks = chunks
session.add(document)
document = existing_document
else:
# Create new document
await task_logger.log_task_progress(
log_entry,
f"Creating document in database for URL: {url}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
document = Document(
search_space_id=search_space_id,
title=url_crawled[0].metadata["title"]
if isinstance(crawl_loader, FireCrawlLoader)
else url_crawled[0].metadata["source"],
document_type=DocumentType.CRAWLED_URL,
document_metadata=url_crawled[0].metadata,
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)

View file

@ -17,10 +17,11 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_duplicate_document,
check_document_by_unique_identifier,
)
@ -201,32 +202,54 @@ async def add_youtube_video_document(
document_parts.append("</DOCUMENT>")
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check for duplicates
await task_logger.log_task_progress(
log_entry,
f"Checking for duplicate video content: {video_id}",
{"stage": "duplicate_check", "content_hash": content_hash},
# Generate unique identifier hash for this YouTube video
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.YOUTUBE_VIDEO, video_id, search_space_id
)
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists: {video_data.get('title', 'YouTube Video')}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
"video_id": video_id,
},
)
logging.info(
f"Document with content hash {content_hash} already exists. Skipping processing."
)
return existing_document
# Generate content hash
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Get LLM for summary generation
# Check if document with this unique identifier already exists
await task_logger.log_task_progress(
log_entry,
f"Checking for existing video: {video_id}",
{"stage": "duplicate_check", "video_id": video_id},
)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"YouTube video document unchanged: {video_data.get('title', 'YouTube Video')}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
"video_id": video_id,
},
)
logging.info(
f"Document for YouTube video {video_id} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for YouTube video {video_id}. Updating document."
)
await task_logger.log_task_progress(
log_entry,
f"Updating YouTube video document: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_update", "video_id": video_id},
)
# Get LLM for summary generation (needed for both create and update)
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}",
@ -270,33 +293,60 @@ async def add_youtube_video_document(
chunks = await create_document_chunks(combined_document_string)
# Create document
await task_logger.log_task_progress(
log_entry,
f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
# Update or create document
if existing_document:
# Update existing document
await task_logger.log_task_progress(
log_entry,
f"Updating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_update", "chunks_count": len(chunks)},
)
document = Document(
title=video_data.get("title", "YouTube Video"),
document_type=DocumentType.YOUTUBE_VIDEO,
document_metadata={
existing_document.title = video_data.get("title", "YouTube Video")
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"url": url,
"video_id": video_id,
"video_title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
search_space_id=search_space_id,
content_hash=content_hash,
)
}
existing_document.chunks = chunks
session.add(document)
await session.commit()
await session.refresh(document)
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
await task_logger.log_task_progress(
log_entry,
f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
document = Document(
title=video_data.get("title", "YouTube Video"),
document_type=DocumentType.YOUTUBE_VIDEO,
document_metadata={
"url": url,
"video_id": video_id,
"video_title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
search_space_id=search_space_id,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
)
session.add(document)
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(