diff --git a/surfsense_backend/alembic/versions/29_add_unique_identifier_hash_to_documents.py b/surfsense_backend/alembic/versions/29_add_unique_identifier_hash_to_documents.py new file mode 100644 index 000000000..cf3486473 --- /dev/null +++ b/surfsense_backend/alembic/versions/29_add_unique_identifier_hash_to_documents.py @@ -0,0 +1,54 @@ +"""Add unique_identifier_hash column to documents table + +Revision ID: 29 +Revises: 28 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from sqlalchemy import inspect + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "29" +down_revision: str | None = "28" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + columns = [col["name"] for col in inspector.get_columns("documents")] + + # Only add the column if it doesn't already exist + if "unique_identifier_hash" not in columns: + op.add_column( + "documents", + sa.Column("unique_identifier_hash", sa.String(), nullable=True), + ) + op.create_index( + op.f("ix_documents_unique_identifier_hash"), + "documents", + ["unique_identifier_hash"], + unique=False, + ) + op.create_unique_constraint( + op.f("uq_documents_unique_identifier_hash"), + "documents", + ["unique_identifier_hash"], + ) + else: + print( + "Column 'unique_identifier_hash' already exists. Skipping column creation." 
+ ) + + +def downgrade() -> None: + op.drop_constraint( + op.f("uq_documents_unique_identifier_hash"), "documents", type_="unique" + ) + op.drop_index(op.f("ix_documents_unique_identifier_hash"), table_name="documents") + op.drop_column("documents", "unique_identifier_hash") diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 80920c8a9..db1feaf21 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -174,6 +174,7 @@ class Document(BaseModel, TimestampMixin): content = Column(Text, nullable=False) content_hash = Column(String, nullable=False, index=True, unique=True) + unique_identifier_hash = Column(String, nullable=True, index=True, unique=True) embedding = Column(Vector(config.embedding_model_instance.dimension)) search_space_id = Column( diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 0cc21bb47..b670391eb 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -16,11 +16,12 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( calculate_date_range, - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -240,25 +241,100 @@ async def index_airtable_records( documents_skipped += 1 continue + record_id = record.get("id", "Unknown") + + # Generate unique identifier hash for this Airtable record + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.AIRTABLE_CONNECTOR, + record_id, + search_space_id, + ) + # Generate content hash content_hash = generate_content_hash( markdown_content, search_space_id ) - # Check if document already exists - existing_document_by_hash = ( - await 
check_duplicate_document_by_hash( - session, content_hash + # Check if document with this unique identifier already exists + existing_document = ( + await check_document_by_unique_identifier( + session, unique_identifier_hash ) ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for message {record.get('id')}. Skipping processing." - ) - documents_skipped += 1 - continue + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Airtable record {record_id} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Airtable record {record_id}. Updating document." + ) + # Generate document summary + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "record_id": record_id, + "created_time": record.get( + "CREATED_TIME()", "" + ), + "document_type": "Airtable Record", + "connector_type": "Airtable", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, + user_llm, + document_metadata, + ) + else: + summary_content = ( + f"Airtable Record: {record_id}\n\n" + ) + summary_embedding = ( + config.embedding_model_instance.embed( + summary_content + ) + ) + + # Process chunks + chunks = await create_document_chunks( + markdown_content + ) + + # Update existing document + existing_document.title = ( + f"Airtable Record: {record_id}" + ) + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "record_id": record_id, + "created_time": record.get( + "CREATED_TIME()", "" + ), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info( + 
f"Successfully updated Airtable record {record_id}" + ) + continue + + # Document doesn't exist - create new one # Generate document summary user_llm = await get_user_long_context_llm( session, user_id, search_space_id @@ -266,7 +342,7 @@ async def index_airtable_records( if user_llm: document_metadata = { - "record_id": record.get("id", "Unknown"), + "record_id": record_id, "created_time": record.get("CREATED_TIME()", ""), "document_type": "Airtable Record", "connector_type": "Airtable", @@ -279,7 +355,7 @@ async def index_airtable_records( ) else: # Fallback to simple summary if no LLM configured - summary_content = f"Airtable Record: {record.get('id', 'Unknown')}\n\n" + summary_content = f"Airtable Record: {record_id}\n\n" summary_embedding = ( config.embedding_model_instance.embed( summary_content @@ -291,18 +367,19 @@ async def index_airtable_records( # Create and store new document logger.info( - f"Creating new document for Airtable record: {record.get('id', 'Unknown')}" + f"Creating new document for Airtable record: {record_id}" ) document = Document( search_space_id=search_space_id, - title=f"Airtable Record: {record.get('id', 'Unknown')}", + title=f"Airtable Record: {record_id}", document_type=DocumentType.AIRTABLE_CONNECTOR, document_metadata={ - "record_id": record.get("id", "Unknown"), + "record_id": record_id, "created_time": record.get("CREATED_TIME()", ""), }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/base.py b/surfsense_backend/app/tasks/connector_indexers/base.py index 6d6f823e1..052ae3f4a 100644 --- a/surfsense_backend/app/tasks/connector_indexers/base.py +++ b/surfsense_backend/app/tasks/connector_indexers/base.py @@ -37,6 +37,30 @@ async def check_duplicate_document_by_hash( return existing_doc_result.scalars().first() +async def check_document_by_unique_identifier( + session: 
AsyncSession, unique_identifier_hash: str +) -> Document | None: + """ + Check if a document with the given unique identifier hash already exists. + Eagerly loads chunks to avoid lazy loading issues during updates. + + Args: + session: Database session + unique_identifier_hash: Hash of the unique identifier from the source system + + Returns: + Existing document if found, None otherwise + """ + from sqlalchemy.orm import selectinload + + existing_doc_result = await session.execute( + select(Document) + .options(selectinload(Document.chunks)) + .where(Document.unique_identifier_hash == unique_identifier_hash) + ) + return existing_doc_result.scalars().first() + + async def get_connector_by_id( session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType ) -> SearchSourceConnector | None: diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index 5ee7342fa..4c057946b 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -16,10 +16,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -209,18 +210,92 @@ async def index_clickup_tasks( documents_skipped += 1 continue - # Hash for duplicates - content_hash = generate_content_hash(task_content, search_space_id) - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Generate unique identifier hash for this ClickUp task + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.CLICKUP_CONNECTOR, task_id, search_space_id ) - if existing_document_by_hash: - logger.info( - f"Document with 
content hash {content_hash} already exists for task {task_name}. Skipping processing." - ) - documents_skipped += 1 - continue + # Generate content hash + content_hash = generate_content_hash(task_content, search_space_id) + + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for ClickUp task {task_name} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for ClickUp task {task_name}. Updating document." + ) + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "task_id": task_id, + "task_name": task_name, + "task_status": task_status, + "task_priority": task_priority, + "task_list": task_list_name, + "task_space": task_space_name, + "assignees": len(task_assignees), + "document_type": "ClickUp Task", + "connector_type": "ClickUp", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + task_content, user_llm, document_metadata + ) + else: + summary_content = task_content + summary_embedding = ( + config.embedding_model_instance.embed(task_content) + ) + + # Process chunks + chunks = await create_document_chunks(task_content) + + # Update existing document + existing_document.title = f"Task - {task_name}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "task_id": task_id, + "task_name": task_name, + "task_status": task_status, + "task_priority": task_priority, + "task_assignees": task_assignees, + 
"task_due_date": task_due_date, + "task_created": task_created, + "task_updated": task_updated, + "indexed_at": datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info( + f"Successfully updated ClickUp task {task_name}" + ) + continue + + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( session, user_id, search_space_id @@ -270,6 +345,7 @@ async def index_clickup_tasks( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index f9d424b30..afdbdd177 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -16,11 +16,12 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( calculate_date_range, - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -217,26 +218,97 @@ async def index_confluence_pages( documents_skipped += 1 continue + # Generate unique identifier hash for this Confluence page + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.CONFLUENCE_CONNECTOR, page_id, search_space_id + ) + # Generate content hash content_hash = generate_content_hash(full_content, search_space_id) - # Check if document already exists - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, 
unique_identifier_hash ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing." - ) - documents_skipped += 1 - continue + comment_count = len(comments) + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Confluence page {page_title} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Confluence page {page_title}. Updating document." + ) + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "page_title": page_title, + "page_id": page_id, + "space_id": space_id, + "comment_count": comment_count, + "document_type": "Confluence Page", + "connector_type": "Confluence", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + full_content, user_llm, document_metadata + ) + else: + summary_content = f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n" + if page_content: + content_preview = page_content[:1000] + if len(page_content) > 1000: + content_preview += "..." 
+ summary_content += ( + f"Content Preview: {content_preview}\n\n" + ) + summary_content += f"Comments: {comment_count}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(full_content) + + # Update existing document + existing_document.title = f"Confluence - {page_title}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "page_id": page_id, + "page_title": page_title, + "space_id": space_id, + "comment_count": comment_count, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info( + f"Successfully updated Confluence page {page_title}" + ) + continue + + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( session, user_id, search_space_id ) - comment_count = len(comments) if user_llm: document_metadata = { @@ -287,6 +359,7 @@ async def index_confluence_pages( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 08c995f64..b08a36132 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -16,11 +16,12 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( build_document_metadata_string, - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ 
-307,23 +308,98 @@ async def index_discord_messages( combined_document_string = build_document_metadata_string( metadata_sections ) + + # Generate unique identifier hash for this Discord channel + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.DISCORD_CONNECTOR, channel_id, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash( combined_document_string, search_space_id ) - # Skip duplicates by hash - existing_document_by_hash = ( - await check_duplicate_document_by_hash( - session, content_hash - ) + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for channel {guild_name}#{channel_name}. Skipping processing." - ) - documents_skipped += 1 - continue + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Discord channel {guild_name}#{channel_name} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Discord channel {guild_name}#{channel_name}. Updating document." 
+ ) + + # Get user's long context LLM + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + if not user_llm: + logger.error( + f"No long context LLM configured for user {user_id}" + ) + skipped_channels.append( + f"{guild_name}#{channel_name} (no LLM configured)" + ) + documents_skipped += 1 + continue + + # Generate summary with metadata + document_metadata = { + "guild_name": guild_name, + "channel_name": channel_name, + "message_count": len(formatted_messages), + "document_type": "Discord Channel Messages", + "connector_type": "Discord", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + combined_document_string, + user_llm, + document_metadata, + ) + + # Chunks from channel content + chunks = await create_document_chunks(channel_content) + + # Update existing document + existing_document.title = ( + f"Discord - {guild_name}#{channel_name}" + ) + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "guild_name": guild_name, + "guild_id": guild_id, + "channel_name": channel_name, + "channel_id": channel_id, + "message_count": len(formatted_messages), + "start_date": start_date_iso, + "end_date": end_date_iso, + "indexed_at": datetime.now(UTC).strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info( + f"Successfully updated Discord channel {guild_name}#{channel_name}" + ) + continue + + # Document doesn't exist - create new one # Get user's long context LLM user_llm = await get_user_long_context_llm( session, user_id, search_space_id @@ -375,6 +451,7 @@ async def index_discord_messages( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git 
a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 9cc0c0993..8cd8ca299 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -16,10 +16,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, ) @@ -199,19 +200,101 @@ async def index_github_repos( ) continue # Skip if content fetch failed - content_hash = generate_content_hash(file_content, search_space_id) - - # Check if document with this content hash already exists - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Generate unique identifier hash for this GitHub file + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.GITHUB_CONNECTOR, file_sha, search_space_id ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for file {full_path_key}. Skipping processing." - ) - continue + # Generate content hash + content_hash = generate_content_hash(file_content, search_space_id) + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for GitHub file {full_path_key} unchanged. Skipping." + ) + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for GitHub file {full_path_key}. Updating document." 
+ ) + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + if user_llm: + file_extension = ( + file_path.split(".")[-1] + if "." in file_path + else None + ) + document_metadata = { + "file_path": full_path_key, + "repository": repo_full_name, + "file_type": file_extension or "unknown", + "document_type": "GitHub Repository File", + "connector_type": "GitHub", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + file_content, user_llm, document_metadata + ) + else: + summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..." + summary_embedding = ( + config.embedding_model_instance.embed( + summary_content + ) + ) + + # Chunk the content + try: + if hasattr(config, "code_chunker_instance"): + chunks_data = [ + await create_document_chunks(file_content) + ][0] + else: + chunks_data = await create_document_chunks( + file_content + ) + except Exception as chunk_err: + logger.error( + f"Failed to chunk file {full_path_key}: {chunk_err}" + ) + continue + + # Update existing document + existing_document.title = f"GitHub - {full_path_key}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "file_path": file_path, + "file_sha": file_sha, + "file_url": file_url, + "repository": repo_full_name, + "indexed_at": datetime.now(UTC).strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + existing_document.chunks = chunks_data + + logger.info( + f"Successfully updated GitHub file {full_path_key}" + ) + continue + + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( session, user_id, search_space_id @@ -290,6 +373,7 @@ async def index_github_repos( document_metadata=doc_metadata, content=summary_content, # Store summary content_hash=content_hash, + 
unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, search_space_id=search_space_id, chunks=chunks_data, # Associate chunks directly diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 22173a41a..b7d8e0b59 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -17,9 +17,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -248,23 +250,99 @@ async def index_google_calendar_events( location = event.get("location", "") description = event.get("description", "") + # Generate unique identifier hash for this Google Calendar event + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_CALENDAR_CONNECTOR, event_id, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash(event_markdown, search_space_id) - # Duplicate check via simple query using helper in base - from .base import ( - check_duplicate_document_by_hash, # local import to avoid circular at module import + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash - ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for event {event_summary}. Skipping processing." 
- ) - documents_skipped += 1 - continue + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Google Calendar event {event_summary} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Google Calendar event {event_summary}. Updating document." + ) + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "event_id": event_id, + "event_summary": event_summary, + "calendar_id": calendar_id, + "start_time": start_time, + "end_time": end_time, + "location": location or "No location", + "document_type": "Google Calendar Event", + "connector_type": "Google Calendar", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + event_markdown, user_llm, document_metadata + ) + else: + summary_content = ( + f"Google Calendar Event: {event_summary}\n\n" + ) + summary_content += f"Calendar: {calendar_id}\n" + summary_content += f"Start: {start_time}\n" + summary_content += f"End: {end_time}\n" + if location: + summary_content += f"Location: {location}\n" + if description: + desc_preview = description[:1000] + if len(description) > 1000: + desc_preview += "..." 
+ summary_content += f"Description: {desc_preview}\n" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(event_markdown) + + # Update existing document + existing_document.title = f"Calendar Event - {event_summary}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "event_id": event_id, + "event_summary": event_summary, + "calendar_id": calendar_id, + "start_time": start_time, + "end_time": end_time, + "location": location, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info( + f"Successfully updated Google Calendar event {event_summary}" + ) + continue + + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( session, user_id, search_space_id @@ -320,6 +398,7 @@ async def index_google_calendar_events( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 872e19d03..9d3823741 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -21,10 +21,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -194,21 +195,85 @@ async def index_google_gmail_messages( 
documents_skipped += 1 continue + # Generate unique identifier hash for this Gmail message + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_GMAIL_CONNECTOR, message_id, search_space_id + ) + # Generate content hash content_hash = generate_content_hash(markdown_content, search_space_id) - # Check if document already exists - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for message {message_id}. Skipping processing." - ) - documents_skipped += 1 - continue + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Gmail message {subject} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Gmail message {subject}. Updating document." 
+ ) + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date": date_str, + "document_type": "Gmail Message", + "connector_type": "Google Gmail", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = f"Google Gmail Message: {subject}\n\n" + summary_content += f"Sender: {sender}\n" + summary_content += f"Date: {date_str}\n" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(markdown_content) + + # Update existing document + existing_document.title = f"Gmail: {subject}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date": date_str, + "connector_id": connector_id, + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info(f"Successfully updated Gmail message {subject}") + continue + + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( session, user_id, search_space_id @@ -258,6 +323,7 @@ async def index_google_gmail_messages( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index e9d556954..36e09c81e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ 
b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -16,11 +16,12 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( calculate_date_range, - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -200,26 +201,96 @@ async def index_jira_issues( documents_skipped += 1 continue + # Generate unique identifier hash for this Jira issue + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.JIRA_CONNECTOR, issue_id, search_space_id + ) + # Generate content hash content_hash = generate_content_hash(issue_content, search_space_id) - # Check if document already exists - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing." - ) - documents_skipped += 1 - continue + comment_count = len(formatted_issue.get("comments", [])) + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Jira issue {issue_identifier} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Jira issue {issue_identifier}. Updating document." 
+ ) + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "issue_key": issue_identifier, + "issue_title": issue_title, + "status": formatted_issue.get("status", "Unknown"), + "priority": formatted_issue.get("priority", "Unknown"), + "comment_count": comment_count, + "document_type": "Jira Issue", + "connector_type": "Jira", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + issue_content, user_llm, document_metadata + ) + else: + summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n" + if formatted_issue.get("description"): + summary_content += f"Description: {formatted_issue.get('description')}\n\n" + summary_content += f"Comments: {comment_count}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(issue_content) + + # Update existing document + existing_document.title = ( + f"Jira - {issue_identifier}: {issue_title}" + ) + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "state": formatted_issue.get("status", "Unknown"), + "comment_count": comment_count, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info( + f"Successfully updated Jira issue {issue_identifier}" + ) + continue + + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( session, user_id, search_space_id ) - comment_count = len(formatted_issue.get("comments", [])) if user_llm: document_metadata = { @@ -270,6 +341,7 
@@ async def index_jira_issues( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 4730fa868..33d5835ee 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -16,11 +16,12 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( calculate_date_range, - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -213,27 +214,101 @@ async def index_linear_issues( documents_skipped += 1 continue - content_hash = generate_content_hash(issue_content, search_space_id) - - # Check if document with this content hash already exists - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Generate unique identifier hash for this Linear issue + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.LINEAR_CONNECTOR, issue_id, search_space_id ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing." 
- ) - documents_skipped += 1 - continue + # Generate content hash + content_hash = generate_content_hash(issue_content, search_space_id) + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + state = formatted_issue.get("state", "Unknown") + description = formatted_issue.get("description", "") + comment_count = len(formatted_issue.get("comments", [])) + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Linear issue {issue_identifier} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Linear issue {issue_identifier}. Updating document." + ) + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "issue_id": issue_identifier, + "issue_title": issue_title, + "state": state, + "priority": formatted_issue.get("priority", "Unknown"), + "comment_count": comment_count, + "document_type": "Linear Issue", + "connector_type": "Linear", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + issue_content, user_llm, document_metadata + ) + else: + # Fallback to simple summary if no LLM configured + if description and len(description) > 1000: + description = description[:997] + "..." 
+ summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n" + if description: + summary_content += f"Description: {description}\n\n" + summary_content += f"Comments: {comment_count}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(issue_content) + + # Update existing document + existing_document.title = ( + f"Linear - {issue_identifier}: {issue_title}" + ) + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "state": state, + "comment_count": comment_count, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info( + f"Successfully updated Linear issue {issue_identifier}" + ) + continue + + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( session, user_id, search_space_id ) - state = formatted_issue.get("state", "Unknown") - description = formatted_issue.get("description", "") - comment_count = len(formatted_issue.get("comments", [])) if user_llm: document_metadata = { @@ -285,6 +360,7 @@ async def index_linear_issues( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 226494008..15588afaa 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -16,9 +16,11 @@ from app.utils.document_converters import ( create_document_chunks, 
generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -254,21 +256,108 @@ async def index_luma_events( description = event_data.get("description", "") cover_url = event_data.get("cover_url", "") + # Generate unique identifier hash for this Luma event + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.LUMA_CONNECTOR, event_id, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash(event_markdown, search_space_id) - # Duplicate check via simple query using helper in base - from .base import check_duplicate_document_by_hash - - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for event {event_name}. Skipping processing." - ) - documents_skipped += 1 - continue + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Luma event {event_name} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Luma event {event_name}. Updating document." 
+ ) + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "event_id": event_id, + "event_name": event_name, + "event_url": event_url, + "start_at": start_at, + "end_at": end_at, + "timezone": timezone, + "location": location or "No location", + "city": city, + "hosts": host_names, + "document_type": "Luma Event", + "connector_type": "Luma", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + event_markdown, user_llm, document_metadata + ) + else: + summary_content = f"Luma Event: {event_name}\n\n" + if event_url: + summary_content += f"URL: {event_url}\n" + summary_content += f"Start: {start_at}\n" + summary_content += f"End: {end_at}\n" + if timezone: + summary_content += f"Timezone: {timezone}\n" + if location: + summary_content += f"Location: {location}\n" + if city: + summary_content += f"City: {city}\n" + if host_names: + summary_content += f"Hosts: {host_names}\n" + if description: + desc_preview = description[:1000] + if len(description) > 1000: + desc_preview += "..." 
+ summary_content += f"Description: {desc_preview}\n" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(event_markdown) + + # Update existing document + existing_document.title = f"Luma Event - {event_name}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "event_id": event_id, + "event_name": event_name, + "event_url": event_url, + "start_at": start_at, + "end_at": end_at, + "timezone": timezone, + "location": location, + "city": city, + "hosts": host_names, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info(f"Successfully updated Luma event {event_name}") + continue + + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( session, user_id, search_space_id @@ -340,6 +429,7 @@ async def index_luma_events( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index b290f86da..699d2fddf 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -15,11 +15,12 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( build_document_metadata_string, - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -282,22 +283,82 @@ async def index_notion_pages( 
combined_document_string = build_document_metadata_string( metadata_sections ) + + # Generate unique identifier hash for this Notion page + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.NOTION_CONNECTOR, page_id, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash( combined_document_string, search_space_id ) - # Check if document with this content hash already exists - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for page {page_title}. Skipping processing." - ) - documents_skipped += 1 - continue + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Notion page {page_title} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Notion page {page_title}. Updating document." 
+ ) + # Get user's long context LLM + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + if not user_llm: + logger.error( + f"No long context LLM configured for user {user_id}" + ) + skipped_pages.append(f"{page_title} (no LLM configured)") + documents_skipped += 1 + continue + + # Generate summary with metadata + document_metadata = { + "page_title": page_title, + "page_id": page_id, + "document_type": "Notion Page", + "connector_type": "Notion", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + + # Process chunks + chunks = await create_document_chunks(markdown_content) + + # Update existing document + existing_document.title = f"Notion - {page_title}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "page_title": page_title, + "page_id": page_id, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info(f"Successfully updated Notion page: {page_title}") + continue + + # Document doesn't exist - create new one # Get user's long context LLM user_llm = await get_user_long_context_llm( session, user_id, search_space_id @@ -336,6 +397,7 @@ async def index_notion_pages( }, content=summary_content, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, embedding=summary_embedding, chunks=chunks, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index fb6dac9c5..dd9edcc8d 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -15,12 +15,13 @@ from app.services.task_logging_service import TaskLoggingService from 
app.utils.document_converters import ( create_document_chunks, generate_content_hash, + generate_unique_identifier_hash, ) from .base import ( build_document_metadata_markdown, calculate_date_range, - check_duplicate_document_by_hash, + check_document_by_unique_identifier, get_connector_by_id, logger, update_connector_last_indexed, @@ -235,6 +236,7 @@ async def index_slack_messages( for msg in formatted_messages: timestamp = msg.get("datetime", "Unknown Time") + msg_ts = msg.get("ts", timestamp) # Get original Slack timestamp msg_user_name = msg.get("user_name", "Unknown User") msg_user_email = msg.get("user_email", "Unknown Email") msg_text = msg.get("text", "") @@ -261,22 +263,68 @@ async def index_slack_messages( combined_document_string = build_document_metadata_markdown( metadata_sections ) + + # Generate unique identifier hash for this Slack message + unique_identifier = f"{channel_id}_{msg_ts}" + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.SLACK_CONNECTOR, unique_identifier, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash( combined_document_string, search_space_id ) - # Check if document with this content hash already exists - existing_document_by_hash = await check_duplicate_document_by_hash( - session, content_hash + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) - if existing_document_by_hash: - logger.info( - f"Document with content hash {content_hash} already exists for channel {channel_name}. Skipping processing." - ) - documents_skipped += 1 - continue + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping." 
+ ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Slack message {msg_ts} in channel {channel_name}. Updating document." + ) + # Update chunks and embedding + chunks = await create_document_chunks( + combined_document_string + ) + doc_embedding = config.embedding_model_instance.embed( + combined_document_string + ) + + # Update existing document + existing_document.content = combined_document_string + existing_document.content_hash = content_hash + existing_document.embedding = doc_embedding + existing_document.document_metadata = { + "channel_name": channel_name, + "channel_id": channel_id, + "start_date": start_date_str, + "end_date": end_date_str, + "message_count": len(formatted_messages), + "indexed_at": datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + + # Delete old chunks and add new ones + existing_document.chunks = chunks + + documents_indexed += 1 + logger.info(f"Successfully updated Slack message {msg_ts}") + continue + + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks(combined_document_string) doc_embedding = config.embedding_model_instance.embed( @@ -300,6 +348,7 @@ async def index_slack_messages( embedding=doc_embedding, chunks=chunks, content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/base.py b/surfsense_backend/app/tasks/document_processors/base.py index d5b1722fb..b3c08fec3 100644 --- a/surfsense_backend/app/tasks/document_processors/base.py +++ b/surfsense_backend/app/tasks/document_processors/base.py @@ -29,3 +29,27 @@ async def check_duplicate_document( select(Document).where(Document.content_hash == content_hash) ) return existing_doc_result.scalars().first() + + +async def check_document_by_unique_identifier( + session: AsyncSession, unique_identifier_hash: str +) -> Document | None: + """ 
+ Check if a document with the given unique identifier hash already exists. + Eagerly loads chunks to avoid lazy loading issues during updates. + + Args: + session: Database session + unique_identifier_hash: Hash of the unique identifier from the source + + Returns: + Existing document if found, None otherwise + """ + from sqlalchemy.orm import selectinload + + existing_doc_result = await session.execute( + select(Document) + .options(selectinload(Document.chunks)) + .where(Document.unique_identifier_hash == unique_identifier_hash) + ) + return existing_doc_result.scalars().first() diff --git a/surfsense_backend/app/tasks/document_processors/extension_processor.py b/surfsense_backend/app/tasks/document_processors/extension_processor.py index ed25b8fbd..663093375 100644 --- a/surfsense_backend/app/tasks/document_processors/extension_processor.py +++ b/surfsense_backend/app/tasks/document_processors/extension_processor.py @@ -15,10 +15,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( - check_duplicate_document, + check_document_by_unique_identifier, ) @@ -85,25 +86,42 @@ async def add_extension_received_document( document_parts.append("") combined_document_string = "\n".join(document_parts) + + # Generate unique identifier hash for this extension document (using URL) + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.EXTENSION, content.metadata.VisitedWebPageURL, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash(combined_document_string, search_space_id) - # Check if document with this content hash already exists - existing_document = await check_duplicate_document(session, content_hash) - if existing_document: - await task_logger.log_task_success( - log_entry, - f"Extension document already exists: {content.metadata.VisitedWebPageTitle}", - { - "duplicate_detected": True, - 
"existing_document_id": existing_document.id, - }, - ) - logging.info( - f"Document with content hash {content_hash} already exists. Skipping processing." - ) - return existing_document + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) - # Get user's long context LLM + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + await task_logger.log_task_success( + log_entry, + f"Extension document unchanged: {content.metadata.VisitedWebPageTitle}", + { + "duplicate_detected": True, + "existing_document_id": existing_document.id, + }, + ) + logging.info( + f"Document for URL {content.metadata.VisitedWebPageURL} unchanged. Skipping." + ) + return existing_document + else: + # Content has changed - update the existing document + logging.info( + f"Content changed for URL {content.metadata.VisitedWebPageURL}. Updating document." 
+ ) + + # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) if not user_llm: raise RuntimeError( @@ -127,21 +145,36 @@ async def add_extension_received_document( # Process chunks chunks = await create_document_chunks(content.pageContent) - # Create and store document - document = Document( - search_space_id=search_space_id, - title=content.metadata.VisitedWebPageTitle, - document_type=DocumentType.EXTENSION, - document_metadata=content.metadata.model_dump(), - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - content_hash=content_hash, - ) + # Update or create document + if existing_document: + # Update existing document + existing_document.title = content.metadata.VisitedWebPageTitle + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = content.metadata.model_dump() + existing_document.chunks = chunks - session.add(document) - await session.commit() - await session.refresh(document) + await session.commit() + await session.refresh(existing_document) + document = existing_document + else: + # Create new document + document = Document( + search_space_id=search_space_id, + title=content.metadata.VisitedWebPageTitle, + document_type=DocumentType.EXTENSION, + document_metadata=content.metadata.model_dump(), + content=summary_content, + embedding=summary_embedding, + chunks=chunks, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + ) + + session.add(document) + await session.commit() + await session.refresh(document) # Log success await task_logger.log_task_success( diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 573b2c28c..f509e700b 100644 --- 
a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -15,10 +15,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( - check_duplicate_document, + check_document_by_unique_identifier, ) @@ -47,19 +48,31 @@ async def add_received_file_document_using_unstructured( unstructured_processed_elements ) + # Generate unique identifier hash for this file + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.FILE, file_name, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash(file_in_markdown, search_space_id) - # Check if document with this content hash already exists - existing_document = await check_duplicate_document(session, content_hash) + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + if existing_document: - logging.info( - f"Document with content hash {content_hash} already exists. Skipping processing." - ) - return existing_document + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logging.info(f"Document for file {file_name} unchanged. Skipping.") + return existing_document + else: + # Content has changed - update the existing document + logging.info( + f"Content changed for file {file_name}. Updating document." 
+ ) - # TODO: Check if file_markdown exceeds token limit of embedding model - - # Get user's long context LLM + # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) if not user_llm: raise RuntimeError( @@ -79,24 +92,42 @@ async def add_received_file_document_using_unstructured( # Process chunks chunks = await create_document_chunks(file_in_markdown) - # Create and store document - document = Document( - search_space_id=search_space_id, - title=file_name, - document_type=DocumentType.FILE, - document_metadata={ + # Update or create document + if existing_document: + # Update existing document + existing_document.title = file_name + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { "FILE_NAME": file_name, "ETL_SERVICE": "UNSTRUCTURED", - }, - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - content_hash=content_hash, - ) + } + existing_document.chunks = chunks - session.add(document) - await session.commit() - await session.refresh(document) + await session.commit() + await session.refresh(existing_document) + document = existing_document + else: + # Create new document + document = Document( + search_space_id=search_space_id, + title=file_name, + document_type=DocumentType.FILE, + document_metadata={ + "FILE_NAME": file_name, + "ETL_SERVICE": "UNSTRUCTURED", + }, + content=summary_content, + embedding=summary_embedding, + chunks=chunks, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + ) + + session.add(document) + await session.commit() + await session.refresh(document) return document except SQLAlchemyError as db_error: @@ -131,17 +162,31 @@ async def add_received_file_document_using_llamacloud( # Combine all markdown documents into one file_in_markdown = llamacloud_markdown_document + # 
Generate unique identifier hash for this file + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.FILE, file_name, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash(file_in_markdown, search_space_id) - # Check if document with this content hash already exists - existing_document = await check_duplicate_document(session, content_hash) - if existing_document: - logging.info( - f"Document with content hash {content_hash} already exists. Skipping processing." - ) - return existing_document + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) - # Get user's long context LLM + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logging.info(f"Document for file {file_name} unchanged. Skipping.") + return existing_document + else: + # Content has changed - update the existing document + logging.info( + f"Content changed for file {file_name}. Updating document." 
+ ) + + # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) if not user_llm: raise RuntimeError( @@ -161,24 +206,42 @@ async def add_received_file_document_using_llamacloud( # Process chunks chunks = await create_document_chunks(file_in_markdown) - # Create and store document - document = Document( - search_space_id=search_space_id, - title=file_name, - document_type=DocumentType.FILE, - document_metadata={ + # Update or create document + if existing_document: + # Update existing document + existing_document.title = file_name + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { "FILE_NAME": file_name, "ETL_SERVICE": "LLAMACLOUD", - }, - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - content_hash=content_hash, - ) + } + existing_document.chunks = chunks - session.add(document) - await session.commit() - await session.refresh(document) + await session.commit() + await session.refresh(existing_document) + document = existing_document + else: + # Create new document + document = Document( + search_space_id=search_space_id, + title=file_name, + document_type=DocumentType.FILE, + document_metadata={ + "FILE_NAME": file_name, + "ETL_SERVICE": "LLAMACLOUD", + }, + content=summary_content, + embedding=summary_embedding, + chunks=chunks, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + ) + + session.add(document) + await session.commit() + await session.refresh(document) return document except SQLAlchemyError as db_error: @@ -214,17 +277,31 @@ async def add_received_file_document_using_docling( try: file_in_markdown = docling_markdown_document + # Generate unique identifier hash for this file + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.FILE, file_name, 
search_space_id + ) + + # Generate content hash content_hash = generate_content_hash(file_in_markdown, search_space_id) - # Check if document with this content hash already exists - existing_document = await check_duplicate_document(session, content_hash) - if existing_document: - logging.info( - f"Document with content hash {content_hash} already exists. Skipping processing." - ) - return existing_document + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) - # Get user's long context LLM + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logging.info(f"Document for file {file_name} unchanged. Skipping.") + return existing_document + else: + # Content has changed - update the existing document + logging.info( + f"Content changed for file {file_name}. Updating document." + ) + + # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) if not user_llm: raise RuntimeError( @@ -268,20 +345,38 @@ async def add_received_file_document_using_docling( # Process chunks chunks = await create_document_chunks(file_in_markdown) - # Create and store document - document = Document( - search_space_id=search_space_id, - title=file_name, - document_type=DocumentType.FILE, - document_metadata={ + # Update or create document + if existing_document: + # Update existing document + existing_document.title = file_name + existing_document.content = enhanced_summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { "FILE_NAME": file_name, "ETL_SERVICE": "DOCLING", - }, - content=enhanced_summary_content, - embedding=summary_embedding, - chunks=chunks, - content_hash=content_hash, - ) + } + existing_document.chunks = 
chunks + + await session.commit() + await session.refresh(existing_document) + document = existing_document + else: + # Create new document + document = Document( + search_space_id=search_space_id, + title=file_name, + document_type=DocumentType.FILE, + document_metadata={ + "FILE_NAME": file_name, + "ETL_SERVICE": "DOCLING", + }, + content=enhanced_summary_content, + embedding=summary_embedding, + chunks=chunks, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + ) session.add(document) await session.commit() diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py index fa3c79d81..76215ed51 100644 --- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py +++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py @@ -14,10 +14,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( - check_duplicate_document, + check_document_by_unique_identifier, ) @@ -56,25 +57,41 @@ async def add_received_markdown_file_document( ) try: + # Generate unique identifier hash for this markdown file + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.FILE, file_name, search_space_id + ) + + # Generate content hash content_hash = generate_content_hash(file_in_markdown, search_space_id) - # Check if document with this content hash already exists - existing_document = await check_duplicate_document(session, content_hash) - if existing_document: - await task_logger.log_task_success( - log_entry, - f"Markdown file document already exists: {file_name}", - { - "duplicate_detected": True, - "existing_document_id": existing_document.id, - }, - ) - logging.info( - f"Document with content hash {content_hash} already exists. Skipping processing." 
- ) - return existing_document + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) - # Get user's long context LLM + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + await task_logger.log_task_success( + log_entry, + f"Markdown file document unchanged: {file_name}", + { + "duplicate_detected": True, + "existing_document_id": existing_document.id, + }, + ) + logging.info( + f"Document for markdown file {file_name} unchanged. Skipping." + ) + return existing_document + else: + # Content has changed - update the existing document + logging.info( + f"Content changed for markdown file {file_name}. Updating document." + ) + + # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) if not user_llm: raise RuntimeError( @@ -93,23 +110,40 @@ async def add_received_markdown_file_document( # Process chunks chunks = await create_document_chunks(file_in_markdown) - # Create and store document - document = Document( - search_space_id=search_space_id, - title=file_name, - document_type=DocumentType.FILE, - document_metadata={ + # Update or create document + if existing_document: + # Update existing document + existing_document.title = file_name + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { "FILE_NAME": file_name, - }, - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - content_hash=content_hash, - ) + } + existing_document.chunks = chunks - session.add(document) - await session.commit() - await session.refresh(document) + await session.commit() + await session.refresh(existing_document) + document = existing_document + else: + # Create new 
document + document = Document( + search_space_id=search_space_id, + title=file_name, + document_type=DocumentType.FILE, + document_metadata={ + "FILE_NAME": file_name, + }, + content=summary_content, + embedding=summary_embedding, + chunks=chunks, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + ) + + session.add(document) + await session.commit() + await session.refresh(document) # Log success await task_logger.log_task_success( diff --git a/surfsense_backend/app/tasks/document_processors/url_crawler.py b/surfsense_backend/app/tasks/document_processors/url_crawler.py index 682086112..8e2863198 100644 --- a/surfsense_backend/app/tasks/document_processors/url_crawler.py +++ b/surfsense_backend/app/tasks/document_processors/url_crawler.py @@ -17,10 +17,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( - check_duplicate_document, + check_document_by_unique_identifier, md, ) @@ -129,31 +130,49 @@ async def add_crawled_url_document( document_parts.append("") combined_document_string = "\n".join(document_parts) - content_hash = generate_content_hash(combined_document_string, search_space_id) - # Check for duplicates - await task_logger.log_task_progress( - log_entry, - f"Checking for duplicate content: {url}", - {"stage": "duplicate_check", "content_hash": content_hash}, + # Generate unique identifier hash for this URL + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.CRAWLED_URL, url, search_space_id ) - existing_document = await check_duplicate_document(session, content_hash) - if existing_document: - await task_logger.log_task_success( - log_entry, - f"Document already exists for URL: {url}", - { - "duplicate_detected": True, - "existing_document_id": existing_document.id, - }, - ) - logging.info( - f"Document with content hash {content_hash} already exists. Skipping processing." 
- ) - return existing_document + # Generate content hash + content_hash = generate_content_hash(combined_document_string, search_space_id) - # Get LLM for summary generation + # Check if document with this unique identifier already exists + await task_logger.log_task_progress( + log_entry, + f"Checking for existing URL: {url}", + {"stage": "duplicate_check", "url": url}, + ) + + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + await task_logger.log_task_success( + log_entry, + f"URL document unchanged: {url}", + { + "duplicate_detected": True, + "existing_document_id": existing_document.id, + }, + ) + logging.info(f"Document for URL {url} unchanged. Skipping.") + return existing_document + else: + # Content has changed - update the existing document + logging.info(f"Content changed for URL {url}. Updating document.") + await task_logger.log_task_progress( + log_entry, + f"Updating URL document: {url}", + {"stage": "document_update", "url": url}, + ) + + # Get LLM for summary generation (needed for both create and update) await task_logger.log_task_progress( log_entry, f"Preparing for summary generation: {url}", @@ -194,27 +213,50 @@ async def add_crawled_url_document( chunks = await create_document_chunks(content_in_markdown) - # Create and store document - await task_logger.log_task_progress( - log_entry, - f"Creating document in database for URL: {url}", - {"stage": "document_creation", "chunks_count": len(chunks)}, - ) + # Update or create document + if existing_document: + # Update existing document + await task_logger.log_task_progress( + log_entry, + f"Updating document in database for URL: {url}", + {"stage": "document_update", "chunks_count": len(chunks)}, + ) - document = Document( - search_space_id=search_space_id, - title=url_crawled[0].metadata["title"] - if 
isinstance(crawl_loader, FireCrawlLoader) - else url_crawled[0].metadata["source"], - document_type=DocumentType.CRAWLED_URL, - document_metadata=url_crawled[0].metadata, - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - content_hash=content_hash, - ) + existing_document.title = ( + url_crawled[0].metadata["title"] + if isinstance(crawl_loader, FireCrawlLoader) + else url_crawled[0].metadata["source"] + ) + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = url_crawled[0].metadata + existing_document.chunks = chunks - session.add(document) + document = existing_document + else: + # Create new document + await task_logger.log_task_progress( + log_entry, + f"Creating document in database for URL: {url}", + {"stage": "document_creation", "chunks_count": len(chunks)}, + ) + + document = Document( + search_space_id=search_space_id, + title=url_crawled[0].metadata["title"] + if isinstance(crawl_loader, FireCrawlLoader) + else url_crawled[0].metadata["source"], + document_type=DocumentType.CRAWLED_URL, + document_metadata=url_crawled[0].metadata, + content=summary_content, + embedding=summary_embedding, + chunks=chunks, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + ) + + session.add(document) await session.commit() await session.refresh(document) diff --git a/surfsense_backend/app/tasks/document_processors/youtube_processor.py b/surfsense_backend/app/tasks/document_processors/youtube_processor.py index a28a7f186..c7d396974 100644 --- a/surfsense_backend/app/tasks/document_processors/youtube_processor.py +++ b/surfsense_backend/app/tasks/document_processors/youtube_processor.py @@ -17,10 +17,11 @@ from app.utils.document_converters import ( create_document_chunks, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) from .base import ( - 
check_duplicate_document, + check_document_by_unique_identifier, ) @@ -201,32 +202,54 @@ async def add_youtube_video_document( document_parts.append("") combined_document_string = "\n".join(document_parts) - content_hash = generate_content_hash(combined_document_string, search_space_id) - # Check for duplicates - await task_logger.log_task_progress( - log_entry, - f"Checking for duplicate video content: {video_id}", - {"stage": "duplicate_check", "content_hash": content_hash}, + # Generate unique identifier hash for this YouTube video + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.YOUTUBE_VIDEO, video_id, search_space_id ) - existing_document = await check_duplicate_document(session, content_hash) - if existing_document: - await task_logger.log_task_success( - log_entry, - f"YouTube video document already exists: {video_data.get('title', 'YouTube Video')}", - { - "duplicate_detected": True, - "existing_document_id": existing_document.id, - "video_id": video_id, - }, - ) - logging.info( - f"Document with content hash {content_hash} already exists. Skipping processing." 
- ) - return existing_document + # Generate content hash + content_hash = generate_content_hash(combined_document_string, search_space_id) - # Get LLM for summary generation + # Check if document with this unique identifier already exists + await task_logger.log_task_progress( + log_entry, + f"Checking for existing video: {video_id}", + {"stage": "duplicate_check", "video_id": video_id}, + ) + + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + await task_logger.log_task_success( + log_entry, + f"YouTube video document unchanged: {video_data.get('title', 'YouTube Video')}", + { + "duplicate_detected": True, + "existing_document_id": existing_document.id, + "video_id": video_id, + }, + ) + logging.info( + f"Document for YouTube video {video_id} unchanged. Skipping." + ) + return existing_document + else: + # Content has changed - update the existing document + logging.info( + f"Content changed for YouTube video {video_id}. Updating document." 
+ ) + await task_logger.log_task_progress( + log_entry, + f"Updating YouTube video document: {video_data.get('title', 'YouTube Video')}", + {"stage": "document_update", "video_id": video_id}, + ) + + # Get LLM for summary generation (needed for both create and update) await task_logger.log_task_progress( log_entry, f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}", @@ -270,33 +293,60 @@ async def add_youtube_video_document( chunks = await create_document_chunks(combined_document_string) - # Create document - await task_logger.log_task_progress( - log_entry, - f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}", - {"stage": "document_creation", "chunks_count": len(chunks)}, - ) + # Update or create document + if existing_document: + # Update existing document + await task_logger.log_task_progress( + log_entry, + f"Updating YouTube video document in database: {video_data.get('title', 'YouTube Video')}", + {"stage": "document_update", "chunks_count": len(chunks)}, + ) - document = Document( - title=video_data.get("title", "YouTube Video"), - document_type=DocumentType.YOUTUBE_VIDEO, - document_metadata={ + existing_document.title = video_data.get("title", "YouTube Video") + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { "url": url, "video_id": video_id, "video_title": video_data.get("title", "YouTube Video"), "author": video_data.get("author_name", "Unknown"), "thumbnail": video_data.get("thumbnail_url", ""), - }, - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - search_space_id=search_space_id, - content_hash=content_hash, - ) + } + existing_document.chunks = chunks - session.add(document) - await session.commit() - await session.refresh(document) + await session.commit() + await session.refresh(existing_document) + document = 
def generate_unique_identifier_hash(
    document_type: DocumentType,
    unique_identifier: str | int | float,
    search_space_id: int,
) -> str:
    """
    Build a stable SHA-256 fingerprint for a source-system document identity.

    Connector syncs (Slack, Notion, Jira, Airtable, file uploads, URLs, ...)
    need a way to recognize "the same document seen again" even when its
    content has changed. This hash keys on WHERE the document came from —
    its type, its native ID in the source system, and the search space —
    rather than on its content, so re-syncs can update in place instead of
    duplicating.

    Args:
        document_type: Enum member identifying the source connector/type
            (e.g. ``DocumentType.SLACK_CONNECTOR``).
        unique_identifier: The source system's native ID (message ID, page
            ID, record ID, file name, URL, ...). Non-string values are
            stringified before hashing.
        search_space_id: ID of the search space that owns the document.

    Returns:
        str: Hex-encoded SHA-256 digest of ``"<type>:<id>:<space>"``.

    Example:
        >>> generate_unique_identifier_hash(
        ...     DocumentType.SLACK_CONNECTOR,
        ...     "1234567890.123456",
        ...     42,
        ... )
        'a1b2c3d4e5f6...'
    """
    # Normalize every component to str so ints/floats hash consistently,
    # then join with ":" — same payload as f"{type}:{id}:{space}".
    parts = (document_type.value, str(unique_identifier), str(search_space_id))
    payload = ":".join(parts)

    return hashlib.sha256(payload.encode("utf-8")).hexdigest()