diff --git a/surfsense_backend/alembic/versions/9_add_discord_connector_enum_and_documenttype.py b/surfsense_backend/alembic/versions/9_add_discord_connector_enum_and_documenttype.py new file mode 100644 index 000000000..fbf748ae6 --- /dev/null +++ b/surfsense_backend/alembic/versions/9_add_discord_connector_enum_and_documenttype.py @@ -0,0 +1,112 @@ +"""Add DISCORD_CONNECTOR to SearchSourceConnectorType and DocumentType enums + +Revision ID: 9 +Revises: 8 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "9" +down_revision: Union[str, None] = "8" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +# Define the ENUM type name and the new value +CONNECTOR_ENUM = "searchsourceconnectortype" +CONNECTOR_NEW_VALUE = "DISCORD_CONNECTOR" +DOCUMENT_ENUM = "documenttype" +DOCUMENT_NEW_VALUE = "DISCORD_CONNECTOR" + + +def upgrade() -> None: + """Upgrade schema - add DISCORD_CONNECTOR to connector and document enum.""" + # Add DISCORD_CONNECTOR to searchsourceconnectortype + op.execute(f"ALTER TYPE {CONNECTOR_ENUM} ADD VALUE '{CONNECTOR_NEW_VALUE}'") + # Add DISCORD_CONNECTOR to documenttype + op.execute(f"ALTER TYPE {DOCUMENT_ENUM} ADD VALUE '{DOCUMENT_NEW_VALUE}'") + + +def downgrade() -> None: + """Downgrade schema - remove DISCORD_CONNECTOR from connector and document enum.""" + + # Old enum name + old_connector_enum_name = f"{CONNECTOR_ENUM}_old" + old_document_enum_name = f"{DOCUMENT_ENUM}_old" + + old_connector_values = ( + "SERPER_API", + "TAVILY_API", + "LINKUP_API", + "SLACK_CONNECTOR", + "NOTION_CONNECTOR", + "GITHUB_CONNECTOR", + "LINEAR_CONNECTOR", + ) + old_document_values = ( + "EXTENSION", + "CRAWLED_URL", + "FILE", + "SLACK_CONNECTOR", + "NOTION_CONNECTOR", + "YOUTUBE_VIDEO", + "GITHUB_CONNECTOR", + "LINEAR_CONNECTOR", + ) + + old_connector_values_sql = ", ".join([f"'{v}'" for v in old_connector_values]) + old_document_values_sql = ", ".join([f"'{v}'" for v in old_document_values]) + + # Table and column names (adjust if different) + connector_table_name = "search_source_connectors" + connector_column_name = "connector_type" + document_table_name = "documents" + document_column_name = "document_type" + + # Connector Enum Downgrade Steps + # 1. Rename the current connector enum type + op.execute(f"ALTER TYPE {CONNECTOR_ENUM} RENAME TO {old_connector_enum_name}") + + # 2. Create the new connector enum type with the old values + op.execute(f"CREATE TYPE {CONNECTOR_ENUM} AS ENUM({old_connector_values_sql})") + + # 3. Update the connector table: + op.execute( + f"ALTER TABLE {connector_table_name} " + f"ALTER COLUMN {connector_column_name} " + f"TYPE {CONNECTOR_ENUM} " + f"USING {connector_column_name}::text::{CONNECTOR_ENUM}" + ) + + # 4. Drop the old connector enum type + op.execute(f"DROP TYPE {old_connector_enum_name}") + + + # Document Enum Downgrade Steps + # 1. Rename the current document enum type + op.execute(f"ALTER TYPE {DOCUMENT_ENUM} RENAME TO {old_document_enum_name}") + + # 2. Create the new document enum type with the old values + op.execute(f"CREATE TYPE {DOCUMENT_ENUM} AS ENUM({old_document_values_sql})") + + # 3. Delete rows with the new value from the documents table + op.execute( + f"DELETE FROM {document_table_name} WHERE {document_column_name}::text = '{DOCUMENT_NEW_VALUE}'" + ) + + # 4. Alter the document table to use the new enum type (casting old values) + op.execute( + f"ALTER TABLE {document_table_name} " + f"ALTER COLUMN {document_column_name} " + f"TYPE {DOCUMENT_ENUM} " + f"USING {document_column_name}::text::{DOCUMENT_ENUM}" + ) + + # 5. Drop the old enum types + op.execute(f"DROP TYPE {old_document_enum_name}") + + # ### end Alembic commands ### diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 7ee566311..dca6cbde6 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -50,6 +50,7 @@ class DocumentType(str, Enum): YOUTUBE_VIDEO = "YOUTUBE_VIDEO" GITHUB_CONNECTOR = "GITHUB_CONNECTOR" LINEAR_CONNECTOR = "LINEAR_CONNECTOR" + DISCORD_CONNECTOR = "DISCORD_CONNECTOR" class SearchSourceConnectorType(str, Enum): SERPER_API = "SERPER_API" # NOT IMPLEMENTED YET : DON'T REMEMBER WHY : MOST PROBABLY BECAUSE WE NEED TO CRAWL THE RESULTS RETURNED BY IT @@ -59,6 +60,7 @@ class SearchSourceConnectorType(str, Enum): NOTION_CONNECTOR = "NOTION_CONNECTOR" GITHUB_CONNECTOR = "GITHUB_CONNECTOR" LINEAR_CONNECTOR = "LINEAR_CONNECTOR" + DISCORD_CONNECTOR = "DISCORD_CONNECTOR" class ChatType(str, Enum): GENERAL = "GENERAL" diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 15c815032..7df690e9b 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -19,7 +19,7 @@ from app.schemas import SearchSourceConnectorCreate, SearchSourceConnectorUpdate from app.users import current_active_user from app.utils.check_ownership import check_ownership from pydantic import BaseModel, Field, ValidationError -from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues +from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues, index_discord_messages from app.connectors.github_connector import GitHubConnector from datetime import datetime, timedelta import logging @@ -378,6 +378,30 @@ async def index_connector_content( background_tasks.add_task(run_linear_indexing_with_new_session, connector_id, search_space_id) response_message = "Linear indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR: + # Determine the time range that will be indexed + if not connector.last_indexed_at: + start_date = "365 days ago" + else: + today = datetime.now().date() + if connector.last_indexed_at.date() == today: + # If last indexed today, go back 1 day to ensure we don't miss anything + start_date = (today - timedelta(days=1)).strftime("%Y-%m-%d") + else: + start_date = connector.last_indexed_at.strftime("%Y-%m-%d") + + indexing_from = start_date + indexing_to = today_str + + # Run indexing in background + logger.info( + f"Triggering Discord indexing for connector {connector_id} into search space {search_space_id}" + ) + background_tasks.add_task( + run_discord_indexing_with_new_session, connector_id, search_space_id + ) + response_message = "Discord indexing started in the background." + else: raise HTTPException( status_code=400, @@ -577,3 +601,45 @@ async def run_linear_indexing( await session.rollback() logger.error(f"Critical error in run_linear_indexing for connector {connector_id}: {e}", exc_info=True) # Optionally update status in DB to indicate failure + +# Add new helper functions for discord indexing +async def run_discord_indexing_with_new_session( + connector_id: int, + search_space_id: int +): + """ + Create a new session and run the Discord indexing task. + This prevents session leaks by creating a dedicated session for the background task. + """ + async with async_session_maker() as session: + await run_discord_indexing(session, connector_id, search_space_id) + +async def run_discord_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int +): + """ + Background task to run Discord indexing. + Args: + session: Database session + connector_id: ID of the Discord connector + search_space_id: ID of the search space + """ + try: + # Index Discord messages without updating last_indexed_at (we'll do it separately) + documents_processed, error_or_warning = await index_discord_messages( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + update_last_indexed=False # Don't update timestamp in the indexing function + ) + + # Only update last_indexed_at if indexing was successful (either new docs or updated docs) + if documents_processed > 0: + await update_connector_last_indexed(session, connector_id) + logger.info(f"Discord indexing completed successfully: {documents_processed} documents processed") + else: + logger.error(f"Discord indexing failed or no documents processed: {error_or_warning}") + except Exception as e: + logger.error(f"Error in background Discord indexing task: {str(e)}") \ No newline at end of file diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py index 0e0964eeb..b552e5187 100644 --- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py +++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py @@ -11,6 +11,8 @@ from app.connectors.slack_history import SlackHistory from app.connectors.notion_history import NotionHistoryConnector from app.connectors.github_connector import GitHubConnector from app.connectors.linear_connector import LinearConnector +from app.connectors.discord_connector import DiscordConnector +from discord import DiscordException from slack_sdk.errors import SlackApiError import logging @@ -912,3 +914,181 @@ async def index_linear_issues( await session.rollback() logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True) return 0, f"Failed to index Linear issues: {str(e)}" + +async def index_discord_messages( + session: AsyncSession, + connector_id: int, + search_space_id: int, + update_last_indexed: bool = True +) -> Tuple[int, Optional[str]]: + """ + Index Discord messages from all accessible channels. + + Args: + session: Database session + connector_id: ID of the Discord connector + search_space_id: ID of the search space to store documents in + update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + + Returns: + Tuple containing (number of documents indexed, error message or None) + """ + try: + # Get the connector + result = await session.execute( + select(SearchSourceConnector) + .filter( + SearchSourceConnector.id == connector_id, + SearchSourceConnector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR + ) + ) + connector = result.scalars().first() + + if not connector: + return 0, f"Connector with ID {connector_id} not found or is not a Discord connector" + + # Get the Discord token from the connector config + discord_token = connector.config.get("DISCORD_BOT_TOKEN") + if not discord_token: + return 0, "Discord token not found in connector config" + + # Initialize Discord client + discord_client = DiscordConnector(token=discord_token) + + # Calculate date range + end_date = datetime.now(timezone.utc) + + # Use last_indexed_at as start date if available, otherwise use 365 days ago + if connector.last_indexed_at: + start_date = connector.last_indexed_at.replace(tzinfo=timezone.utc) + logger.info(f"Using last_indexed_at ({start_date.strftime('%Y-%m-%d')}) as start date") + else: + start_date = end_date - timedelta(days=365) # Use 365 days as default + logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (365 days ago) as start date") + + # Format dates for Discord API + start_date_str = start_date.isoformat() + end_date_str = end_date.isoformat() + + documents_indexed = 0 + documents_skipped = 0 + skipped_guilds = [] + + try: + await discord_client.start_bot() + guilds = await discord_client.get_guilds() + logger.info(f"Found {len(guilds)} guilds") + except Exception as e: + await discord_client.close_bot() + return 0, f"Failed to get Discord guilds: {str(e)}" + if not guilds: + await discord_client.close_bot() + return 0, "No Discord guilds found" + + # Process each guild + for guild in guilds: + guild_id = guild["id"] + guild_name = guild["name"] + logger.info(f"Processing guild: {guild_name} ({guild_id})") + try: + channels = await discord_client.get_text_channels(guild_id) + + if not channels: + logger.info(f"No channels found in guild {guild_name}. Skipping.") + skipped_guilds.append(f"{guild_name} (no channels)") + documents_skipped += 1 + continue + + for channel in channels: + channel_id = channel["id"] + channel_name = channel["name"] + + try: + messages = await discord_client.get_channel_history( + channel_id=channel_id, + start_date=start_date_str, + end_date=end_date_str, + limit=1000 + ) + except Exception as e: + logger.error(f"Failed to get messages for channel {channel_name}: {str(e)}") + documents_skipped += 1 + continue + + if not messages: + continue + + for message in messages: + try: + content = message.get("content", "") + if not content: + continue + + content_hash = generate_content_hash(content) + existing_doc_by_hash_result = await session.execute( + select(Document).where(Document.content_hash == content_hash) + ) + existing_document_by_hash = existing_doc_by_hash_result.scalars().first() + + if existing_document_by_hash: + documents_skipped += 1 + continue + + summary_content = f"Discord message by {message.get('author_name', 'Unknown')} in {channel_name} ({guild_name})\n\n{content}" + summary_embedding = config.embedding_model_instance.embed(summary_content) + chunks = [ + Chunk(content=chunk.text, embedding=config.embedding_model_instance.embed(chunk.text)) + for chunk in config.chunker_instance.chunk(content) + ] + document = Document( + search_space_id=search_space_id, + title=f"Discord - {guild_name}#{channel_name}", + document_type=DocumentType.DISCORD_CONNECTOR, + document_metadata={ + "guild_id": guild_id, + "guild_name": guild_name, + "channel_id": channel_id, + "channel_name": channel_name, + "message_id": message.get("id"), + "author_id": message.get("author_id"), + "author_name": message.get("author_name"), + "created_at": message.get("created_at"), + "indexed_at": datetime.now(timezone.utc).isoformat() + }, + content=summary_content, + content_hash=content_hash, + embedding=summary_embedding, + chunks=chunks + ) + + session.add(document) + documents_indexed += 1 + + except Exception as e: + logger.error(f"Error processing Discord message: {str(e)}", exc_info=True) + documents_skipped += 1 + continue + + except Exception as e: + logger.error(f"Error processing guild {guild_name}: {str(e)}", exc_info=True) + skipped_guilds.append(f"{guild_name} (processing error)") + documents_skipped += 1 + continue + + if update_last_indexed and documents_indexed > 0: + connector.last_indexed_at = datetime.now(timezone.utc) + logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") + + await session.commit() + await discord_client.close_bot() + logger.info(f"Discord indexing completed: {documents_indexed} new messages, {documents_skipped} skipped") + return documents_indexed, None + + except SQLAlchemyError as db_error: + await session.rollback() + logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True) + return 0, f"Database error: {str(db_error)}" + except Exception as e: + await session.rollback() + logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True) + return 0, f"Failed to index Discord messages: {str(e)}"