From aa66928154aacb1e2f8a0fdc4cdc4679d9d2d0b0 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 6 Feb 2026 05:35:15 +0530 Subject: [PATCH] chore: ran linting --- .../versions/92_add_document_status_column.py | 3 - .../connectors/composio_gmail_connector.py | 104 +-- .../composio_google_calendar_connector.py | 99 ++- .../composio_google_drive_connector.py | 164 ++-- surfsense_backend/app/db.py | 29 +- .../app/routes/documents_routes.py | 24 +- surfsense_backend/app/schemas/documents.py | 9 +- .../app/services/connector_service.py | 12 +- .../app/tasks/celery_tasks/document_tasks.py | 18 +- .../stale_notification_cleanup_task.py | 4 +- .../connector_indexers/airtable_indexer.py | 105 ++- .../app/tasks/connector_indexers/base.py | 13 +- .../connector_indexers/bookstack_indexer.py | 116 +-- .../connector_indexers/clickup_indexer.py | 124 +-- .../connector_indexers/confluence_indexer.py | 96 ++- .../connector_indexers/discord_indexer.py | 100 ++- .../elasticsearch_indexer.py | 90 +- .../connector_indexers/github_indexer.py | 74 +- .../google_calendar_indexer.py | 119 +-- .../google_drive_indexer.py | 67 +- .../google_gmail_indexer.py | 101 ++- .../tasks/connector_indexers/jira_indexer.py | 94 ++- .../connector_indexers/linear_indexer.py | 107 ++- .../tasks/connector_indexers/luma_indexer.py | 148 ++-- .../connector_indexers/notion_indexer.py | 73 +- .../connector_indexers/obsidian_indexer.py | 96 ++- .../tasks/connector_indexers/slack_indexer.py | 86 +- .../tasks/connector_indexers/teams_indexer.py | 98 ++- .../connector_indexers/webcrawler_indexer.py | 67 +- .../app/tasks/document_processors/base.py | 13 +- .../circleback_processor.py | 18 +- .../document_processors/file_processors.py | 124 ++- .../document_processors/youtube_processor.py | 24 +- .../(manage)/components/DocumentTypeIcon.tsx | 4 +- .../(manage)/components/DocumentsFilters.tsx | 192 +++-- .../components/DocumentsTableShell.tsx | 82 +- .../(manage)/components/RowActions.tsx | 34 +- .../documents/(manage)/page.tsx | 68 +- .../connector-dialog.atoms.ts | 1 - .../assistant-ui/connector-popup.tsx | 4 +- .../hooks/use-connector-dialog.ts | 36 +- .../components/theme/theme-toggle.tsx | 787 +++++++++--------- surfsense_web/hooks/use-documents.ts | 31 +- surfsense_web/lib/electric/client.ts | 25 +- 44 files changed, 2025 insertions(+), 1658 deletions(-) diff --git a/surfsense_backend/alembic/versions/92_add_document_status_column.py b/surfsense_backend/alembic/versions/92_add_document_status_column.py index 550faa3c3..8204096aa 100644 --- a/surfsense_backend/alembic/versions/92_add_document_status_column.py +++ b/surfsense_backend/alembic/versions/92_add_document_status_column.py @@ -13,8 +13,6 @@ Changes: from collections.abc import Sequence -import sqlalchemy as sa - from alembic import op # revision identifiers, used by Alembic. @@ -77,4 +75,3 @@ def downgrade() -> None: END$$; """ ) - diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py index 870053c7f..4764a0a41 100644 --- a/surfsense_backend/app/connectors/composio_gmail_connector.py +++ b/surfsense_backend/app/connectors/composio_gmail_connector.py @@ -285,24 +285,28 @@ async def _analyze_gmail_messages_phase1( if existing_document: if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'message_id': message_id, - 'thread_id': thread_id, - 'subject': subject, - 'sender': sender, - 'date_str': date_str, - 'label_ids': label_ids, - }) + messages_to_process.append( + { + "document": existing_document, + "is_new": False, + "markdown_content": markdown_content, + "content_hash": content_hash, + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date_str": date_str, + "label_ids": label_ids, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -350,18 +354,20 @@ async def _analyze_gmail_messages_phase1( ) session.add(document) - messages_to_process.append({ - 'document': document, - 'is_new': True, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'message_id': message_id, - 'thread_id': thread_id, - 'subject': subject, - 'sender': sender, - 'date_str': date_str, - 'label_ids': label_ids, - }) + messages_to_process.append( + { + "document": document, + "is_new": True, + "markdown_content": markdown_content, + "content_hash": content_hash, + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date_str": date_str, + "label_ids": label_ids, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True) @@ -398,7 +404,7 @@ async def _process_gmail_messages_phase2( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -411,37 +417,35 @@ async def _process_gmail_messages_phase2( if user_llm: document_metadata_for_summary = { - "message_id": item['message_id'], - "thread_id": item['thread_id'], - "subject": item['subject'], - "sender": item['sender'], + "message_id": item["message_id"], + "thread_id": item["thread_id"], + "subject": item["subject"], + "sender": item["sender"], "document_type": "Gmail Message (Composio)", } summary_content, summary_embedding = await generate_document_summary( - item['markdown_content'], user_llm, document_metadata_for_summary + item["markdown_content"], user_llm, document_metadata_for_summary ) else: - summary_content = ( - f"Gmail: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}" - ) + summary_content = f"Gmail: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}" summary_embedding = config.embedding_model_instance.embed( summary_content ) - chunks = await create_document_chunks(item['markdown_content']) + chunks = await create_document_chunks(item["markdown_content"]) # Update document to READY with actual content - document.title = item['subject'] + document.title = item["subject"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "message_id": item['message_id'], - "thread_id": item['thread_id'], - "subject": item['subject'], - "sender": item['sender'], - "date": item['date_str'], - "labels": item['label_ids'], + "message_id": item["message_id"], + "thread_id": item["thread_id"], + "subject": item["subject"], + "sender": item["sender"], + "date": item["date_str"], + "labels": item["label_ids"], "connector_id": connector_id, "source": "composio", } @@ -465,7 +469,9 @@ async def _process_gmail_messages_phase2( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue @@ -571,7 +577,9 @@ async def index_composio_gmail( ) all_messages.extend(messages) - logger.info(f"Fetched {len(messages)} messages (total: {len(all_messages)})") + logger.info( + f"Fetched {len(messages)} messages (total: {len(all_messages)})" + ) if not next_token or len(messages) < current_batch_size: break @@ -616,7 +624,7 @@ async def index_composio_gmail( ) # Commit all pending documents - they all appear in UI now - new_documents_count = len([m for m in messages_to_process if m['is_new']]) + new_documents_count = len([m for m in messages_to_process if m["is_new"]]) if new_documents_count > 0: logger.info(f"Phase 1: Committing {new_documents_count} pending documents") await session.commit() @@ -645,9 +653,7 @@ async def index_composio_gmail( await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit to ensure all documents are persisted - logger.info( - f"Final commit: Total {documents_indexed} Gmail messages processed" - ) + logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed") try: await session.commit() logger.info( diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py index dc9c18c99..6593721a1 100644 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ -268,7 +268,9 @@ async def index_composio_google_calendar( documents_indexed = 0 documents_skipped = 0 documents_failed = 0 # Track events that failed processing - duplicate_content_count = 0 # Track events skipped due to duplicate content_hash + duplicate_content_count = ( + 0 # Track events skipped due to duplicate content_hash + ) last_heartbeat_time = time.time() # ======================================================================= @@ -317,23 +319,27 @@ async def index_composio_google_calendar( if existing_document: if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - events_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'event_id': event_id, - 'summary': summary, - 'start_time': start_time, - 'end_time': end_time, - 'location': location, - }) + events_to_process.append( + { + "document": existing_document, + "is_new": False, + "markdown_content": markdown_content, + "content_hash": content_hash, + "event_id": event_id, + "summary": summary, + "start_time": start_time, + "end_time": end_time, + "location": location, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -383,17 +389,19 @@ async def index_composio_google_calendar( session.add(document) new_documents_created = True - events_to_process.append({ - 'document': document, - 'is_new': True, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'event_id': event_id, - 'summary': summary, - 'start_time': start_time, - 'end_time': end_time, - 'location': location, - }) + events_to_process.append( + { + "document": document, + "is_new": True, + "markdown_content": markdown_content, + "content_hash": content_hash, + "event_id": event_id, + "summary": summary, + "start_time": start_time, + "end_time": end_time, + "location": location, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True) @@ -402,7 +410,9 @@ async def index_composio_google_calendar( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -419,7 +429,7 @@ async def index_composio_google_calendar( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -432,35 +442,40 @@ async def index_composio_google_calendar( if user_llm: document_metadata_for_summary = { - "event_id": item['event_id'], - "summary": item['summary'], - "start_time": item['start_time'], + "event_id": item["event_id"], + "summary": item["summary"], + "start_time": item["start_time"], "document_type": "Google Calendar Event (Composio)", } - summary_content, summary_embedding = await generate_document_summary( - item['markdown_content'], user_llm, document_metadata_for_summary + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item["markdown_content"], + user_llm, + document_metadata_for_summary, ) else: summary_content = f"Calendar: {item['summary']}\n\nStart: {item['start_time']}\nEnd: {item['end_time']}" - if item['location']: + if item["location"]: summary_content += f"\nLocation: {item['location']}" summary_embedding = config.embedding_model_instance.embed( summary_content ) - chunks = await create_document_chunks(item['markdown_content']) + chunks = await create_document_chunks(item["markdown_content"]) # Update document to READY with actual content - document.title = item['summary'] + document.title = item["summary"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "event_id": item['event_id'], - "summary": item['summary'], - "start_time": item['start_time'], - "end_time": item['end_time'], - "location": item['location'], + "event_id": item["event_id"], + "summary": item["summary"], + "start_time": item["start_time"], + "end_time": item["end_time"], + "location": item["location"], "connector_id": connector_id, "source": "composio", } @@ -484,7 +499,9 @@ async def index_composio_google_calendar( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index 26cfd3020..4ccd195e6 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -938,13 +938,15 @@ async def _index_composio_drive_delta_sync( if existing_document: # Queue existing document for update - files_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'file_id': file_id, - 'file_name': file_name, - 'mime_type': mime_type, - }) + files_to_process.append( + { + "document": existing_document, + "is_new": False, + "file_id": file_id, + "file_name": file_name, + "mime_type": mime_type, + } + ) continue # Create new document with PENDING status @@ -974,13 +976,15 @@ async def _index_composio_drive_delta_sync( session.add(document) new_documents_created = True - files_to_process.append({ - 'document': document, - 'is_new': True, - 'file_id': file_id, - 'file_name': file_name, - 'mime_type': mime_type, - }) + files_to_process.append( + { + "document": document, + "is_new": True, + "file_id": file_id, + "file_name": file_name, + "mime_type": mime_type, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for change: {e!s}", exc_info=True) @@ -989,7 +993,9 @@ async def _index_composio_drive_delta_sync( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -1005,7 +1011,7 @@ async def _index_composio_drive_delta_sync( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit document.status = DocumentStatus.processing() @@ -1013,11 +1019,13 @@ async def _index_composio_drive_delta_sync( # Get file content content, content_error = await composio_connector.get_drive_file_content( - item['file_id'], original_mime_type=item['mime_type'] + item["file_id"], original_mime_type=item["mime_type"] ) if content_error or not content: - logger.warning(f"Could not get content for file {item['file_name']}: {content_error}") + logger.warning( + f"Could not get content for file {item['file_name']}: {content_error}" + ) markdown_content = f"# {item['file_name']}\n\n" markdown_content += f"**File ID:** {item['file_id']}\n" markdown_content += f"**Type:** {item['mime_type']}\n" @@ -1031,9 +1039,9 @@ async def _index_composio_drive_delta_sync( else: markdown_content = await _process_file_content( content=content, - file_name=item['file_name'], - file_id=item['file_id'], - mime_type=item['mime_type'], + file_name=item["file_name"], + file_id=item["file_id"], + mime_type=item["mime_type"], search_space_id=search_space_id, user_id=user_id, session=session, @@ -1045,14 +1053,14 @@ async def _index_composio_drive_delta_sync( content_hash = generate_content_hash(markdown_content, search_space_id) # For existing documents, check if content changed - if not item['is_new'] and document.content_hash == content_hash: + if not item["is_new"] and document.content_hash == content_hash: if not DocumentStatus.is_state(document.status, DocumentStatus.READY): document.status = DocumentStatus.ready() documents_skipped += 1 continue # Check for duplicate content hash (for new documents) - if item['is_new']: + if item["is_new"]: with session.no_autoflush: duplicate_by_content = await check_duplicate_document_by_hash( session, content_hash @@ -1067,13 +1075,15 @@ async def _index_composio_drive_delta_sync( continue # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) if user_llm: document_metadata_for_summary = { - "file_id": item['file_id'], - "file_name": item['file_name'], - "mime_type": item['mime_type'], + "file_id": item["file_id"], + "file_name": item["file_name"], + "mime_type": item["mime_type"], "document_type": "Google Drive File (Composio)", } summary_content, summary_embedding = await generate_document_summary( @@ -1081,20 +1091,22 @@ async def _index_composio_drive_delta_sync( ) else: summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}" - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) chunks = await create_document_chunks(markdown_content) # Update document to READY - document.title = item['file_name'] + document.title = item["file_name"] document.content = summary_content document.content_hash = content_hash document.embedding = summary_embedding document.document_metadata = { - "file_id": item['file_id'], - "file_name": item['file_name'], - "FILE_NAME": item['file_name'], - "mime_type": item['mime_type'], + "file_id": item["file_id"], + "file_name": item["file_name"], + "FILE_NAME": item["file_name"], + "mime_type": item["mime_type"], "connector_id": connector_id, "source": "composio", } @@ -1117,7 +1129,9 @@ async def _index_composio_drive_delta_sync( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue @@ -1329,13 +1343,15 @@ async def _index_composio_drive_full_scan( if existing_document: # Queue existing document for update (will be set to processing in Phase 2) - files_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'file_id': file_id, - 'file_name': file_name, - 'mime_type': mime_type, - }) + files_to_process.append( + { + "document": existing_document, + "is_new": False, + "file_id": file_id, + "file_name": file_name, + "mime_type": mime_type, + } + ) continue # Create new document with PENDING status (visible in UI immediately) @@ -1365,13 +1381,15 @@ async def _index_composio_drive_full_scan( session.add(document) new_documents_created = True - files_to_process.append({ - 'document': document, - 'is_new': True, - 'file_id': file_id, - 'file_name': file_name, - 'mime_type': mime_type, - }) + files_to_process.append( + { + "document": document, + "is_new": True, + "file_id": file_id, + "file_name": file_name, + "mime_type": mime_type, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for file: {e!s}", exc_info=True) @@ -1380,7 +1398,9 @@ async def _index_composio_drive_full_scan( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -1397,7 +1417,7 @@ async def _index_composio_drive_full_scan( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -1405,11 +1425,13 @@ async def _index_composio_drive_full_scan( # Get file content (pass mime_type for Google Workspace export handling) content, content_error = await composio_connector.get_drive_file_content( - item['file_id'], original_mime_type=item['mime_type'] + item["file_id"], original_mime_type=item["mime_type"] ) if content_error or not content: - logger.warning(f"Could not get content for file {item['file_name']}: {content_error}") + logger.warning( + f"Could not get content for file {item['file_name']}: {content_error}" + ) markdown_content = f"# {item['file_name']}\n\n" markdown_content += f"**File ID:** {item['file_id']}\n" markdown_content += f"**Type:** {item['mime_type']}\n" @@ -1424,9 +1446,9 @@ async def _index_composio_drive_full_scan( # Process content based on file type markdown_content = await _process_file_content( content=content, - file_name=item['file_name'], - file_id=item['file_id'], - mime_type=item['mime_type'], + file_name=item["file_name"], + file_id=item["file_id"], + mime_type=item["mime_type"], search_space_id=search_space_id, user_id=user_id, session=session, @@ -1438,7 +1460,7 @@ async def _index_composio_drive_full_scan( content_hash = generate_content_hash(markdown_content, search_space_id) # For existing documents, check if content changed - if not item['is_new'] and document.content_hash == content_hash: + if not item["is_new"] and document.content_hash == content_hash: # Ensure status is ready if not DocumentStatus.is_state(document.status, DocumentStatus.READY): document.status = DocumentStatus.ready() @@ -1446,7 +1468,7 @@ async def _index_composio_drive_full_scan( continue # Check for duplicate content hash (for new documents) - if item['is_new']: + if item["is_new"]: with session.no_autoflush: duplicate_by_content = await check_duplicate_document_by_hash( session, content_hash @@ -1462,13 +1484,15 @@ async def _index_composio_drive_full_scan( continue # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) if user_llm: document_metadata_for_summary = { - "file_id": item['file_id'], - "file_name": item['file_name'], - "mime_type": item['mime_type'], + "file_id": item["file_id"], + "file_name": item["file_name"], + "mime_type": item["mime_type"], "document_type": "Google Drive File (Composio)", } summary_content, summary_embedding = await generate_document_summary( @@ -1476,20 +1500,22 @@ async def _index_composio_drive_full_scan( ) else: summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}" - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) chunks = await create_document_chunks(markdown_content) # Update document to READY with actual content - document.title = item['file_name'] + document.title = item["file_name"] document.content = summary_content document.content_hash = content_hash document.embedding = summary_embedding document.document_metadata = { - "file_id": item['file_id'], - "file_name": item['file_name'], - "FILE_NAME": item['file_name'], - "mime_type": item['mime_type'], + "file_id": item["file_id"], + "file_name": item["file_name"], + "FILE_NAME": item["file_name"], + "mime_type": item["mime_type"], "connector_id": connector_id, "source": "composio", } @@ -1515,7 +1541,9 @@ async def _index_composio_drive_full_scan( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index fb5c711ed..344d83f13 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -103,67 +103,70 @@ class PodcastStatus(str, Enum): class DocumentStatus: """ Helper class for document processing status (stored as JSONB). - + Status values: - {"state": "ready"} - Document is fully processed and searchable - {"state": "pending"} - Document is queued, waiting to be processed - {"state": "processing"} - Document is currently being processed (only 1 at a time) - {"state": "failed", "reason": "..."} - Processing failed with reason - + Usage: document.status = DocumentStatus.pending() document.status = DocumentStatus.processing() document.status = DocumentStatus.ready() document.status = DocumentStatus.failed("LLM rate limit exceeded") """ - + # State constants READY = "ready" PENDING = "pending" PROCESSING = "processing" FAILED = "failed" - + @staticmethod def ready() -> dict: """Return status dict for a ready/searchable document.""" return {"state": DocumentStatus.READY} - + @staticmethod def pending() -> dict: """Return status dict for a document waiting to be processed.""" return {"state": DocumentStatus.PENDING} - + @staticmethod def processing() -> dict: """Return status dict for a document being processed.""" return {"state": DocumentStatus.PROCESSING} - + @staticmethod def failed(reason: str, **extra_details) -> dict: """ Return status dict for a failed document. - + Args: reason: Human-readable failure reason **extra_details: Optional additional details (duplicate_of, error_code, etc.) """ - status = {"state": DocumentStatus.FAILED, "reason": reason[:500]} # Truncate long reasons + status = { + "state": DocumentStatus.FAILED, + "reason": reason[:500], + } # Truncate long reasons if extra_details: status.update(extra_details) return status - + @staticmethod def get_state(status: dict | None) -> str | None: """Extract state from status dict, returns None if invalid.""" if status is None: return None return status.get("state") if isinstance(status, dict) else None - + @staticmethod def is_state(status: dict | None, state: str) -> bool: """Check if status matches a given state.""" return DocumentStatus.get_state(status) == state - + @staticmethod def get_failure_reason(status: dict | None) -> str | None: """Extract failure reason from status dict.""" @@ -866,7 +869,7 @@ class Document(BaseModel, TimestampMixin): JSONB, nullable=False, default=DocumentStatus.ready, - server_default=text("'{\"state\": \"ready\"}'::jsonb"), + server_default=text('\'{"state": "ready"}\'::jsonb'), index=True, ) diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index 00c80dcb5..b20f8cd9c 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -114,11 +114,11 @@ async def create_documents_file_upload( ): """ Upload files as documents with real-time status tracking. - + Implements 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately via ElectricSQL) - Phase 2: Celery processes each file: pending → processing → ready/failed - + Requires DOCUMENTS_CREATE permission. """ from datetime import datetime @@ -144,7 +144,9 @@ async def create_documents_file_upload( raise HTTPException(status_code=400, detail="No files provided") created_documents: list[Document] = [] - files_to_process: list[tuple[Document, str, str]] = [] # (document, temp_path, filename) + files_to_process: list[ + tuple[Document, str, str] + ] = [] # (document, temp_path, filename) skipped_duplicates = 0 # ===== PHASE 1: Create pending documents for all files ===== @@ -201,7 +203,9 @@ async def create_documents_file_upload( ) session.add(document) created_documents.append(document) - files_to_process.append((document, temp_path, file.filename or "unknown")) + files_to_process.append( + (document, temp_path, file.filename or "unknown") + ) except Exception as e: raise HTTPException( @@ -348,15 +352,15 @@ async def read_documents( created_by_name = None if doc.created_by: created_by_name = doc.created_by.display_name or doc.created_by.email - + # Parse status from JSONB status_data = None - if hasattr(doc, 'status') and doc.status: + if hasattr(doc, "status") and doc.status: status_data = DocumentStatusSchema( state=doc.status.get("state", "ready"), reason=doc.status.get("reason"), ) - + api_documents.append( DocumentRead( id=doc.id, @@ -503,15 +507,15 @@ async def search_documents( created_by_name = None if doc.created_by: created_by_name = doc.created_by.display_name or doc.created_by.email - + # Parse status from JSONB status_data = None - if hasattr(doc, 'status') and doc.status: + if hasattr(doc, "status") and doc.status: status_data = DocumentStatusSchema( state=doc.status.get("state", "ready"), reason=doc.status.get("reason"), ) - + api_documents.append( DocumentRead( id=doc.id, diff --git a/surfsense_backend/app/schemas/documents.py b/surfsense_backend/app/schemas/documents.py index 7d85d0229..4cedc7d93 100644 --- a/surfsense_backend/app/schemas/documents.py +++ b/surfsense_backend/app/schemas/documents.py @@ -43,6 +43,7 @@ class DocumentUpdate(DocumentBase): class DocumentStatusSchema(BaseModel): """Document processing status.""" + state: str # "ready", "processing", "failed" reason: str | None = None @@ -59,8 +60,12 @@ class DocumentRead(BaseModel): updated_at: datetime | None search_space_id: int created_by_id: UUID | None = None # User who created/uploaded this document - created_by_name: str | None = None # Display name or email of the user who created this document - status: DocumentStatusSchema | None = None # Processing status (ready, processing, failed) + created_by_name: str | None = ( + None # Display name or email of the user who created this document + ) + status: DocumentStatusSchema | None = ( + None # Processing status (ready, processing, failed) + ) model_config = ConfigDict(from_attributes=True) diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 6967902d1..251241e96 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -1465,11 +1465,7 @@ class ConnectorService: issue_key = metadata.get("issue_key", "") issue_title = metadata.get("issue_title", "Untitled Issue") status = metadata.get("status", "") - title = ( - f"{issue_key} - {issue_title}" - if issue_key - else issue_title - ) + title = f"{issue_key} - {issue_title}" if issue_key else issue_title if status: title += f" ({status})" return title @@ -2387,11 +2383,7 @@ class ConnectorService: def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: event_name = metadata.get("event_name", "Untitled Event") start_time = metadata.get("start_time", "") - return ( - f"{event_name} ({start_time})" - if start_time - else event_name - ) + return f"{event_name} ({start_time})" if start_time else event_name def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: return metadata.get("event_url", "") or "" diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index cd5537927..6dfcbff46 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -548,11 +548,11 @@ def process_file_upload_with_document_task( ): """ Celery task to process uploaded file with existing pending document. - + This task is used by the 2-phase document upload flow: - Phase 1 (API): Creates pending document (visible in UI immediately) - Phase 2 (this task): Updates document status: pending → processing → ready/failed - + Args: document_id: ID of the pending document created in Phase 1 temp_path: Path to the uploaded file @@ -634,7 +634,7 @@ async def _process_file_with_document( ): """ Process file and update existing pending document status. - + This function implements Phase 2 of the 2-phase document upload: - Sets document status to 'processing' (shows spinner in UI) - Processes the file (parsing, embedding, chunking) @@ -669,11 +669,15 @@ async def _process_file_with_document( file_size = os.path.getsize(temp_path) logger.info(f"[_process_file_with_document] File size: {file_size} bytes") except Exception as e: - logger.warning(f"[_process_file_with_document] Could not get file size: {e}") + logger.warning( + f"[_process_file_with_document] Could not get file size: {e}" + ) file_size = None # Create notification for document processing - logger.info(f"[_process_file_with_document] Creating notification for: {filename}") + logger.info( + f"[_process_file_with_document] Creating notification for: {filename}" + ) notification = ( await NotificationService.document_processing.notify_processing_started( session=session, @@ -822,7 +826,9 @@ async def _process_file_with_document( if os.path.exists(temp_path): try: os.unlink(temp_path) - logger.info(f"[_process_file_with_document] Cleaned up temp file: {temp_path}") + logger.info( + f"[_process_file_with_document] Cleaned up temp file: {temp_path}" + ) except Exception as cleanup_error: logger.warning( f"[_process_file_with_document] Failed to clean up temp file: {cleanup_error}" diff --git a/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py b/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py index ff7a11645..ef3a30e43 100644 --- a/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py @@ -154,9 +154,7 @@ async def _cleanup_stale_notifications(): f"Found {len(stale_notification_ids)} stale connector indexing notifications " f"(no Redis heartbeat key): {stale_notification_ids}" ) - logger.info( - f"Connector IDs for document cleanup: {stale_connector_ids}" - ) + logger.info(f"Connector IDs for document cleanup: {stale_connector_ids}") # O(1) Batch UPDATE notifications using JSONB || operator # This merges the update data into existing notification_metadata diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 05a4007ae..46cd069c9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -140,7 +140,9 @@ async def index_airtable_records( log_entry, success_msg, {"bases_count": 0} ) # CRITICAL: Update timestamp even when no bases found so Electric SQL syncs - await update_connector_last_indexed(session, connector, update_last_indexed) + await update_connector_last_indexed( + session, connector, update_last_indexed + ) await session.commit() return 0, None # Return None (not error) when no items found @@ -277,22 +279,28 @@ async def index_airtable_records( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): - existing_document.status = DocumentStatus.ready() + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): + existing_document.status = ( + DocumentStatus.ready() + ) documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - records_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'record_id': record_id, - 'record': record, - 'base_name': base_name, - 'table_name': table_name, - }) + records_to_process.append( + { + "document": existing_document, + "is_new": False, + "markdown_content": markdown_content, + "content_hash": content_hash, + "record_id": record_id, + "record": record, + "base_name": base_name, + "table_name": table_name, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -339,25 +347,31 @@ async def index_airtable_records( session.add(document) new_documents_created = True - records_to_process.append({ - 'document': document, - 'is_new': True, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'record_id': record_id, - 'record': record, - 'base_name': base_name, - 'table_name': table_name, - }) + records_to_process.append( + { + "document": document, + "is_new": True, + "markdown_content": markdown_content, + "content_hash": content_hash, + "record_id": record_id, + "record": record, + "base_name": base_name, + "table_name": table_name, + } + ) except Exception as e: - logger.error(f"Error in Phase 1 for record: {e!s}", exc_info=True) + logger.error( + f"Error in Phase 1 for record: {e!s}", exc_info=True + ) documents_failed += 1 continue # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([r for r in records_to_process if r['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([r for r in records_to_process if r['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -374,7 +388,7 @@ async def index_airtable_records( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -387,13 +401,18 @@ async def index_airtable_records( if user_llm: document_metadata_for_summary = { - "record_id": item['record_id'], - "created_time": item['record'].get("CREATED_TIME()", ""), + "record_id": item["record_id"], + "created_time": item["record"].get("CREATED_TIME()", ""), "document_type": "Airtable Record", "connector_type": "Airtable", } - summary_content, summary_embedding = await generate_document_summary( - item['markdown_content'], user_llm, document_metadata_for_summary + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item["markdown_content"], + user_llm, + document_metadata_for_summary, ) else: # Fallback to simple summary if no LLM configured @@ -402,18 +421,18 @@ async def index_airtable_records( summary_content ) - chunks = await create_document_chunks(item['markdown_content']) + chunks = await create_document_chunks(item["markdown_content"]) # Update document to READY with actual content - document.title = item['record_id'] + document.title = item["record_id"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "record_id": item['record_id'], - "created_time": item['record'].get("CREATED_TIME()", ""), - "base_name": item['base_name'], - "table_name": item['table_name'], + "record_id": item["record_id"], + "created_time": item["record"].get("CREATED_TIME()", ""), + "base_name": item["base_name"], + "table_name": item["table_name"], "connector_id": connector_id, } safe_set_chunks(document, chunks) @@ -430,13 +449,17 @@ async def index_airtable_records( await session.commit() except Exception as e: - logger.error(f"Error processing Airtable record: {e!s}", exc_info=True) + logger.error( + f"Error processing Airtable record: {e!s}", exc_info=True + ) # Mark document as failed with reason (visible in UI) try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue @@ -446,7 +469,9 @@ async def index_airtable_records( total_processed = documents_indexed # Final commit to ensure all documents are persisted (safety net) - logger.info(f"Final commit: Total {documents_indexed} Airtable records processed") + logger.info( + f"Final commit: Total {documents_indexed} Airtable records processed" + ) try: await session.commit() logger.info( diff --git a/surfsense_backend/app/tasks/connector_indexers/base.py b/surfsense_backend/app/tasks/connector_indexers/base.py index b5b4e5559..da32e84a6 100644 --- a/surfsense_backend/app/tasks/connector_indexers/base.py +++ b/surfsense_backend/app/tasks/connector_indexers/base.py @@ -31,29 +31,30 @@ def get_current_timestamp() -> datetime: def safe_set_chunks(document: Document, chunks: list) -> None: """ Safely assign chunks to a document without triggering lazy loading. - + ALWAYS use this instead of `document.chunks = chunks` to avoid SQLAlchemy async errors (MissingGreenlet / greenlet_spawn). - + Why this is needed: - Direct assignment `document.chunks = chunks` triggers SQLAlchemy to load the OLD chunks first (for comparison/orphan detection) - This lazy loading fails in async context with asyncpg driver - set_committed_value bypasses this by setting the value directly - + This function is safe regardless of how the document was loaded (with or without selectinload). - + Args: document: The Document object to update chunks: List of Chunk objects to assign - + Example: # Instead of: document.chunks = chunks (DANGEROUS!) safe_set_chunks(document, chunks) # Always safe """ from sqlalchemy.orm.attributes import set_committed_value - set_committed_value(document, 'chunks', chunks) + + set_committed_value(document, "chunks", chunks) async def check_duplicate_document_by_hash( diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index fbf90b345..d60884539 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -261,7 +261,9 @@ async def index_bookstack_pages( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() logger.info( f"Document for BookStack page {page_name} unchanged. Skipping." @@ -270,20 +272,22 @@ async def index_bookstack_pages( continue # Queue existing document for update (will be set to processing in Phase 2) - pages_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'page_id': page_id, - 'page_name': page_name, - 'page_slug': page_slug, - 'book_id': book_id, - 'book_slug': book_slug, - 'chapter_id': chapter_id, - 'page_url': page_url, - 'page_content': page_content, - 'full_content': full_content, - 'content_hash': content_hash, - }) + pages_to_process.append( + { + "document": existing_document, + "is_new": False, + "page_id": page_id, + "page_name": page_name, + "page_slug": page_slug, + "book_id": book_id, + "book_slug": book_slug, + "chapter_id": chapter_id, + "page_url": page_url, + "page_content": page_content, + "full_content": full_content, + "content_hash": content_hash, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -331,20 +335,22 @@ async def index_bookstack_pages( session.add(document) new_documents_created = True - pages_to_process.append({ - 'document': document, - 'is_new': True, - 'page_id': page_id, - 'page_name': page_name, - 'page_slug': page_slug, - 'book_id': book_id, - 'book_slug': book_slug, - 'chapter_id': chapter_id, - 'page_url': page_url, - 'page_content': page_content, - 'full_content': full_content, - 'content_hash': content_hash, - }) + pages_to_process.append( + { + "document": document, + "is_new": True, + "page_id": page_id, + "page_name": page_name, + "page_slug": page_slug, + "book_id": book_id, + "book_slug": book_slug, + "chapter_id": chapter_id, + "page_url": page_url, + "page_content": page_content, + "full_content": full_content, + "content_hash": content_hash, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True) @@ -353,7 +359,9 @@ async def index_bookstack_pages( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -370,7 +378,7 @@ async def index_bookstack_pages( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -383,23 +391,23 @@ async def index_bookstack_pages( # Build document metadata doc_metadata = { - "page_id": item['page_id'], - "page_name": item['page_name'], - "page_slug": item['page_slug'], - "book_id": item['book_id'], - "book_slug": item['book_slug'], - "chapter_id": item['chapter_id'], + "page_id": item["page_id"], + "page_name": item["page_name"], + "page_slug": item["page_slug"], + "book_id": item["book_id"], + "book_slug": item["book_slug"], + "chapter_id": item["chapter_id"], "base_url": bookstack_base_url, - "page_url": item['page_url'], + "page_url": item["page_url"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } if user_llm: summary_metadata = { - "page_name": item['page_name'], - "page_id": item['page_id'], - "book_id": item['book_id'], + "page_name": item["page_name"], + "page_id": item["page_id"], + "book_id": item["book_id"], "document_type": "BookStack Page", "connector_type": "BookStack", } @@ -407,17 +415,15 @@ async def index_bookstack_pages( summary_content, summary_embedding, ) = await generate_document_summary( - item['full_content'], user_llm, summary_metadata + item["full_content"], user_llm, summary_metadata ) else: # Fallback to simple summary if no LLM configured - summary_content = ( - f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n" - ) - if item['page_content']: + summary_content = f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n" + if item["page_content"]: # Take first 1000 characters of content for summary - content_preview = item['page_content'][:1000] - if len(item['page_content']) > 1000: + content_preview = item["page_content"][:1000] + if len(item["page_content"]) > 1000: content_preview += "..." summary_content += f"Content Preview: {content_preview}\n\n" summary_embedding = config.embedding_model_instance.embed( @@ -425,12 +431,12 @@ async def index_bookstack_pages( ) # Process chunks - using the full page content - chunks = await create_document_chunks(item['full_content']) + chunks = await create_document_chunks(item["full_content"]) # Update document to READY with actual content - document.title = item['page_name'] + document.title = item["page_name"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = doc_metadata safe_set_chunks(document, chunks) @@ -456,7 +462,9 @@ async def index_bookstack_pages( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) skipped_pages.append( f"{item.get('page_name', 'Unknown')} (processing error)" ) @@ -473,7 +481,9 @@ async def index_bookstack_pages( ) try: await session.commit() - logger.info("Successfully committed all BookStack document changes to database") + logger.info( + "Successfully committed all BookStack document changes to database" + ) except Exception as e: # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index 934e56744..47c5d8b3b 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -260,7 +260,9 @@ async def index_clickup_tasks( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() logger.info( f"Document for ClickUp task {task_name} unchanged. Skipping." @@ -272,22 +274,24 @@ async def index_clickup_tasks( logger.info( f"Content changed for ClickUp task {task_name}. Queuing for update." ) - tasks_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'task_content': task_content, - 'content_hash': content_hash, - 'task_id': task_id, - 'task_name': task_name, - 'task_status': task_status, - 'task_priority': task_priority, - 'task_list_name': task_list_name, - 'task_space_name': task_space_name, - 'task_assignees': task_assignees, - 'task_due_date': task_due_date, - 'task_created': task_created, - 'task_updated': task_updated, - }) + tasks_to_process.append( + { + "document": existing_document, + "is_new": False, + "task_content": task_content, + "content_hash": content_hash, + "task_id": task_id, + "task_name": task_name, + "task_status": task_status, + "task_priority": task_priority, + "task_list_name": task_list_name, + "task_space_name": task_space_name, + "task_assignees": task_assignees, + "task_due_date": task_due_date, + "task_created": task_created, + "task_updated": task_updated, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -335,22 +339,24 @@ async def index_clickup_tasks( session.add(document) new_documents_created = True - tasks_to_process.append({ - 'document': document, - 'is_new': True, - 'task_content': task_content, - 'content_hash': content_hash, - 'task_id': task_id, - 'task_name': task_name, - 'task_status': task_status, - 'task_priority': task_priority, - 'task_list_name': task_list_name, - 'task_space_name': task_space_name, - 'task_assignees': task_assignees, - 'task_due_date': task_due_date, - 'task_created': task_created, - 'task_updated': task_updated, - }) + tasks_to_process.append( + { + "document": document, + "is_new": True, + "task_content": task_content, + "content_hash": content_hash, + "task_id": task_id, + "task_name": task_name, + "task_status": task_status, + "task_priority": task_priority, + "task_list_name": task_list_name, + "task_space_name": task_space_name, + "task_assignees": task_assignees, + "task_due_date": task_due_date, + "task_created": task_created, + "task_updated": task_updated, + } + ) except Exception as e: logger.error( @@ -362,7 +368,9 @@ async def index_clickup_tasks( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([t for t in tasks_to_process if t['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([t for t in tasks_to_process if t['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -379,7 +387,7 @@ async def index_clickup_tasks( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -392,13 +400,13 @@ async def index_clickup_tasks( if user_llm: document_metadata_for_summary = { - "task_id": item['task_id'], - "task_name": item['task_name'], - "task_status": item['task_status'], - "task_priority": item['task_priority'], - "task_list": item['task_list_name'], - "task_space": item['task_space_name'], - "assignees": len(item['task_assignees']), + "task_id": item["task_id"], + "task_name": item["task_name"], + "task_status": item["task_status"], + "task_priority": item["task_priority"], + "task_list": item["task_list_name"], + "task_space": item["task_space_name"], + "assignees": len(item["task_assignees"]), "document_type": "ClickUp Task", "connector_type": "ClickUp", } @@ -406,30 +414,30 @@ async def index_clickup_tasks( summary_content, summary_embedding, ) = await generate_document_summary( - item['task_content'], user_llm, document_metadata_for_summary + item["task_content"], user_llm, document_metadata_for_summary ) else: - summary_content = item['task_content'] + summary_content = item["task_content"] summary_embedding = config.embedding_model_instance.embed( - item['task_content'] + item["task_content"] ) - chunks = await create_document_chunks(item['task_content']) + chunks = await create_document_chunks(item["task_content"]) # Update document to READY with actual content - document.title = item['task_name'] + document.title = item["task_name"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "task_id": item['task_id'], - "task_name": item['task_name'], - "task_status": item['task_status'], - "task_priority": item['task_priority'], - "task_assignees": item['task_assignees'], - "task_due_date": item['task_due_date'], - "task_created": item['task_created'], - "task_updated": item['task_updated'], + "task_id": item["task_id"], + "task_name": item["task_name"], + "task_status": item["task_status"], + "task_priority": item["task_priority"], + "task_assignees": item["task_assignees"], + "task_due_date": item["task_due_date"], + "task_created": item["task_created"], + "task_updated": item["task_updated"], "connector_id": connector_id, "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } @@ -456,7 +464,9 @@ async def index_clickup_tasks( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 7fd842996..a3a059d4e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -262,23 +262,27 @@ async def index_confluence_pages( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - pages_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'full_content': full_content, - 'page_content': page_content, - 'content_hash': content_hash, - 'page_id': page_id, - 'page_title': page_title, - 'space_id': space_id, - 'comment_count': comment_count, - }) + pages_to_process.append( + { + "document": existing_document, + "is_new": False, + "full_content": full_content, + "page_content": page_content, + "content_hash": content_hash, + "page_id": page_id, + "page_title": page_title, + "space_id": space_id, + "comment_count": comment_count, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -323,17 +327,19 @@ async def index_confluence_pages( session.add(document) new_documents_created = True - pages_to_process.append({ - 'document': document, - 'is_new': True, - 'full_content': full_content, - 'page_content': page_content, - 'content_hash': content_hash, - 'page_id': page_id, - 'page_title': page_title, - 'space_id': space_id, - 'comment_count': comment_count, - }) + pages_to_process.append( + { + "document": document, + "is_new": True, + "full_content": full_content, + "page_content": page_content, + "content_hash": content_hash, + "page_id": page_id, + "page_title": page_title, + "space_id": space_id, + "comment_count": comment_count, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True) @@ -342,7 +348,9 @@ async def index_confluence_pages( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -359,7 +367,7 @@ async def index_confluence_pages( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -372,10 +380,10 @@ async def index_confluence_pages( if user_llm: document_metadata = { - "page_title": item['page_title'], - "page_id": item['page_id'], - "space_id": item['space_id'], - "comment_count": item['comment_count'], + "page_title": item["page_title"], + "page_id": item["page_id"], + "space_id": item["space_id"], + "comment_count": item["comment_count"], "document_type": "Confluence Page", "connector_type": "Confluence", } @@ -383,17 +391,15 @@ async def index_confluence_pages( summary_content, summary_embedding, ) = await generate_document_summary( - item['full_content'], user_llm, document_metadata + item["full_content"], user_llm, document_metadata ) else: # Fallback to simple summary if no LLM configured - summary_content = ( - f"Confluence Page: {item['page_title']}\n\nSpace ID: {item['space_id']}\n\n" - ) - if item['page_content']: + summary_content = f"Confluence Page: {item['page_title']}\n\nSpace ID: {item['space_id']}\n\n" + if item["page_content"]: # Take first 1000 characters of content for summary - content_preview = item['page_content'][:1000] - if len(item['page_content']) > 1000: + content_preview = item["page_content"][:1000] + if len(item["page_content"]) > 1000: content_preview += "..." summary_content += f"Content Preview: {content_preview}\n\n" summary_content += f"Comments: {item['comment_count']}" @@ -402,18 +408,18 @@ async def index_confluence_pages( ) # Process chunks - using the full page content with comments - chunks = await create_document_chunks(item['full_content']) + chunks = await create_document_chunks(item["full_content"]) # Update document to READY with actual content - document.title = item['page_title'] + document.title = item["page_title"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "page_id": item['page_id'], - "page_title": item['page_title'], - "space_id": item['space_id'], - "comment_count": item['comment_count'], + "page_id": item["page_id"], + "page_title": item["page_title"], + "space_id": item["space_id"], + "comment_count": item["comment_count"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -440,7 +446,9 @@ async def index_confluence_pages( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue # Skip this page and continue with others diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index e5f333531..1595897a0 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -352,9 +352,7 @@ async def index_discord_messages( try: channels = await discord_client.get_text_channels(guild_id) if not channels: - logger.info( - f"No channels found in guild {guild_name}. Skipping." - ) + logger.info(f"No channels found in guild {guild_name}. Skipping.") skipped_channels.append(f"{guild_name} (no channels)") else: for channel in channels: @@ -456,25 +454,31 @@ async def index_discord_messages( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): - existing_document.status = DocumentStatus.ready() + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): + existing_document.status = ( + DocumentStatus.ready() + ) documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'combined_document_string': combined_document_string, - 'content_hash': content_hash, - 'guild_name': guild_name, - 'guild_id': guild_id, - 'channel_name': channel_name, - 'channel_id': channel_id, - 'message_id': msg_id, - 'message_timestamp': msg_timestamp, - 'message_user_name': msg_user_name, - }) + messages_to_process.append( + { + "document": existing_document, + "is_new": False, + "combined_document_string": combined_document_string, + "content_hash": content_hash, + "guild_name": guild_name, + "guild_id": guild_id, + "channel_name": channel_name, + "channel_id": channel_id, + "message_id": msg_id, + "message_timestamp": msg_timestamp, + "message_user_name": msg_user_name, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -522,19 +526,21 @@ async def index_discord_messages( session.add(document) new_documents_created = True - messages_to_process.append({ - 'document': document, - 'is_new': True, - 'combined_document_string': combined_document_string, - 'content_hash': content_hash, - 'guild_name': guild_name, - 'guild_id': guild_id, - 'channel_name': channel_name, - 'channel_id': channel_id, - 'message_id': msg_id, - 'message_timestamp': msg_timestamp, - 'message_user_name': msg_user_name, - }) + messages_to_process.append( + { + "document": document, + "is_new": True, + "combined_document_string": combined_document_string, + "content_hash": content_hash, + "guild_name": guild_name, + "guild_id": guild_id, + "channel_name": channel_name, + "channel_id": channel_id, + "message_id": msg_id, + "message_timestamp": msg_timestamp, + "message_user_name": msg_user_name, + } + ) except Exception as e: logger.error( @@ -547,7 +553,9 @@ async def index_discord_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -564,31 +572,31 @@ async def index_discord_messages( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() await session.commit() # Heavy processing (embeddings, chunks) - chunks = await create_document_chunks(item['combined_document_string']) + chunks = await create_document_chunks(item["combined_document_string"]) doc_embedding = config.embedding_model_instance.embed( - item['combined_document_string'] + item["combined_document_string"] ) # Update document to READY with actual content document.title = f"{item['guild_name']}#{item['channel_name']}" - document.content = item['combined_document_string'] - document.content_hash = item['content_hash'] + document.content = item["combined_document_string"] + document.content_hash = item["content_hash"] document.embedding = doc_embedding document.document_metadata = { - "guild_name": item['guild_name'], - "guild_id": item['guild_id'], - "channel_name": item['channel_name'], - "channel_id": item['channel_id'], - "message_id": item['message_id'], - "message_timestamp": item['message_timestamp'], - "message_user_name": item['message_user_name'], + "guild_name": item["guild_name"], + "guild_id": item["guild_id"], + "channel_name": item["channel_name"], + "channel_id": item["channel_id"], + "message_id": item["message_id"], + "message_timestamp": item["message_timestamp"], + "message_user_name": item["message_user_name"], "indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -612,7 +620,9 @@ async def index_discord_messages( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 97cd31a09..212afff39 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -253,7 +253,9 @@ async def index_elasticsearch_documents( # If content is unchanged, skip. Otherwise queue for update. if existing_doc.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_doc.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_doc.status, DocumentStatus.READY + ): existing_doc.status = DocumentStatus.ready() logger.info( f"Skipping ES doc {doc_id} — already indexed (doc id {existing_doc.id})" @@ -262,17 +264,19 @@ async def index_elasticsearch_documents( continue # Queue existing document for update (will be set to processing in Phase 2) - docs_to_process.append({ - 'document': existing_doc, - 'is_new': False, - 'doc_id': doc_id, - 'title': title, - 'content': content, - 'content_hash': content_hash, - 'unique_identifier_hash': unique_identifier_hash, - 'hit': hit, - 'source': source, - }) + docs_to_process.append( + { + "document": existing_doc, + "is_new": False, + "doc_id": doc_id, + "title": title, + "content": content, + "content_hash": content_hash, + "unique_identifier_hash": unique_identifier_hash, + "hit": hit, + "source": source, + } + ) hits_collected += 1 continue @@ -310,17 +314,19 @@ async def index_elasticsearch_documents( session.add(document) new_documents_created = True - docs_to_process.append({ - 'document': document, - 'is_new': True, - 'doc_id': doc_id, - 'title': title, - 'content': content, - 'content_hash': content_hash, - 'unique_identifier_hash': unique_identifier_hash, - 'hit': hit, - 'source': source, - }) + docs_to_process.append( + { + "document": document, + "is_new": True, + "doc_id": doc_id, + "title": title, + "content": content, + "content_hash": content_hash, + "unique_identifier_hash": unique_identifier_hash, + "hit": hit, + "source": source, + } + ) hits_collected += 1 except Exception as e: @@ -330,7 +336,9 @@ async def index_elasticsearch_documents( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([d for d in docs_to_process if d['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([d for d in docs_to_process if d['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -347,7 +355,7 @@ async def index_elasticsearch_documents( await on_heartbeat_callback(documents_processed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -355,9 +363,9 @@ async def index_elasticsearch_documents( # Build metadata metadata = { - "elasticsearch_id": item['doc_id'], - "elasticsearch_index": item['hit'].get("_index", index_name), - "elasticsearch_score": item['hit'].get("_score"), + "elasticsearch_id": item["doc_id"], + "elasticsearch_index": item["hit"].get("_index", index_name), + "elasticsearch_score": item["hit"].get("_score"), "indexed_at": datetime.now().isoformat(), "source": "ELASTICSEARCH_CONNECTOR", "connector_id": connector_id, @@ -366,17 +374,17 @@ async def index_elasticsearch_documents( # Add any additional metadata fields specified in config if "ELASTICSEARCH_METADATA_FIELDS" in config: for field in config["ELASTICSEARCH_METADATA_FIELDS"]: - if field in item['source']: - metadata[f"es_{field}"] = item['source'][field] + if field in item["source"]: + metadata[f"es_{field}"] = item["source"][field] # Create chunks - chunks = await create_document_chunks(item['content']) + chunks = await create_document_chunks(item["content"]) # Update document to READY with actual content - document.title = item['title'] - document.content = item['content'] - document.content_hash = item['content_hash'] - document.unique_identifier_hash = item['unique_identifier_hash'] + document.title = item["title"] + document.content = item["content"] + document.content_hash = item["content_hash"] + document.unique_identifier_hash = item["unique_identifier_hash"] document.document_metadata = metadata safe_set_chunks(document, chunks) document.updated_at = get_current_timestamp() @@ -399,7 +407,9 @@ async def index_elasticsearch_documents( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue @@ -411,10 +421,14 @@ async def index_elasticsearch_documents( ) # Final commit for any remaining documents not yet committed in batches - logger.info(f"Final commit: Total {documents_processed} Elasticsearch documents processed") + logger.info( + f"Final commit: Total {documents_processed} Elasticsearch documents processed" + ) try: await session.commit() - logger.info("Successfully committed all Elasticsearch document changes to database") + logger.info( + "Successfully committed all Elasticsearch document changes to database" + ) except Exception as e: # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index b37989a84..e1a1ddd4d 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -17,7 +17,7 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.config import config -from app.connectors.github_connector import GitHubConnector, RepositoryDigest +from app.connectors.github_connector import GitHubConnector from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService @@ -237,7 +237,9 @@ async def index_github_repos( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() logger.info(f"Repository {repo_full_name} unchanged. Skipping.") documents_skipped += 1 @@ -247,14 +249,16 @@ async def index_github_repos( logger.info( f"Content changed for repository {repo_full_name}. Queuing for update." ) - repos_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'digest': digest, - 'content_hash': content_hash, - 'repo_full_name': repo_full_name, - 'unique_identifier_hash': unique_identifier_hash, - }) + repos_to_process.append( + { + "document": existing_document, + "is_new": False, + "digest": digest, + "content_hash": content_hash, + "repo_full_name": repo_full_name, + "unique_identifier_hash": unique_identifier_hash, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -298,14 +302,16 @@ async def index_github_repos( session.add(document) new_documents_created = True - repos_to_process.append({ - 'document': document, - 'is_new': True, - 'digest': digest, - 'content_hash': content_hash, - 'repo_full_name': repo_full_name, - 'unique_identifier_hash': unique_identifier_hash, - }) + repos_to_process.append( + { + "document": document, + "is_new": True, + "digest": digest, + "content_hash": content_hash, + "repo_full_name": repo_full_name, + "unique_identifier_hash": unique_identifier_hash, + } + ) except Exception as repo_err: logger.error( @@ -317,7 +323,9 @@ async def index_github_repos( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([r for r in repos_to_process if r['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([r for r in repos_to_process if r['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -334,9 +342,9 @@ async def index_github_repos( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] - digest = item['digest'] - repo_full_name = item['repo_full_name'] + document = item["document"] + digest = item["digest"] + repo_full_name = item["repo_full_name"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only @@ -353,7 +361,9 @@ async def index_github_repos( "document_type": "GitHub Repository", "connector_type": "GitHub", "ingestion_method": "gitingest", - "file_tree": digest.tree[:2000] if len(digest.tree) > 2000 else digest.tree, + "file_tree": digest.tree[:2000] + if len(digest.tree) > 2000 + else digest.tree, "estimated_tokens": digest.estimated_tokens, } @@ -377,13 +387,17 @@ async def index_github_repos( f"## Summary\n{digest.summary}\n\n" f"## File Structure\n{digest.tree[:3000]}" ) - summary_embedding = config.embedding_model_instance.embed(summary_text) + summary_embedding = config.embedding_model_instance.embed( + summary_text + ) # Chunk the full digest content for granular search try: chunks_data = await create_document_chunks(digest.content) except Exception as chunk_err: - logger.error(f"Failed to chunk repository {repo_full_name}: {chunk_err}") + logger.error( + f"Failed to chunk repository {repo_full_name}: {chunk_err}" + ) chunks_data = await _simple_chunk_content(digest.content) # Update document to READY with actual content @@ -401,7 +415,7 @@ async def index_github_repos( document.title = repo_full_name document.content = summary_text - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = doc_metadata safe_set_chunks(document, chunks_data) @@ -433,7 +447,9 @@ async def index_github_repos( document.status = DocumentStatus.failed(str(repo_err)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) errors.append(f"Failed processing {repo_full_name}: {repo_err}") documents_failed += 1 continue @@ -442,7 +458,9 @@ async def index_github_repos( await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit - logger.info(f"Final commit: Total {documents_processed} GitHub repositories processed") + logger.info( + f"Final commit: Total {documents_processed} GitHub repositories processed" + ) try: await session.commit() logger.info( diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index ad749e61c..822e58d36 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -345,25 +345,29 @@ async def index_google_calendar_events( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - events_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'event_markdown': event_markdown, - 'content_hash': content_hash, - 'event_id': event_id, - 'event_summary': event_summary, - 'calendar_id': calendar_id, - 'start_time': start_time, - 'end_time': end_time, - 'location': location, - 'description': description, - }) + events_to_process.append( + { + "document": existing_document, + "is_new": False, + "event_markdown": event_markdown, + "content_hash": content_hash, + "event_id": event_id, + "event_summary": event_summary, + "calendar_id": calendar_id, + "start_time": start_time, + "end_time": end_time, + "location": location, + "description": description, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -411,19 +415,21 @@ async def index_google_calendar_events( session.add(document) new_documents_created = True - events_to_process.append({ - 'document': document, - 'is_new': True, - 'event_markdown': event_markdown, - 'content_hash': content_hash, - 'event_id': event_id, - 'event_summary': event_summary, - 'calendar_id': calendar_id, - 'start_time': start_time, - 'end_time': end_time, - 'location': location, - 'description': description, - }) + events_to_process.append( + { + "document": document, + "is_new": True, + "event_markdown": event_markdown, + "content_hash": content_hash, + "event_id": event_id, + "event_summary": event_summary, + "calendar_id": calendar_id, + "start_time": start_time, + "end_time": end_time, + "location": location, + "description": description, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True) @@ -432,7 +438,9 @@ async def index_google_calendar_events( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -449,7 +457,7 @@ async def index_google_calendar_events( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -462,48 +470,53 @@ async def index_google_calendar_events( if user_llm: document_metadata_for_summary = { - "event_id": item['event_id'], - "event_summary": item['event_summary'], - "calendar_id": item['calendar_id'], - "start_time": item['start_time'], - "end_time": item['end_time'], - "location": item['location'] or "No location", + "event_id": item["event_id"], + "event_summary": item["event_summary"], + "calendar_id": item["calendar_id"], + "start_time": item["start_time"], + "end_time": item["end_time"], + "location": item["location"] or "No location", "document_type": "Google Calendar Event", "connector_type": "Google Calendar", } - summary_content, summary_embedding = await generate_document_summary( - item['event_markdown'], user_llm, document_metadata_for_summary + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item["event_markdown"], user_llm, document_metadata_for_summary ) else: - summary_content = f"Google Calendar Event: {item['event_summary']}\n\n" + summary_content = ( + f"Google Calendar Event: {item['event_summary']}\n\n" + ) summary_content += f"Calendar: {item['calendar_id']}\n" summary_content += f"Start: {item['start_time']}\n" summary_content += f"End: {item['end_time']}\n" - if item['location']: + if item["location"]: summary_content += f"Location: {item['location']}\n" - if item['description']: - desc_preview = item['description'][:1000] - if len(item['description']) > 1000: + if item["description"]: + desc_preview = item["description"][:1000] + if len(item["description"]) > 1000: desc_preview += "..." summary_content += f"Description: {desc_preview}\n" summary_embedding = config.embedding_model_instance.embed( summary_content ) - chunks = await create_document_chunks(item['event_markdown']) + chunks = await create_document_chunks(item["event_markdown"]) # Update document to READY with actual content - document.title = item['event_summary'] + document.title = item["event_summary"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "event_id": item['event_id'], - "event_summary": item['event_summary'], - "calendar_id": item['calendar_id'], - "start_time": item['start_time'], - "end_time": item['end_time'], - "location": item['location'], + "event_id": item["event_id"], + "event_summary": item["event_summary"], + "calendar_id": item["calendar_id"], + "start_time": item["start_time"], + "end_time": item["end_time"], + "location": item["location"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -527,7 +540,9 @@ async def index_google_calendar_events( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 8eae35d00..f7624cffe 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -435,7 +435,7 @@ async def _index_full_scan( on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int]: """Perform full scan indexing of a folder. - + Implements 2-phase document status updates for real-time UI feedback: - Phase 1: Collect all files and create pending documents (visible in UI immediately) - Phase 2: Process each file: pending → processing → ready/failed @@ -533,7 +533,9 @@ async def _index_full_scan( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f[1] and f[1].id is None])} pending documents") + logger.info( + f"Phase 1: Committing {len([f for f in files_to_process if f[1] and f[1].id is None])} pending documents" + ) await session.commit() # ======================================================================= @@ -568,9 +570,7 @@ async def _index_full_scan( if documents_indexed % 10 == 0 and documents_indexed > 0: await session.commit() - logger.info( - f"Committed batch: {documents_indexed} files indexed so far" - ) + logger.info(f"Committed batch: {documents_indexed} files indexed so far") logger.info( f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, {documents_failed} failed" @@ -597,7 +597,7 @@ async def _index_with_delta_sync( Note: include_subfolders is accepted for API consistency but delta sync automatically tracks changes across all folders including subfolders. - + Implements 2-phase document status updates for real-time UI feedback: - Phase 1: Collect all changes and create pending documents (visible in UI immediately) - Phase 2: Process each file: pending → processing → ready/failed @@ -676,7 +676,7 @@ async def _index_with_delta_sync( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing pending documents") + logger.info("Phase 1: Committing pending documents") await session.commit() # ======================================================================= @@ -685,7 +685,7 @@ async def _index_with_delta_sync( # ======================================================================= logger.info(f"Phase 2: Processing {len(changes_to_process)} changes") - for change, file, pending_doc in changes_to_process: + for _, file, pending_doc in changes_to_process: # Check if it's time for a heartbeat update if on_heartbeat_callback: current_time = time.time() @@ -728,17 +728,17 @@ async def _create_pending_document_for_file( ) -> tuple[Document | None, bool]: """ Create a pending document for a Google Drive file if it doesn't exist. - + This is Phase 1 of the 2-phase document status update pattern. Creates documents with 'pending' status so they appear in UI immediately. - + Args: session: Database session file: File metadata from Google Drive API connector_id: ID of the Drive connector search_space_id: ID of the search space user_id: ID of the user - + Returns: Tuple of (document, should_skip): - (existing_doc, False): Existing document that needs update @@ -746,28 +746,28 @@ async def _create_pending_document_for_file( - (None, True): File should be skipped (unchanged, rename-only, or folder) """ from app.connectors.google_drive.file_types import should_skip_file - + file_id = file.get("id") file_name = file.get("name", "Unknown") mime_type = file.get("mimeType", "") - + # Skip folders and shortcuts if should_skip_file(mime_type): return None, True - + if not file_id: return None, True - + # Generate unique identifier hash for this file unique_identifier_hash = generate_unique_identifier_hash( DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id ) - + # Check if document exists existing_document = await check_document_by_unique_identifier( session, unique_identifier_hash ) - + if existing_document: # Check if this is a rename-only update (content unchanged) incoming_md5 = file.get("md5Checksum") @@ -775,7 +775,7 @@ async def _create_pending_document_for_file( doc_metadata = existing_document.document_metadata or {} stored_md5 = doc_metadata.get("md5_checksum") stored_modified_time = doc_metadata.get("modified_time") - + # Determine if content changed content_unchanged = False if incoming_md5 and stored_md5: @@ -783,16 +783,18 @@ async def _create_pending_document_for_file( elif not incoming_md5 and incoming_modified_time and stored_modified_time: # Google Workspace file - use modifiedTime as fallback content_unchanged = incoming_modified_time == stored_modified_time - + if content_unchanged: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() return None, True - + # Content changed - return existing document for update return existing_document, False - + # Create new pending document document = Document( search_space_id=search_space_id, @@ -815,7 +817,7 @@ async def _create_pending_document_for_file( connector_id=connector_id, ) session.add(document) - + return document, False @@ -958,7 +960,7 @@ async def _process_single_file( ) -> tuple[int, int, int]: """ Process a single file by downloading and using Surfsense's file processor. - + Implements Phase 2 of the 2-phase document status update pattern. Updates document status: pending → processing → ready/failed @@ -1042,12 +1044,13 @@ async def _process_single_file( processed_doc = await check_document_by_unique_identifier( session, unique_identifier_hash ) - if processed_doc: - # Ensure status is READY - if not DocumentStatus.is_state(processed_doc.status, DocumentStatus.READY): - processed_doc.status = DocumentStatus.ready() - processed_doc.updated_at = get_current_timestamp() - await session.commit() + # Ensure status is READY + if processed_doc and not DocumentStatus.is_state( + processed_doc.status, DocumentStatus.READY + ): + processed_doc.status = DocumentStatus.ready() + processed_doc.updated_at = get_current_timestamp() + await session.commit() logger.info(f"Successfully indexed Google Drive file: {file_name}") return 1, 0, 0 @@ -1061,7 +1064,9 @@ async def _process_single_file( pending_document.updated_at = get_current_timestamp() await session.commit() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) return 0, 0, 1 diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 89e8796d3..c7caee4da 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -228,7 +228,9 @@ async def index_google_gmail_messages( documents_indexed = 0 documents_skipped = 0 documents_failed = 0 # Track messages that failed processing - duplicate_content_count = 0 # Track messages skipped due to duplicate content_hash + duplicate_content_count = ( + 0 # Track messages skipped due to duplicate content_hash + ) # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() @@ -294,23 +296,27 @@ async def index_google_gmail_messages( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'message_id': message_id, - 'thread_id': thread_id, - 'subject': subject, - 'sender': sender, - 'date_str': date_str, - }) + messages_to_process.append( + { + "document": existing_document, + "is_new": False, + "markdown_content": markdown_content, + "content_hash": content_hash, + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date_str": date_str, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -356,17 +362,19 @@ async def index_google_gmail_messages( session.add(document) new_documents_created = True - messages_to_process.append({ - 'document': document, - 'is_new': True, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'message_id': message_id, - 'thread_id': thread_id, - 'subject': subject, - 'sender': sender, - 'date_str': date_str, - }) + messages_to_process.append( + { + "document": document, + "is_new": True, + "markdown_content": markdown_content, + "content_hash": content_hash, + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date_str": date_str, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True) @@ -375,7 +383,9 @@ async def index_google_gmail_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -392,7 +402,7 @@ async def index_google_gmail_messages( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -405,16 +415,21 @@ async def index_google_gmail_messages( if user_llm: document_metadata_for_summary = { - "message_id": item['message_id'], - "thread_id": item['thread_id'], - "subject": item['subject'], - "sender": item['sender'], - "date": item['date_str'], + "message_id": item["message_id"], + "thread_id": item["thread_id"], + "subject": item["subject"], + "sender": item["sender"], + "date": item["date_str"], "document_type": "Gmail Message", "connector_type": "Google Gmail", } - summary_content, summary_embedding = await generate_document_summary( - item['markdown_content'], user_llm, document_metadata_for_summary + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item["markdown_content"], + user_llm, + document_metadata_for_summary, ) else: summary_content = f"Google Gmail Message: {item['subject']}\n\n" @@ -424,19 +439,19 @@ async def index_google_gmail_messages( summary_content ) - chunks = await create_document_chunks(item['markdown_content']) + chunks = await create_document_chunks(item["markdown_content"]) # Update document to READY with actual content - document.title = item['subject'] + document.title = item["subject"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "message_id": item['message_id'], - "thread_id": item['thread_id'], - "subject": item['subject'], - "sender": item['sender'], - "date": item['date_str'], + "message_id": item["message_id"], + "thread_id": item["thread_id"], + "subject": item["subject"], + "sender": item["sender"], + "date": item["date_str"], "connector_id": connector_id, } safe_set_chunks(document, chunks) @@ -459,7 +474,9 @@ async def index_google_gmail_messages( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 038df0f46..65f56ce46 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -239,23 +239,27 @@ async def index_jira_issues( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - issues_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'issue_content': issue_content, - 'content_hash': content_hash, - 'issue_id': issue_id, - 'issue_identifier': issue_identifier, - 'issue_title': issue_title, - 'formatted_issue': formatted_issue, - 'comment_count': comment_count, - }) + issues_to_process.append( + { + "document": existing_document, + "is_new": False, + "issue_content": issue_content, + "content_hash": content_hash, + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "formatted_issue": formatted_issue, + "comment_count": comment_count, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -301,17 +305,19 @@ async def index_jira_issues( session.add(document) new_documents_created = True - issues_to_process.append({ - 'document': document, - 'is_new': True, - 'issue_content': issue_content, - 'content_hash': content_hash, - 'issue_id': issue_id, - 'issue_identifier': issue_identifier, - 'issue_title': issue_title, - 'formatted_issue': formatted_issue, - 'comment_count': comment_count, - }) + issues_to_process.append( + { + "document": document, + "is_new": True, + "issue_content": issue_content, + "content_hash": content_hash, + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "formatted_issue": formatted_issue, + "comment_count": comment_count, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True) @@ -320,7 +326,9 @@ async def index_jira_issues( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -337,7 +345,7 @@ async def index_jira_issues( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -350,11 +358,11 @@ async def index_jira_issues( if user_llm: document_metadata = { - "issue_key": item['issue_identifier'], - "issue_title": item['issue_title'], - "status": item['formatted_issue'].get("status", "Unknown"), - "priority": item['formatted_issue'].get("priority", "Unknown"), - "comment_count": item['comment_count'], + "issue_key": item["issue_identifier"], + "issue_title": item["issue_title"], + "status": item["formatted_issue"].get("status", "Unknown"), + "priority": item["formatted_issue"].get("priority", "Unknown"), + "comment_count": item["comment_count"], "document_type": "Jira Issue", "connector_type": "Jira", } @@ -362,34 +370,32 @@ async def index_jira_issues( summary_content, summary_embedding, ) = await generate_document_summary( - item['issue_content'], user_llm, document_metadata + item["issue_content"], user_llm, document_metadata ) else: # Fallback to simple summary if no LLM configured summary_content = f"Jira Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['formatted_issue'].get('status', 'Unknown')}\n\n" - if item['formatted_issue'].get("description"): - summary_content += ( - f"Description: {item['formatted_issue'].get('description')}\n\n" - ) + if item["formatted_issue"].get("description"): + summary_content += f"Description: {item['formatted_issue'].get('description')}\n\n" summary_content += f"Comments: {item['comment_count']}" summary_embedding = config.embedding_model_instance.embed( summary_content ) # Process chunks - using the full issue content with comments - chunks = await create_document_chunks(item['issue_content']) + chunks = await create_document_chunks(item["issue_content"]) # Update document to READY with actual content document.title = f"{item['issue_identifier']}: {item['issue_title']}" document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "issue_id": item['issue_id'], - "issue_identifier": item['issue_identifier'], - "issue_title": item['issue_title'], - "state": item['formatted_issue'].get("status", "Unknown"), - "comment_count": item['comment_count'], + "issue_id": item["issue_id"], + "issue_identifier": item["issue_identifier"], + "issue_title": item["issue_title"], + "state": item["formatted_issue"].get("status", "Unknown"), + "comment_count": item["comment_count"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -416,7 +422,9 @@ async def index_jira_issues( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue # Skip this issue and continue with others diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 45e1e357a..87bafb3c0 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -272,7 +272,9 @@ async def index_linear_issues( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() logger.info( f"Document for Linear issue {issue_identifier} unchanged. Skipping." @@ -281,19 +283,21 @@ async def index_linear_issues( continue # Queue existing document for update (will be set to processing in Phase 2) - issues_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'issue_content': issue_content, - 'content_hash': content_hash, - 'issue_id': issue_id, - 'issue_identifier': issue_identifier, - 'issue_title': issue_title, - 'state': state, - 'description': description, - 'comment_count': comment_count, - 'priority': priority, - }) + issues_to_process.append( + { + "document": existing_document, + "is_new": False, + "issue_content": issue_content, + "content_hash": content_hash, + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "state": state, + "description": description, + "comment_count": comment_count, + "priority": priority, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -338,19 +342,21 @@ async def index_linear_issues( session.add(document) new_documents_created = True - issues_to_process.append({ - 'document': document, - 'is_new': True, - 'issue_content': issue_content, - 'content_hash': content_hash, - 'issue_id': issue_id, - 'issue_identifier': issue_identifier, - 'issue_title': issue_title, - 'state': state, - 'description': description, - 'comment_count': comment_count, - 'priority': priority, - }) + issues_to_process.append( + { + "document": document, + "is_new": True, + "issue_content": issue_content, + "content_hash": content_hash, + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "state": state, + "description": description, + "comment_count": comment_count, + "priority": priority, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for issue: {e!s}", exc_info=True) @@ -359,7 +365,9 @@ async def index_linear_issues( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([i for i in issues_to_process if i['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -376,7 +384,7 @@ async def index_linear_issues( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -389,20 +397,23 @@ async def index_linear_issues( if user_llm: document_metadata_for_summary = { - "issue_id": item['issue_identifier'], - "issue_title": item['issue_title'], - "state": item['state'], - "priority": item['priority'], - "comment_count": item['comment_count'], + "issue_id": item["issue_identifier"], + "issue_title": item["issue_title"], + "state": item["state"], + "priority": item["priority"], + "comment_count": item["comment_count"], "document_type": "Linear Issue", "connector_type": "Linear", } - summary_content, summary_embedding = await generate_document_summary( - item['issue_content'], user_llm, document_metadata_for_summary + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item["issue_content"], user_llm, document_metadata_for_summary ) else: # Fallback to simple summary if no LLM configured - description = item['description'] + description = item["description"] if description and len(description) > 1000: description = description[:997] + "..." summary_content = f"Linear Issue {item['issue_identifier']}: {item['issue_title']}\n\nStatus: {item['state']}\n\n" @@ -413,19 +424,19 @@ async def index_linear_issues( summary_content ) - chunks = await create_document_chunks(item['issue_content']) + chunks = await create_document_chunks(item["issue_content"]) # Update document to READY with actual content document.title = f"{item['issue_identifier']}: {item['issue_title']}" document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "issue_id": item['issue_id'], - "issue_identifier": item['issue_identifier'], - "issue_title": item['issue_title'], - "state": item['state'], - "comment_count": item['comment_count'], + "issue_id": item["issue_id"], + "issue_identifier": item["issue_identifier"], + "issue_title": item["issue_title"], + "state": item["state"], + "comment_count": item["comment_count"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -452,7 +463,9 @@ async def index_linear_issues( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) skipped_issues.append( f"{item.get('issue_identifier', 'Unknown')} (processing error)" ) @@ -466,7 +479,9 @@ async def index_linear_issues( logger.info(f"Final commit: Total {documents_indexed} Linear issues processed") try: await session.commit() - logger.info("Successfully committed all Linear document changes to database") + logger.info( + "Successfully committed all Linear document changes to database" + ) except Exception as e: # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 80d4ef3cf..04af80e53 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -305,7 +305,9 @@ async def index_luma_events( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() logger.info( f"Document for Luma event {event_name} unchanged. Skipping." @@ -314,23 +316,25 @@ async def index_luma_events( continue # Queue existing document for update (will be set to processing in Phase 2) - events_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'event_id': event_id, - 'event_name': event_name, - 'event_url': event_url, - 'event_markdown': event_markdown, - 'content_hash': content_hash, - 'start_at': start_at, - 'end_at': end_at, - 'timezone': timezone, - 'location': location, - 'city': city, - 'host_names': host_names, - 'description': description, - 'cover_url': cover_url, - }) + events_to_process.append( + { + "document": existing_document, + "is_new": False, + "event_id": event_id, + "event_name": event_name, + "event_url": event_url, + "event_markdown": event_markdown, + "content_hash": content_hash, + "start_at": start_at, + "end_at": end_at, + "timezone": timezone, + "location": location, + "city": city, + "host_names": host_names, + "description": description, + "cover_url": cover_url, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -380,23 +384,25 @@ async def index_luma_events( session.add(document) new_documents_created = True - events_to_process.append({ - 'document': document, - 'is_new': True, - 'event_id': event_id, - 'event_name': event_name, - 'event_url': event_url, - 'event_markdown': event_markdown, - 'content_hash': content_hash, - 'start_at': start_at, - 'end_at': end_at, - 'timezone': timezone, - 'location': location, - 'city': city, - 'host_names': host_names, - 'description': description, - 'cover_url': cover_url, - }) + events_to_process.append( + { + "document": document, + "is_new": True, + "event_id": event_id, + "event_name": event_name, + "event_url": event_url, + "event_markdown": event_markdown, + "content_hash": content_hash, + "start_at": start_at, + "end_at": end_at, + "timezone": timezone, + "location": location, + "city": city, + "host_names": host_names, + "description": description, + "cover_url": cover_url, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True) @@ -405,7 +411,9 @@ async def index_luma_events( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -422,7 +430,7 @@ async def index_luma_events( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -435,15 +443,15 @@ async def index_luma_events( if user_llm: document_metadata_for_summary = { - "event_id": item['event_id'], - "event_name": item['event_name'], - "event_url": item['event_url'], - "start_at": item['start_at'], - "end_at": item['end_at'], - "timezone": item['timezone'], - "location": item['location'] or "No location", - "city": item['city'], - "hosts": item['host_names'], + "event_id": item["event_id"], + "event_name": item["event_name"], + "event_url": item["event_url"], + "start_at": item["start_at"], + "end_at": item["end_at"], + "timezone": item["timezone"], + "location": item["location"] or "No location", + "city": item["city"], + "hosts": item["host_names"], "document_type": "Luma Event", "connector_type": "Luma", } @@ -451,26 +459,26 @@ async def index_luma_events( summary_content, summary_embedding, ) = await generate_document_summary( - item['event_markdown'], user_llm, document_metadata_for_summary + item["event_markdown"], user_llm, document_metadata_for_summary ) else: # Fallback to simple summary if no LLM configured summary_content = f"Luma Event: {item['event_name']}\n\n" - if item['event_url']: + if item["event_url"]: summary_content += f"URL: {item['event_url']}\n" summary_content += f"Start: {item['start_at']}\n" summary_content += f"End: {item['end_at']}\n" - if item['timezone']: + if item["timezone"]: summary_content += f"Timezone: {item['timezone']}\n" - if item['location']: + if item["location"]: summary_content += f"Location: {item['location']}\n" - if item['city']: + if item["city"]: summary_content += f"City: {item['city']}\n" - if item['host_names']: + if item["host_names"]: summary_content += f"Hosts: {item['host_names']}\n" - if item['description']: - desc_preview = item['description'][:1000] - if len(item['description']) > 1000: + if item["description"]: + desc_preview = item["description"][:1000] + if len(item["description"]) > 1000: desc_preview += "..." summary_content += f"Description: {desc_preview}\n" @@ -478,24 +486,24 @@ async def index_luma_events( summary_content ) - chunks = await create_document_chunks(item['event_markdown']) + chunks = await create_document_chunks(item["event_markdown"]) # Update document to READY with actual content - document.title = item['event_name'] + document.title = item["event_name"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "event_id": item['event_id'], - "event_name": item['event_name'], - "event_url": item['event_url'], - "start_at": item['start_at'], - "end_at": item['end_at'], - "timezone": item['timezone'], - "location": item['location'], - "city": item['city'], - "hosts": item['host_names'], - "cover_url": item['cover_url'], + "event_id": item["event_id"], + "event_name": item["event_name"], + "event_url": item["event_url"], + "start_at": item["start_at"], + "end_at": item["end_at"], + "timezone": item["timezone"], + "location": item["location"], + "city": item["city"], + "hosts": item["host_names"], + "cover_url": item["cover_url"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -522,7 +530,9 @@ async def index_luma_events( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) skipped_events.append( f"{item.get('event_name', 'Unknown')} (processing error)" ) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 37927b779..52704e173 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -354,20 +354,24 @@ async def index_notion_pages( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - pages_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'page_id': page_id, - 'page_title': page_title, - }) + pages_to_process.append( + { + "document": existing_document, + "is_new": False, + "markdown_content": markdown_content, + "content_hash": content_hash, + "page_id": page_id, + "page_title": page_title, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -410,14 +414,16 @@ async def index_notion_pages( session.add(document) new_documents_created = True - pages_to_process.append({ - 'document': document, - 'is_new': True, - 'markdown_content': markdown_content, - 'content_hash': content_hash, - 'page_id': page_id, - 'page_title': page_title, - }) + pages_to_process.append( + { + "document": document, + "is_new": True, + "markdown_content": markdown_content, + "content_hash": content_hash, + "page_id": page_id, + "page_title": page_title, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for page: {e!s}", exc_info=True) @@ -426,7 +432,9 @@ async def index_notion_pages( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([p for p in pages_to_process if p['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -443,7 +451,7 @@ async def index_notion_pages( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() @@ -456,13 +464,18 @@ async def index_notion_pages( if user_llm: document_metadata_for_summary = { - "page_title": item['page_title'], - "page_id": item['page_id'], + "page_title": item["page_title"], + "page_id": item["page_id"], "document_type": "Notion Page", "connector_type": "Notion", } - summary_content, summary_embedding = await generate_document_summary( - item['markdown_content'], user_llm, document_metadata_for_summary + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item["markdown_content"], + user_llm, + document_metadata_for_summary, ) else: # Fallback to simple summary if no LLM configured @@ -471,16 +484,16 @@ async def index_notion_pages( summary_content ) - chunks = await create_document_chunks(item['markdown_content']) + chunks = await create_document_chunks(item["markdown_content"]) # Update document to READY with actual content - document.title = item['page_title'] + document.title = item["page_title"] document.content = summary_content - document.content_hash = item['content_hash'] + document.content_hash = item["content_hash"] document.embedding = summary_embedding document.document_metadata = { - "page_title": item['page_title'], - "page_id": item['page_id'], + "page_title": item["page_title"], + "page_id": item["page_id"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -504,7 +517,9 @@ async def index_notion_pages( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) skipped_pages.append(f"{item['page_title']} (processing error)") documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index 0e6934e2c..6dea1a730 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -382,27 +382,31 @@ async def index_obsidian_vault( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() logger.debug(f"Note {title} unchanged, skipping") skipped_count += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - files_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'file_info': file_info, - 'content': content, - 'body_content': body_content, - 'frontmatter': frontmatter, - 'wiki_links': wiki_links, - 'tags': tags, - 'title': title, - 'relative_path': relative_path, - 'content_hash': content_hash, - 'unique_identifier_hash': unique_identifier_hash, - }) + files_to_process.append( + { + "document": existing_document, + "is_new": False, + "file_info": file_info, + "content": content, + "body_content": body_content, + "frontmatter": frontmatter, + "wiki_links": wiki_links, + "tags": tags, + "title": title, + "relative_path": relative_path, + "content_hash": content_hash, + "unique_identifier_hash": unique_identifier_hash, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -445,20 +449,22 @@ async def index_obsidian_vault( session.add(document) new_documents_created = True - files_to_process.append({ - 'document': document, - 'is_new': True, - 'file_info': file_info, - 'content': content, - 'body_content': body_content, - 'frontmatter': frontmatter, - 'wiki_links': wiki_links, - 'tags': tags, - 'title': title, - 'relative_path': relative_path, - 'content_hash': content_hash, - 'unique_identifier_hash': unique_identifier_hash, - }) + files_to_process.append( + { + "document": document, + "is_new": True, + "file_info": file_info, + "content": content, + "body_content": body_content, + "frontmatter": frontmatter, + "wiki_links": wiki_links, + "tags": tags, + "title": title, + "relative_path": relative_path, + "content_hash": content_hash, + "unique_identifier_hash": unique_identifier_hash, + } + ) except Exception as e: logger.exception( @@ -469,7 +475,9 @@ async def index_obsidian_vault( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -491,22 +499,22 @@ async def index_obsidian_vault( await on_heartbeat_callback(indexed_count) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() await session.commit() # Extract data from item - title = item['title'] - relative_path = item['relative_path'] - content = item['content'] - body_content = item['body_content'] - frontmatter = item['frontmatter'] - wiki_links = item['wiki_links'] - tags = item['tags'] - content_hash = item['content_hash'] - file_info = item['file_info'] + title = item["title"] + relative_path = item["relative_path"] + content = item["content"] + body_content = item["body_content"] + frontmatter = item["frontmatter"] + wiki_links = item["wiki_links"] + tags = item["tags"] + content_hash = item["content_hash"] + file_info = item["file_info"] # Build metadata document_metadata = { @@ -584,7 +592,9 @@ async def index_obsidian_vault( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) failed_count += 1 continue @@ -592,9 +602,7 @@ async def index_obsidian_vault( await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches - logger.info( - f"Final commit: Total {indexed_count} Obsidian notes processed" - ) + logger.info(f"Final commit: Total {indexed_count} Obsidian notes processed") try: await session.commit() logger.info( diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index 61faa39b3..111552fa6 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -314,7 +314,9 @@ async def index_slack_messages( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() logger.info( f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping." @@ -323,18 +325,20 @@ async def index_slack_messages( continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'combined_document_string': combined_document_string, - 'content_hash': content_hash, - 'channel_name': channel_name, - 'channel_id': channel_id, - 'msg_ts': msg_ts, - 'start_date': start_date_str, - 'end_date': end_date_str, - 'message_count': len(formatted_messages), - }) + messages_to_process.append( + { + "document": existing_document, + "is_new": False, + "combined_document_string": combined_document_string, + "content_hash": content_hash, + "channel_name": channel_name, + "channel_id": channel_id, + "msg_ts": msg_ts, + "start_date": start_date_str, + "end_date": end_date_str, + "message_count": len(formatted_messages), + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -377,18 +381,20 @@ async def index_slack_messages( session.add(document) new_documents_created = True - messages_to_process.append({ - 'document': document, - 'is_new': True, - 'combined_document_string': combined_document_string, - 'content_hash': content_hash, - 'channel_name': channel_name, - 'channel_id': channel_id, - 'msg_ts': msg_ts, - 'start_date': start_date_str, - 'end_date': end_date_str, - 'message_count': len(formatted_messages), - }) + messages_to_process.append( + { + "document": document, + "is_new": True, + "combined_document_string": combined_document_string, + "content_hash": content_hash, + "channel_name": channel_name, + "channel_id": channel_id, + "msg_ts": msg_ts, + "start_date": start_date_str, + "end_date": end_date_str, + "message_count": len(formatted_messages), + } + ) logger.info( f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}" @@ -409,7 +415,9 @@ async def index_slack_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -426,29 +434,29 @@ async def index_slack_messages( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() await session.commit() # Heavy processing (embeddings, chunks) - chunks = await create_document_chunks(item['combined_document_string']) + chunks = await create_document_chunks(item["combined_document_string"]) doc_embedding = config.embedding_model_instance.embed( - item['combined_document_string'] + item["combined_document_string"] ) # Update document to READY with actual content - document.title = item['channel_name'] - document.content = item['combined_document_string'] - document.content_hash = item['content_hash'] + document.title = item["channel_name"] + document.content = item["combined_document_string"] + document.content_hash = item["content_hash"] document.embedding = doc_embedding document.document_metadata = { - "channel_name": item['channel_name'], - "channel_id": item['channel_id'], - "start_date": item['start_date'], - "end_date": item['end_date'], - "message_count": item['message_count'], + "channel_name": item["channel_name"], + "channel_id": item["channel_id"], + "start_date": item["start_date"], + "end_date": item["end_date"], + "message_count": item["message_count"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -475,7 +483,9 @@ async def index_slack_messages( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 27259fd6f..1b13a2c37 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -332,25 +332,31 @@ async def index_teams_messages( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): - existing_document.status = DocumentStatus.ready() + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): + existing_document.status = ( + DocumentStatus.ready() + ) documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'combined_document_string': combined_document_string, - 'content_hash': content_hash, - 'team_name': team_name, - 'team_id': team_id, - 'channel_name': channel_name, - 'channel_id': channel_id, - 'message_id': message_id, - 'start_date': start_date_str, - 'end_date': end_date_str, - }) + messages_to_process.append( + { + "document": existing_document, + "is_new": False, + "combined_document_string": combined_document_string, + "content_hash": content_hash, + "team_name": team_name, + "team_id": team_id, + "channel_name": channel_name, + "channel_id": channel_id, + "message_id": message_id, + "start_date": start_date_str, + "end_date": end_date_str, + } + ) continue # Document doesn't exist by unique_identifier_hash @@ -400,19 +406,21 @@ async def index_teams_messages( session.add(document) new_documents_created = True - messages_to_process.append({ - 'document': document, - 'is_new': True, - 'combined_document_string': combined_document_string, - 'content_hash': content_hash, - 'team_name': team_name, - 'team_id': team_id, - 'channel_name': channel_name, - 'channel_id': channel_id, - 'message_id': message_id, - 'start_date': start_date_str, - 'end_date': end_date_str, - }) + messages_to_process.append( + { + "document": document, + "is_new": True, + "combined_document_string": combined_document_string, + "content_hash": content_hash, + "team_name": team_name, + "team_id": team_id, + "channel_name": channel_name, + "channel_id": channel_id, + "message_id": message_id, + "start_date": start_date_str, + "end_date": end_date_str, + } + ) except Exception as e: logger.error( @@ -432,7 +440,9 @@ async def index_teams_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -449,30 +459,30 @@ async def index_teams_messages( await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item['document'] + document = item["document"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only document.status = DocumentStatus.processing() await session.commit() # Heavy processing (embeddings, chunks) - chunks = await create_document_chunks(item['combined_document_string']) + chunks = await create_document_chunks(item["combined_document_string"]) doc_embedding = config.embedding_model_instance.embed( - item['combined_document_string'] + item["combined_document_string"] ) # Update document to READY with actual content document.title = f"{item['team_name']} - {item['channel_name']}" - document.content = item['combined_document_string'] - document.content_hash = item['content_hash'] + document.content = item["combined_document_string"] + document.content_hash = item["content_hash"] document.embedding = doc_embedding document.document_metadata = { - "team_name": item['team_name'], - "team_id": item['team_id'], - "channel_name": item['channel_name'], - "channel_id": item['channel_id'], - "start_date": item['start_date'], - "end_date": item['end_date'], + "team_name": item["team_name"], + "team_id": item["team_id"], + "channel_name": item["channel_name"], + "channel_id": item["channel_id"], + "start_date": item["start_date"], + "end_date": item["end_date"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -497,7 +507,9 @@ async def index_teams_messages( document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue @@ -510,9 +522,7 @@ async def index_teams_messages( ) try: await session.commit() - logger.info( - "Successfully committed all Teams document changes to database" - ) + logger.info("Successfully committed all Teams document changes to database") except Exception as e: # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index 5d25b4623..5b3fa02b0 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -184,22 +184,28 @@ async def index_crawled_urls( if existing_document: # Document exists - check if it's already being processed - if DocumentStatus.is_state(existing_document.status, DocumentStatus.PENDING): + if DocumentStatus.is_state( + existing_document.status, DocumentStatus.PENDING + ): logger.info(f"URL {url} already pending. Skipping.") documents_skipped += 1 continue - if DocumentStatus.is_state(existing_document.status, DocumentStatus.PROCESSING): + if DocumentStatus.is_state( + existing_document.status, DocumentStatus.PROCESSING + ): logger.info(f"URL {url} already processing. Skipping.") documents_skipped += 1 continue # Queue existing document for potential update check - urls_to_process.append({ - 'document': existing_document, - 'is_new': False, - 'url': url, - 'unique_identifier_hash': unique_identifier_hash, - }) + urls_to_process.append( + { + "document": existing_document, + "is_new": False, + "url": url, + "unique_identifier_hash": unique_identifier_hash, + } + ) continue # Create new document with PENDING status (visible in UI immediately) @@ -224,12 +230,14 @@ async def index_crawled_urls( session.add(document) new_documents_created = True - urls_to_process.append({ - 'document': document, - 'is_new': True, - 'url': url, - 'unique_identifier_hash': unique_identifier_hash, - }) + urls_to_process.append( + { + "document": document, + "is_new": True, + "url": url, + "unique_identifier_hash": unique_identifier_hash, + } + ) except Exception as e: logger.error(f"Error in Phase 1 for URL {url}: {e!s}", exc_info=True) @@ -238,7 +246,9 @@ async def index_crawled_urls( # Commit all pending documents - they all appear in UI now if new_documents_created: - logger.info(f"Phase 1: Committing {len([u for u in urls_to_process if u['is_new']])} pending documents") + logger.info( + f"Phase 1: Committing {len([u for u in urls_to_process if u['is_new']])} pending documents" + ) await session.commit() # ======================================================================= @@ -255,9 +265,9 @@ async def index_crawled_urls( await on_heartbeat_callback(documents_indexed + documents_updated) last_heartbeat_time = current_time - document = item['document'] - url = item['url'] - is_new = item['is_new'] + document = item["document"] + url = item["url"] + is_new = item["is_new"] try: # Set to PROCESSING and commit - shows "processing" in UI for THIS document only @@ -298,7 +308,9 @@ async def index_crawled_urls( continue # Format content as structured document for summary generation - structured_document = crawler.format_to_structured_document(crawl_result) + structured_document = crawler.format_to_structured_document( + crawl_result + ) # Generate content hash using a version WITHOUT metadata structured_document_for_hash = crawler.format_to_structured_document( @@ -339,7 +351,9 @@ async def index_crawled_urls( f"(existing document ID: {duplicate_by_content.id}). " f"Marking as failed." ) - document.status = DocumentStatus.failed("Duplicate content exists") + document.status = DocumentStatus.failed( + "Duplicate content exists" + ) document.updated_at = get_current_timestamp() await session.commit() duplicate_content_count += 1 @@ -360,7 +374,10 @@ async def index_crawled_urls( "document_type": "Crawled URL", "crawler_type": crawler_type, } - summary_content, summary_embedding = await generate_document_summary( + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( structured_document, user_llm, document_metadata_for_summary ) else: @@ -423,7 +440,9 @@ async def index_crawled_urls( document.updated_at = get_current_timestamp() await session.commit() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) documents_failed += 1 continue @@ -438,7 +457,9 @@ async def index_crawled_urls( ) try: await session.commit() - logger.info("Successfully committed all webcrawler document changes to database") + logger.info( + "Successfully committed all webcrawler document changes to database" + ) except Exception as e: # Handle any remaining integrity errors gracefully if "duplicate key value violates unique constraint" in str(e).lower(): diff --git a/surfsense_backend/app/tasks/document_processors/base.py b/surfsense_backend/app/tasks/document_processors/base.py index c8046868c..2047ec63d 100644 --- a/surfsense_backend/app/tasks/document_processors/base.py +++ b/surfsense_backend/app/tasks/document_processors/base.py @@ -17,29 +17,30 @@ md = MarkdownifyTransformer() def safe_set_chunks(document: Document, chunks: list) -> None: """ Safely assign chunks to a document without triggering lazy loading. - + ALWAYS use this instead of `document.chunks = chunks` to avoid SQLAlchemy async errors (MissingGreenlet / greenlet_spawn). - + Why this is needed: - Direct assignment `document.chunks = chunks` triggers SQLAlchemy to load the OLD chunks first (for comparison/orphan detection) - This lazy loading fails in async context with asyncpg driver - set_committed_value bypasses this by setting the value directly - + This function is safe regardless of how the document was loaded (with or without selectinload). - + Args: document: The Document object to update chunks: List of Chunk objects to assign - + Example: # Instead of: document.chunks = chunks (DANGEROUS!) safe_set_chunks(document, chunks) # Always safe """ from sqlalchemy.orm.attributes import set_committed_value - set_committed_value(document, 'chunks', chunks) + + set_committed_value(document, "chunks", chunks) def get_current_timestamp() -> datetime: diff --git a/surfsense_backend/app/tasks/document_processors/circleback_processor.py b/surfsense_backend/app/tasks/document_processors/circleback_processor.py index e9c395c83..a513bcaf0 100644 --- a/surfsense_backend/app/tasks/document_processors/circleback_processor.py +++ b/surfsense_backend/app/tasks/document_processors/circleback_processor.py @@ -91,7 +91,9 @@ async def add_circleback_meeting_document( # Document exists - check if content has changed if existing_document.content_hash == content_hash: # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + if not DocumentStatus.is_state( + existing_document.status, DocumentStatus.READY + ): existing_document.status = DocumentStatus.ready() await session.commit() logger.info(f"Circleback meeting {meeting_id} unchanged. Skipping.") @@ -110,7 +112,7 @@ async def add_circleback_meeting_document( # PHASE 1: Create document with PENDING status # This makes the document visible in the UI immediately # ======================================================================= - + # Fetch the user who set up the Circleback connector (preferred) # or fall back to search space owner if no connector found created_by_user_id = None @@ -173,7 +175,7 @@ async def add_circleback_meeting_document( # ======================================================================= # PHASE 3: Process the document content # ======================================================================= - + # Get LLM for generating summary llm = await get_document_summary_llm(session, search_space_id) if not llm: @@ -243,7 +245,7 @@ async def add_circleback_meeting_document( await session.commit() await session.refresh(document) - + if existing_document: logger.info( f"Updated Circleback meeting document {meeting_id} in search space {search_space_id}" @@ -267,7 +269,9 @@ async def add_circleback_meeting_document( document.updated_at = get_current_timestamp() await session.commit() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) raise db_error except Exception as e: await session.rollback() @@ -279,5 +283,7 @@ async def add_circleback_meeting_document( document.updated_at = get_current_timestamp() await session.commit() except Exception as status_error: - logger.error(f"Failed to update document status to failed: {status_error}") + logger.error( + f"Failed to update document status to failed: {status_error}" + ) raise RuntimeError(f"Failed to process Circleback meeting: {e!s}") from e diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index e14dc3f42..3fa57e998 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -1629,16 +1629,16 @@ async def process_file_in_background_with_document( ) -> Document | None: """ Process file and update existing pending document (2-phase pattern). - + This function is Phase 2 of the real-time document status updates: - Phase 1 (API): Created document with pending status - Phase 2 (this): Process file and update document to ready/failed - + The document already exists with pending status. This function: 1. Parses the file content (markdown, audio, or ETL services) 2. Updates the document with content, embeddings, and chunks 3. Sets status to 'ready' on success - + Args: document: Existing document with pending status file_path: Path to the uploaded file @@ -1650,7 +1650,7 @@ async def process_file_in_background_with_document( log_entry: Log entry for this task connector: Optional connector info for Google Drive files notification: Optional notification for progress updates - + Returns: Updated Document object if successful, None if duplicate content detected """ @@ -1665,13 +1665,18 @@ async def process_file_in_background_with_document( etl_service = None # ===== STEP 1: Parse file content based on type ===== - + # Check if the file is a markdown or text file if filename.lower().endswith((".md", ".markdown", ".txt")): # Update notification: parsing stage if notification: - await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Reading file" + await ( + NotificationService.document_processing.notify_processing_progress( + session, + notification, + stage="parsing", + stage_message="Reading file", + ) ) await task_logger.log_task_progress( @@ -1695,8 +1700,13 @@ async def process_file_in_background_with_document( ): # Update notification: parsing stage (transcription) if notification: - await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Transcribing audio" + await ( + NotificationService.document_processing.notify_processing_progress( + session, + notification, + stage="parsing", + stage_message="Transcribing audio", + ) ) await task_logger.log_task_progress( @@ -1708,7 +1718,8 @@ async def process_file_in_background_with_document( # Transcribe audio stt_service_type = ( "local" - if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/") + if app_config.STT_SERVICE + and app_config.STT_SERVICE.startswith("local/") else "external" ) @@ -1719,7 +1730,9 @@ async def process_file_in_background_with_document( transcribed_text = result.get("text", "") if not transcribed_text: raise ValueError("Transcription returned empty text") - markdown_content = f"# Transcription of {filename}\n\n{transcribed_text}" + markdown_content = ( + f"# Transcription of {filename}\n\n{transcribed_text}" + ) else: with open(file_path, "rb") as audio_file: transcription_kwargs = { @@ -1728,12 +1741,18 @@ async def process_file_in_background_with_document( "api_key": app_config.STT_SERVICE_API_KEY, } if app_config.STT_SERVICE_API_BASE: - transcription_kwargs["api_base"] = app_config.STT_SERVICE_API_BASE - transcription_response = await atranscription(**transcription_kwargs) + transcription_kwargs["api_base"] = ( + app_config.STT_SERVICE_API_BASE + ) + transcription_response = await atranscription( + **transcription_kwargs + ) transcribed_text = transcription_response.get("text", "") if not transcribed_text: raise ValueError("Transcription returned empty text") - markdown_content = f"# Transcription of {filename}\n\n{transcribed_text}" + markdown_content = ( + f"# Transcription of {filename}\n\n{transcribed_text}" + ) etl_service = "AUDIO_TRANSCRIPTION" # Clean up temp file @@ -1742,13 +1761,18 @@ async def process_file_in_background_with_document( else: # Document files - use ETL service - from app.services.page_limit_service import PageLimitExceededError, PageLimitService + from app.services.page_limit_service import ( + PageLimitExceededError, + PageLimitService, + ) page_limit_service = PageLimitService(session) # Estimate page count try: - estimated_pages = page_limit_service.estimate_pages_before_processing(file_path) + estimated_pages = page_limit_service.estimate_pages_before_processing( + file_path + ) except Exception: file_size = os.path.getsize(file_path) estimated_pages = max(1, file_size // (80 * 1024)) @@ -1759,14 +1783,22 @@ async def process_file_in_background_with_document( if app_config.ETL_SERVICE == "UNSTRUCTURED": if notification: await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Extracting content" + session, + notification, + stage="parsing", + stage_message="Extracting content", ) from langchain_unstructured import UnstructuredLoader loader = UnstructuredLoader( - file_path, mode="elements", post_processors=[], languages=["eng"], - include_orig_elements=False, include_metadata=False, strategy="auto" + file_path, + mode="elements", + post_processors=[], + languages=["eng"], + include_orig_elements=False, + include_metadata=False, + strategy="auto", ) docs = await loader.aload() markdown_content = await convert_document_to_markdown(docs) @@ -1775,37 +1807,55 @@ async def process_file_in_background_with_document( etl_service = "UNSTRUCTURED" # Update page usage - await page_limit_service.update_page_usage(user_id, final_page_count, allow_exceed=True) + await page_limit_service.update_page_usage( + user_id, final_page_count, allow_exceed=True + ) elif app_config.ETL_SERVICE == "LLAMACLOUD": if notification: await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Extracting content" + session, + notification, + stage="parsing", + stage_message="Extracting content", ) result = await parse_with_llamacloud_retry( - file_path=file_path, estimated_pages=estimated_pages, - task_logger=task_logger, log_entry=log_entry + file_path=file_path, + estimated_pages=estimated_pages, + task_logger=task_logger, + log_entry=log_entry, + ) + markdown_documents = await result.aget_markdown_documents( + split_by_page=False ) - markdown_documents = await result.aget_markdown_documents(split_by_page=False) if not markdown_documents: - raise RuntimeError(f"LlamaCloud parsing returned no documents: {filename}") + raise RuntimeError( + f"LlamaCloud parsing returned no documents: {filename}" + ) markdown_content = markdown_documents[0].text etl_service = "LLAMACLOUD" # Update page usage - await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True) + await page_limit_service.update_page_usage( + user_id, estimated_pages, allow_exceed=True + ) elif app_config.ETL_SERVICE == "DOCLING": if notification: await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Extracting content" + session, + notification, + stage="parsing", + stage_message="Extracting content", ) # Suppress logging during Docling import getLogger("docling.pipeline.base_pipeline").setLevel(ERROR) getLogger("docling.document_converter").setLevel(ERROR) - getLogger("docling_core.transforms.chunker.hierarchical_chunker").setLevel(ERROR) + getLogger( + "docling_core.transforms.chunker.hierarchical_chunker" + ).setLevel(ERROR) from docling.document_converter import DocumentConverter @@ -1815,7 +1865,9 @@ async def process_file_in_background_with_document( etl_service = "DOCLING" # Update page usage - await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True) + await page_limit_service.update_page_usage( + user_id, estimated_pages, allow_exceed=True + ) else: raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}") @@ -1829,7 +1881,7 @@ async def process_file_in_background_with_document( # ===== STEP 2: Check for duplicate content ===== content_hash = generate_content_hash(markdown_content, search_space_id) - + existing_by_content = await check_duplicate_document(session, content_hash) if existing_by_content and existing_by_content.id != document.id: # Duplicate content found - mark this document as failed @@ -1846,7 +1898,7 @@ async def process_file_in_background_with_document( ) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - + if user_llm: document_metadata = { "file_name": filename, @@ -1881,10 +1933,10 @@ async def process_file_in_background_with_document( **(document.document_metadata or {}), } flag_modified(document, "document_metadata") - + # Use safe_set_chunks to avoid async issues safe_set_chunks(document, chunks) - + document.blocknote_document = blocknote_json document.content_needs_reindexing = False document.updated_at = get_current_timestamp() @@ -1922,7 +1974,11 @@ async def process_file_in_background_with_document( log_entry, error_message, str(e), - {"error_type": type(e).__name__, "filename": filename, "document_id": document.id}, + { + "error_type": type(e).__name__, + "filename": filename, + "document_id": document.id, + }, ) logging.error(f"Error processing file with document: {error_message}") raise diff --git a/surfsense_backend/app/tasks/document_processors/youtube_processor.py b/surfsense_backend/app/tasks/document_processors/youtube_processor.py index 19092b592..e83d7c855 100644 --- a/surfsense_backend/app/tasks/document_processors/youtube_processor.py +++ b/surfsense_backend/app/tasks/document_processors/youtube_processor.py @@ -136,11 +136,19 @@ async def add_youtube_video_document( document = existing_document is_new_document = False # Check if already being processed - if DocumentStatus.is_state(existing_document.status, DocumentStatus.PENDING): - logging.info(f"YouTube video {video_id} already pending. Returning existing.") + if DocumentStatus.is_state( + existing_document.status, DocumentStatus.PENDING + ): + logging.info( + f"YouTube video {video_id} already pending. Returning existing." + ) return existing_document - if DocumentStatus.is_state(existing_document.status, DocumentStatus.PROCESSING): - logging.info(f"YouTube video {video_id} already processing. Returning existing.") + if DocumentStatus.is_state( + existing_document.status, DocumentStatus.PROCESSING + ): + logging.info( + f"YouTube video {video_id} already processing. Returning existing." + ) return existing_document else: # Create new document with PENDING status (visible in UI immediately) @@ -300,7 +308,9 @@ async def add_youtube_video_document( "video_id": video_id, }, ) - logging.info(f"Document for YouTube video {video_id} unchanged. Marking as ready.") + logging.info( + f"Document for YouTube video {video_id} unchanged. Marking as ready." + ) document.status = DocumentStatus.ready() await session.commit() return document @@ -408,7 +418,9 @@ async def add_youtube_video_document( # Mark document as failed if it exists if document: try: - document.status = DocumentStatus.failed(f"Database error: {str(db_error)[:150]}") + document.status = DocumentStatus.failed( + f"Database error: {str(db_error)[:150]}" + ) document.updated_at = get_current_timestamp() await session.commit() except Exception: diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx index 2bba85085..b214c96be 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx @@ -38,7 +38,9 @@ export function DocumentTypeChip({ type, className }: { type: string; className? className={`inline-flex items-center gap-1.5 rounded bg-muted/40 px-2 py-1 text-xs text-muted-foreground max-w-full overflow-hidden ${className ?? ""}`} > {icon} - {fullLabel} + + {fullLabel} + ); diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsFilters.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsFilters.tsx index 028f38098..6bd5f8460 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsFilters.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsFilters.tsx @@ -68,9 +68,7 @@ export function DocumentsFilters({ const filteredTypes = useMemo(() => { if (!typeSearchQuery.trim()) return uniqueTypes; const query = typeSearchQuery.toLowerCase(); - return uniqueTypes.filter((type) => - getDocumentTypeLabel(type).toLowerCase().includes(query) - ); + return uniqueTypes.filter((type) => getDocumentTypeLabel(type).toLowerCase().includes(query)); }, [uniqueTypes, typeSearchQuery]); const typeCounts = useMemo(() => { @@ -156,94 +154,95 @@ export function DocumentsFilters({ {/* Filter Buttons Group */}