diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index 6ac7c55de..2c2fec48b 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -76,11 +76,6 @@ SLACK_CLIENT_ID=your_slack_client_id_here SLACK_CLIENT_SECRET=your_slack_client_secret_here SLACK_REDIRECT_URI=http://localhost:8000/api/v1/auth/slack/connector/callback -# Teams OAuth Configuration -TEAMS_CLIENT_ID=your_teams_client_id_here -TEAMS_CLIENT_SECRET=your_teams_client_secret_here -TEAMS_REDIRECT_URI=http://localhost:8000/api/v1/auth/teams/connector/callback - # Embedding Model # Examples: # # Get sentence transformers embeddings diff --git a/surfsense_backend/alembic/versions/59_add_teams_connector_enums.py b/surfsense_backend/alembic/versions/59_add_teams_connector_enums.py deleted file mode 100644 index f13fbe9e5..000000000 --- a/surfsense_backend/alembic/versions/59_add_teams_connector_enums.py +++ /dev/null @@ -1,160 +0,0 @@ -"""Add TEAMS_CONNECTOR to SearchSourceConnectorType and DocumentType enums - -Revision ID: 59 -Revises: 58 -""" - -from collections.abc import Sequence - -from alembic import op - -# revision identifiers, used by Alembic. -revision: str = "59" -down_revision: str | None = "58" -branch_labels: str | Sequence[str] | None = None -depends_on: str | Sequence[str] | None = None - -# Define the ENUM type name and the new value -CONNECTOR_ENUM = "searchsourceconnectortype" -CONNECTOR_NEW_VALUE = "TEAMS_CONNECTOR" -DOCUMENT_ENUM = "documenttype" -DOCUMENT_NEW_VALUE = "TEAMS_CONNECTOR" - - -def upgrade() -> None: - """Upgrade schema - add TEAMS_CONNECTOR to connector and document enum safely.""" - # Add TEAMS_CONNECTOR to searchsourceconnectortype only if not exists - op.execute( - f""" - DO $$ - BEGIN - IF NOT EXISTS ( - SELECT 1 FROM pg_enum - WHERE enumlabel = '{CONNECTOR_NEW_VALUE}' - AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{CONNECTOR_ENUM}') - ) THEN - ALTER TYPE {CONNECTOR_ENUM} ADD VALUE '{CONNECTOR_NEW_VALUE}'; - END IF; - END$$; - """ - ) - - # Add TEAMS_CONNECTOR to documenttype only if not exists - op.execute( - f""" - DO $$ - BEGIN - IF NOT EXISTS ( - SELECT 1 FROM pg_enum - WHERE enumlabel = '{DOCUMENT_NEW_VALUE}' - AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{DOCUMENT_ENUM}') - ) THEN - ALTER TYPE {DOCUMENT_ENUM} ADD VALUE '{DOCUMENT_NEW_VALUE}'; - END IF; - END$$; - """ - ) - - -def downgrade() -> None: - """Downgrade schema - remove TEAMS_CONNECTOR from connector and document enum.""" - - # Old enum name - old_connector_enum_name = f"{CONNECTOR_ENUM}_old" - old_document_enum_name = f"{DOCUMENT_ENUM}_old" - - # All connector values except TEAMS_CONNECTOR - old_connector_values = ( - "SERPER_API", - "TAVILY_API", - "SEARXNG_API", - "LINKUP_API", - "BAIDU_SEARCH_API", - "SLACK_CONNECTOR", - "NOTION_CONNECTOR", - "GITHUB_CONNECTOR", - "LINEAR_CONNECTOR", - "DISCORD_CONNECTOR", - "JIRA_CONNECTOR", - "CONFLUENCE_CONNECTOR", - "CLICKUP_CONNECTOR", - "GOOGLE_CALENDAR_CONNECTOR", - "GOOGLE_GMAIL_CONNECTOR", - "GOOGLE_DRIVE_CONNECTOR", - "AIRTABLE_CONNECTOR", - "LUMA_CONNECTOR", - "ELASTICSEARCH_CONNECTOR", - "WEBCRAWLER_CONNECTOR", - ) - - # All document values except TEAMS_CONNECTOR - old_document_values = ( - "EXTENSION", - "CRAWLED_URL", - "FILE", - "SLACK_CONNECTOR", - "NOTION_CONNECTOR", - "YOUTUBE_VIDEO", - "GITHUB_CONNECTOR", - "LINEAR_CONNECTOR", - "DISCORD_CONNECTOR", - "JIRA_CONNECTOR", - "CONFLUENCE_CONNECTOR", - "CLICKUP_CONNECTOR", - "GOOGLE_CALENDAR_CONNECTOR", - "GOOGLE_GMAIL_CONNECTOR", - 
"GOOGLE_DRIVE_FILE", - "AIRTABLE_CONNECTOR", - "LUMA_CONNECTOR", - "ELASTICSEARCH_CONNECTOR", - "BOOKSTACK_CONNECTOR", - "CIRCLEBACK", - "NOTE", - ) - - old_connector_values_sql = ", ".join([f"'{v}'" for v in old_connector_values]) - old_document_values_sql = ", ".join([f"'{v}'" for v in old_document_values]) - - # Table and column names - connector_table_name = "search_source_connectors" - connector_column_name = "connector_type" - document_table_name = "documents" - document_column_name = "document_type" - - # Connector Enum Downgrade Steps - # 1. Rename the current connector enum type - op.execute(f"ALTER TYPE {CONNECTOR_ENUM} RENAME TO {old_connector_enum_name}") - - # 2. Create the new connector enum type with the old values - op.execute(f"CREATE TYPE {CONNECTOR_ENUM} AS ENUM({old_connector_values_sql})") - - # 3. Alter the column to use the new connector enum type - op.execute( - f""" - ALTER TABLE {connector_table_name} - ALTER COLUMN {connector_column_name} TYPE {CONNECTOR_ENUM} - USING {connector_column_name}::text::{CONNECTOR_ENUM} - """ - ) - - # 4. Drop the old connector enum type - op.execute(f"DROP TYPE {old_connector_enum_name}") - - # Document Enum Downgrade Steps - # 1. Rename the current document enum type - op.execute(f"ALTER TYPE {DOCUMENT_ENUM} RENAME TO {old_document_enum_name}") - - # 2. Create the new document enum type with the old values - op.execute(f"CREATE TYPE {DOCUMENT_ENUM} AS ENUM({old_document_values_sql})") - - # 3. Alter the column to use the new document enum type - op.execute( - f""" - ALTER TABLE {document_table_name} - ALTER COLUMN {document_column_name} TYPE {DOCUMENT_ENUM} - USING {document_column_name}::text::{DOCUMENT_ENUM} - """ - ) - - # 4. Drop the old document enum type - op.execute(f"DROP TYPE {old_document_enum_name}") diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py index e91d865fa..a3cdad359 100644 --- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py +++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py @@ -26,7 +26,6 @@ _ALL_CONNECTORS: list[str] = [ "EXTENSION", "FILE", "SLACK_CONNECTOR", - "TEAMS_CONNECTOR", "NOTION_CONNECTOR", "YOUTUBE_VIDEO", "GITHUB_CONNECTOR", @@ -574,7 +573,6 @@ def create_search_knowledge_base_tool( - FILE: "User-uploaded documents (PDFs, Word, etc.)" (personal files) - NOTE: "SurfSense Notes" (notes created inside SurfSense) - SLACK_CONNECTOR: "Slack conversations and shared content" (personal workspace communications) - - TEAMS_CONNECTOR: "Microsoft Teams messages and conversations" (personal Teams communications) - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management) - YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos) - GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions) diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 448e2c253..e76e69e94 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -117,11 +117,6 @@ class Config: DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI") DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN") - # Microsoft Teams OAuth - TEAMS_CLIENT_ID = os.getenv("TEAMS_CLIENT_ID") - TEAMS_CLIENT_SECRET = os.getenv("TEAMS_CLIENT_SECRET") - TEAMS_REDIRECT_URI = os.getenv("TEAMS_REDIRECT_URI") - # ClickUp OAuth CLICKUP_CLIENT_ID = 
os.getenv("CLICKUP_CLIENT_ID") CLICKUP_CLIENT_SECRET = os.getenv("CLICKUP_CLIENT_SECRET") diff --git a/surfsense_backend/app/connectors/teams_connector.py b/surfsense_backend/app/connectors/teams_connector.py deleted file mode 100644 index 29c2db127..000000000 --- a/surfsense_backend/app/connectors/teams_connector.py +++ /dev/null @@ -1,338 +0,0 @@ -""" -Microsoft Teams Connector - -A module for interacting with Microsoft Teams Graph API to retrieve teams, channels, and message history. - -Supports OAuth-based authentication with token refresh. -""" - -import logging -from datetime import datetime, timezone -from typing import Any - -import httpx -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select - -from app.config import config -from app.db import SearchSourceConnector -from app.routes.teams_add_connector_route import refresh_teams_token -from app.schemas.teams_auth_credentials import TeamsAuthCredentialsBase -from app.utils.oauth_security import TokenEncryption - -logger = logging.getLogger(__name__) - - -class TeamsConnector: - """Class for retrieving teams, channels, and message history from Microsoft Teams.""" - - # Microsoft Graph API endpoints - GRAPH_API_BASE = "https://graph.microsoft.com/v1.0" - - def __init__( - self, - access_token: str | None = None, - session: AsyncSession | None = None, - connector_id: int | None = None, - credentials: TeamsAuthCredentialsBase | None = None, - ): - """ - Initialize the TeamsConnector with an access token or OAuth credentials. - - Args: - access_token: Microsoft Graph API access token (optional, for backward compatibility) - session: Database session for token refresh (optional) - connector_id: Connector ID for token refresh (optional) - credentials: Teams OAuth credentials (optional, will be loaded from DB if not provided) - """ - self._session = session - self._connector_id = connector_id - self._credentials = credentials - self._access_token = access_token - - async def _get_valid_token(self) -> str: - """ - Get valid Microsoft Teams access token, refreshing if needed. 
- - Returns: - Valid access token - - Raises: - ValueError: If credentials are missing or invalid - Exception: If token refresh fails - """ - # If we have a direct token (backward compatibility), use it - if ( - self._access_token - and self._session is None - and self._connector_id is None - and self._credentials is None - ): - return self._access_token - - # Load credentials from DB if not provided - if self._credentials is None: - if not self._session or not self._connector_id: - raise ValueError( - "Cannot load credentials: session and connector_id required" - ) - - result = await self._session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == self._connector_id - ) - ) - connector = result.scalars().first() - - if not connector: - raise ValueError(f"Connector {self._connector_id} not found") - - config_data = connector.config.copy() - - # Decrypt credentials if they are encrypted - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - try: - token_encryption = TokenEncryption(config.SECRET_KEY) - - # Decrypt sensitive fields - if config_data.get("access_token"): - config_data["access_token"] = token_encryption.decrypt_token( - config_data["access_token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = token_encryption.decrypt_token( - config_data["refresh_token"] - ) - - logger.info( - "Decrypted Teams credentials for connector %s", - self._connector_id, - ) - except Exception as e: - logger.error( - "Failed to decrypt Teams credentials for connector %s: %s", - self._connector_id, - str(e), - ) - raise ValueError( - f"Failed to decrypt Teams credentials: {e!s}" - ) from e - - try: - self._credentials = TeamsAuthCredentialsBase.from_dict(config_data) - except Exception as e: - raise ValueError(f"Invalid Teams credentials: {e!s}") from e - - # Check if token is expired and refreshable - if self._credentials.is_expired and self._credentials.is_refreshable: - try: - logger.info( - "Teams token expired for connector %s, refreshing...", - self._connector_id, - ) - - # Get connector for refresh - result = await self._session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == self._connector_id - ) - ) - connector = result.scalars().first() - - if not connector: - raise RuntimeError( - f"Connector {self._connector_id} not found; cannot refresh token." 
- ) - - # Refresh token - connector = await refresh_teams_token(self._session, connector) - - # Reload credentials after refresh - config_data = connector.config.copy() - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - token_encryption = TokenEncryption(config.SECRET_KEY) - if config_data.get("access_token"): - config_data["access_token"] = token_encryption.decrypt_token( - config_data["access_token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = token_encryption.decrypt_token( - config_data["refresh_token"] - ) - - self._credentials = TeamsAuthCredentialsBase.from_dict(config_data) - - logger.info( - "Successfully refreshed Teams token for connector %s", - self._connector_id, - ) - except Exception as e: - logger.error( - "Failed to refresh Teams token for connector %s: %s", - self._connector_id, - str(e), - ) - raise ValueError( - f"Failed to refresh Teams OAuth credentials: {e!s}" - ) from e - - return self._credentials.access_token - - async def get_joined_teams(self) -> list[dict[str, Any]]: - """ - Get list of all teams the user is a member of. - - Returns: - List of team objects with id, display_name, etc. - """ - access_token = await self._get_valid_token() - - async with httpx.AsyncClient() as client: - response = await client.get( - f"{self.GRAPH_API_BASE}/me/joinedTeams", - headers={"Authorization": f"Bearer {access_token}"}, - timeout=30.0, - ) - - if response.status_code != 200: - raise ValueError( - f"Failed to get joined teams: {response.status_code} - {response.text}" - ) - - data = response.json() - return data.get("value", []) - - async def get_team_channels(self, team_id: str) -> list[dict[str, Any]]: - """ - Get list of all channels in a team. - - Args: - team_id: The team ID - - Returns: - List of channel objects - """ - access_token = await self._get_valid_token() - - async with httpx.AsyncClient() as client: - response = await client.get( - f"{self.GRAPH_API_BASE}/teams/{team_id}/channels", - headers={"Authorization": f"Bearer {access_token}"}, - timeout=30.0, - ) - - if response.status_code != 200: - raise ValueError( - f"Failed to get channels for team {team_id}: {response.status_code} - {response.text}" - ) - - data = response.json() - return data.get("value", []) - - async def get_channel_messages( - self, - team_id: str, - channel_id: str, - start_date: datetime | None = None, - end_date: datetime | None = None, - ) -> list[dict[str, Any]]: - """ - Get messages from a specific channel with optional date filtering. 
- - Args: - team_id: The team ID - channel_id: The channel ID - start_date: Optional start date for filtering messages - end_date: Optional end date for filtering messages - - Returns: - List of message objects - """ - access_token = await self._get_valid_token() - - async with httpx.AsyncClient() as client: - url = f"{self.GRAPH_API_BASE}/teams/{team_id}/channels/{channel_id}/messages" - - # Note: The Graph API for channel messages doesn't support $filter parameter - # We fetch all messages and filter them client-side - response = await client.get( - url, - headers={"Authorization": f"Bearer {access_token}"}, - timeout=30.0, - ) - - if response.status_code != 200: - raise ValueError( - f"Failed to get messages from channel {channel_id}: {response.status_code} - {response.text}" - ) - - data = response.json() - messages = data.get("value", []) - - # Filter messages by date if needed (client-side filtering) - if start_date or end_date: - # Make sure comparison dates are timezone-aware (UTC) - if start_date and start_date.tzinfo is None: - start_date = start_date.replace(tzinfo=timezone.utc) - if end_date and end_date.tzinfo is None: - end_date = end_date.replace(tzinfo=timezone.utc) - - filtered_messages = [] - for message in messages: - created_at_str = message.get("createdDateTime") - if not created_at_str: - continue - - # Parse the ISO 8601 datetime string (already timezone-aware) - created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00')) - - # Check if message is within date range - if start_date and created_at < start_date: - continue - if end_date and created_at > end_date: - continue - - filtered_messages.append(message) - - return filtered_messages - - return messages - - async def get_message_replies( - self, team_id: str, channel_id: str, message_id: str - ) -> list[dict[str, Any]]: - """ - Get replies to a specific message. - - Args: - team_id: The team ID - channel_id: The channel ID - message_id: The message ID - - Returns: - List of reply message objects - """ - access_token = await self._get_valid_token() - - async with httpx.AsyncClient() as client: - url = f"{self.GRAPH_API_BASE}/teams/{team_id}/channels/{channel_id}/messages/{message_id}/replies" - - response = await client.get( - url, - headers={"Authorization": f"Bearer {access_token}"}, - timeout=30.0, - ) - - if response.status_code != 200: - logger.warning( - "Failed to get replies for message %s: %s - %s", - message_id, - response.status_code, - response.text, - ) - return [] - - data = response.json() - return data.get("value", []) diff --git a/surfsense_backend/app/connectors/teams_history.py b/surfsense_backend/app/connectors/teams_history.py deleted file mode 100644 index 314ee6304..000000000 --- a/surfsense_backend/app/connectors/teams_history.py +++ /dev/null @@ -1,254 +0,0 @@ -""" -Microsoft Teams History Module - -A module for retrieving conversation history from Microsoft Teams channels. -Allows fetching team lists, channel lists, and message history with date range filtering. 
-""" - -import logging -from datetime import datetime -from typing import Any - -from sqlalchemy.ext.asyncio import AsyncSession - -from app.connectors.teams_connector import TeamsConnector -from app.schemas.teams_auth_credentials import TeamsAuthCredentialsBase - -logger = logging.getLogger(__name__) - - -class TeamsHistory: - """Class for retrieving conversation history from Microsoft Teams channels.""" - - def __init__( - self, - access_token: str | None = None, - session: AsyncSession | None = None, - connector_id: int | None = None, - credentials: TeamsAuthCredentialsBase | None = None, - ): - """ - Initialize the TeamsHistory class. - - Args: - access_token: Microsoft Graph API access token (optional, for backward compatibility) - session: Database session for token refresh (optional) - connector_id: Connector ID for token refresh (optional) - credentials: Teams OAuth credentials (optional, will be loaded from DB if not provided) - """ - self.connector = TeamsConnector( - access_token=access_token, - session=session, - connector_id=connector_id, - credentials=credentials, - ) - - async def get_all_teams(self) -> list[dict[str, Any]]: - """ - Get list of all teams the user has access to. - - Returns: - List of team objects containing team metadata. - """ - try: - teams = await self.connector.get_joined_teams() - logger.info("Retrieved %s teams", len(teams)) - return teams - except Exception as e: - logger.error("Error fetching teams: %s", str(e)) - raise - - async def get_channels_for_team(self, team_id: str) -> list[dict[str, Any]]: - """ - Get list of all channels in a specific team. - - Args: - team_id: The ID of the team - - Returns: - List of channel objects containing channel metadata. - """ - try: - channels = await self.connector.get_team_channels(team_id) - logger.info("Retrieved %s channels for team %s", len(channels), team_id) - return channels - except Exception as e: - logger.error("Error fetching channels for team %s: %s", team_id, str(e)) - raise - - async def get_messages_from_channel( - self, - team_id: str, - channel_id: str, - start_date: datetime | None = None, - end_date: datetime | None = None, - include_replies: bool = True, - ) -> list[dict[str, Any]]: - """ - Get messages from a specific channel with optional date filtering. - - Args: - team_id: The ID of the team - channel_id: The ID of the channel - start_date: Optional start date for filtering messages - end_date: Optional end date for filtering messages - include_replies: Whether to include reply messages (default: True) - - Returns: - List of message objects with content and metadata. 
- """ - try: - messages = await self.connector.get_channel_messages( - team_id, channel_id, start_date, end_date - ) - - logger.info( - "Retrieved %s messages from channel %s in team %s", - len(messages), - channel_id, - team_id, - ) - - # Fetch replies if requested - if include_replies: - all_messages = [] - for message in messages: - all_messages.append(message) - # Get replies for this message - try: - replies = await self.connector.get_message_replies( - team_id, channel_id, message.get("id") - ) - all_messages.extend(replies) - except Exception: - logger.warning( - "Failed to get replies for message %s", - message.get("id"), - exc_info=True, - ) - # Continue without replies for this message - - logger.info( - "Total messages including replies: %s for channel %s", - len(all_messages), - channel_id, - ) - return all_messages - - return messages - - except Exception as e: - logger.error( - "Error fetching messages from channel %s in team %s: %s", - channel_id, - team_id, - str(e), - ) - raise - - async def get_all_messages_from_team( - self, - team_id: str, - start_date: datetime | None = None, - end_date: datetime | None = None, - include_replies: bool = True, - ) -> dict[str, list[dict[str, Any]]]: - """ - Get all messages from all channels in a team. - - Args: - team_id: The ID of the team - start_date: Optional start date for filtering messages - end_date: Optional end date for filtering messages - include_replies: Whether to include reply messages (default: True) - - Returns: - Dictionary mapping channel IDs to lists of messages. - """ - try: - channels = await self.get_channels_for_team(team_id) - all_channel_messages = {} - - for channel in channels: - channel_id = channel.get("id") - channel_name = channel.get("displayName", "Unknown") - - try: - messages = await self.get_messages_from_channel( - team_id, channel_id, start_date, end_date, include_replies - ) - all_channel_messages[channel_id] = messages - logger.info( - "Fetched %s messages from channel '%s' (%s)", - len(messages), - channel_name, - channel_id, - ) - except Exception: - logger.error( - "Failed to fetch messages from channel '%s' (%s)", - channel_name, - channel_id, - exc_info=True, - ) - all_channel_messages[channel_id] = [] - - return all_channel_messages - - except Exception as e: - logger.error("Error fetching messages from team %s: %s", team_id, str(e)) - raise - - async def get_all_messages( - self, - start_date: datetime | None = None, - end_date: datetime | None = None, - include_replies: bool = True, - ) -> dict[str, dict[str, list[dict[str, Any]]]]: - """ - Get all messages from all teams and channels the user has access to. - - Args: - start_date: Optional start date for filtering messages - end_date: Optional end date for filtering messages - include_replies: Whether to include reply messages (default: True) - - Returns: - Nested dictionary: team_id -> channel_id -> list of messages. 
- """ - try: - teams = await self.get_all_teams() - all_messages = {} - - for team in teams: - team_id = team.get("id") - team_name = team.get("displayName", "Unknown") - - try: - team_messages = await self.get_all_messages_from_team( - team_id, start_date, end_date, include_replies - ) - all_messages[team_id] = team_messages - total_messages = sum( - len(messages) for messages in team_messages.values() - ) - logger.info( - "Fetched %s total messages from team '%s' (%s)", - total_messages, - team_name, - team_id, - ) - except Exception: - logger.error( - "Failed to fetch messages from team '%s' (%s)", - team_name, - team_id, - exc_info=True, - ) - all_messages[team_id] = {} - - return all_messages - - except Exception as e: - logger.error("Error fetching all messages: %s", str(e)) - raise diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index d54254f9c..fbd53bd06 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -36,7 +36,6 @@ class DocumentType(str, Enum): CRAWLED_URL = "CRAWLED_URL" FILE = "FILE" SLACK_CONNECTOR = "SLACK_CONNECTOR" - TEAMS_CONNECTOR = "TEAMS_CONNECTOR" NOTION_CONNECTOR = "NOTION_CONNECTOR" YOUTUBE_VIDEO = "YOUTUBE_VIDEO" GITHUB_CONNECTOR = "GITHUB_CONNECTOR" @@ -63,7 +62,6 @@ class SearchSourceConnectorType(str, Enum): LINKUP_API = "LINKUP_API" BAIDU_SEARCH_API = "BAIDU_SEARCH_API" # Baidu AI Search API for Chinese web search SLACK_CONNECTOR = "SLACK_CONNECTOR" - TEAMS_CONNECTOR = "TEAMS_CONNECTOR" NOTION_CONNECTOR = "NOTION_CONNECTOR" GITHUB_CONNECTOR = "GITHUB_CONNECTOR" LINEAR_CONNECTOR = "LINEAR_CONNECTOR" diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index b4e94c732..47d540e7d 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -31,7 +31,6 @@ from .rbac_routes import router as rbac_router from .search_source_connectors_routes import router as search_source_connectors_router from .search_spaces_routes import router as search_spaces_router from .slack_add_connector_route import router as slack_add_connector_router -from .teams_add_connector_route import router as teams_add_connector_router router = APIRouter() @@ -51,7 +50,6 @@ router.include_router(linear_add_connector_router) router.include_router(luma_add_connector_router) router.include_router(notion_add_connector_router) router.include_router(slack_add_connector_router) -router.include_router(teams_add_connector_router) router.include_router(discord_add_connector_router) router.include_router(jira_add_connector_router) router.include_router(confluence_add_connector_router) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index c9831484d..58a50a6f8 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -558,7 +558,6 @@ async def index_connector_content( Currently supports: - SLACK_CONNECTOR: Indexes messages from all accessible Slack channels - - TEAMS_CONNECTOR: Indexes messages from all accessible Microsoft Teams channels - NOTION_CONNECTOR: Indexes pages from all accessible Notion pages - GITHUB_CONNECTOR: Indexes code and documentation from GitHub repositories - LINEAR_CONNECTOR: Indexes issues and comments from Linear @@ -632,19 +631,6 @@ async def index_connector_content( ) response_message = "Slack indexing started in the background." 
- elif connector.connector_type == SearchSourceConnectorType.TEAMS_CONNECTOR: - from app.tasks.celery_tasks.connector_tasks import ( - index_teams_messages_task, - ) - - logger.info( - f"Triggering Teams indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" - ) - index_teams_messages_task.delay( - connector_id, search_space_id, str(user.id), indexing_from, indexing_to - ) - response_message = "Teams indexing started in the background." - elif connector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR: from app.tasks.celery_tasks.connector_tasks import index_notion_pages_task @@ -1202,64 +1188,6 @@ async def run_discord_indexing( logger.error(f"Error in background Discord indexing task: {e!s}") -async def run_teams_indexing_with_new_session( - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str, - end_date: str, -): - """ - Create a new session and run the Microsoft Teams indexing task. - This prevents session leaks by creating a dedicated session for the background task. - """ - async with async_session_maker() as session: - await run_teams_indexing( - session, connector_id, search_space_id, user_id, start_date, end_date - ) - - -async def run_teams_indexing( - session: AsyncSession, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str, - end_date: str, -): - """ - Background task to run Microsoft Teams indexing. - Args: - session: Database session - connector_id: ID of the Teams connector - search_space_id: ID of the search space - user_id: ID of the user - start_date: Start date for indexing - end_date: End date for indexing - """ - try: - from app.tasks.connector_indexers.teams_indexer import index_teams_messages - - # Index Teams messages without updating last_indexed_at (we'll do it separately) - documents_processed, error_or_warning = await index_teams_messages( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, # Don't update timestamp in the indexing function - ) - - # Update last_indexed_at after successful indexing (even if 0 new docs - they were checked) - await update_connector_last_indexed(session, connector_id) - logger.info( - f"Teams indexing completed successfully: {documents_processed} documents processed. {error_or_warning or ''}" - ) - except Exception as e: - logger.error(f"Error in background Teams indexing task: {e!s}") - - # Add new helper functions for Jira indexing async def run_jira_indexing_with_new_session( connector_id: int, diff --git a/surfsense_backend/app/routes/teams_add_connector_route.py b/surfsense_backend/app/routes/teams_add_connector_route.py deleted file mode 100644 index ce014be0d..000000000 --- a/surfsense_backend/app/routes/teams_add_connector_route.py +++ /dev/null @@ -1,474 +0,0 @@ -""" -Microsoft Teams Connector OAuth Routes. - -Handles OAuth 2.0 authentication flow for Microsoft Teams connector using Microsoft Graph API. 
-""" - -import logging -from datetime import UTC, datetime, timedelta -from uuid import UUID - -import httpx -from fastapi import APIRouter, Depends, HTTPException -from fastapi.responses import RedirectResponse -from sqlalchemy.exc import IntegrityError -from sqlalchemy.ext.asyncio import AsyncSession - -from app.config import config -from app.db import ( - SearchSourceConnector, - SearchSourceConnectorType, - User, - get_async_session, -) -from app.schemas.teams_auth_credentials import TeamsAuthCredentialsBase -from app.users import current_active_user -from app.utils.connector_naming import ( - check_duplicate_connector, - extract_identifier_from_credentials, - generate_unique_connector_name, -) -from app.utils.oauth_security import OAuthStateManager, TokenEncryption - -logger = logging.getLogger(__name__) - -router = APIRouter() - -# Microsoft identity platform endpoints -AUTHORIZATION_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize" -TOKEN_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/token" - -# OAuth scopes for Microsoft Teams (Graph API) -SCOPES = [ - "offline_access", # Required for refresh tokens - "User.Read", # Read user profile - "Team.ReadBasic.All", # Read basic team information - "Channel.ReadBasic.All", # Read basic channel information - "ChannelMessage.Read.All", # Read messages in channels -] - -# Initialize security utilities -_state_manager = None -_token_encryption = None - - -def get_state_manager() -> OAuthStateManager: - """Get or create OAuth state manager instance.""" - global _state_manager - if _state_manager is None: - if not config.SECRET_KEY: - raise ValueError("SECRET_KEY must be set for OAuth security") - _state_manager = OAuthStateManager(config.SECRET_KEY) - return _state_manager - - -def get_token_encryption() -> TokenEncryption: - """Get or create token encryption instance.""" - global _token_encryption - if _token_encryption is None: - if not config.SECRET_KEY: - raise ValueError("SECRET_KEY must be set for token encryption") - _token_encryption = TokenEncryption(config.SECRET_KEY) - return _token_encryption - - -@router.get("/auth/teams/connector/add") -async def connect_teams(space_id: int, user: User = Depends(current_active_user)): - """ - Initiate Microsoft Teams OAuth flow. - - Args: - space_id: The search space ID - user: Current authenticated user - - Returns: - Authorization URL for redirect - """ - try: - if not space_id: - raise HTTPException(status_code=400, detail="space_id is required") - - if not config.TEAMS_CLIENT_ID: - raise HTTPException( - status_code=500, detail="Microsoft Teams OAuth not configured." - ) - - if not config.SECRET_KEY: - raise HTTPException( - status_code=500, detail="SECRET_KEY not configured for OAuth security." 
- ) - - # Generate secure state parameter with HMAC signature - state_manager = get_state_manager() - state_encoded = state_manager.generate_secure_state(space_id, user.id) - - # Build authorization URL - from urllib.parse import urlencode - - auth_params = { - "client_id": config.TEAMS_CLIENT_ID, - "response_type": "code", - "redirect_uri": config.TEAMS_REDIRECT_URI, - "response_mode": "query", - "scope": " ".join(SCOPES), - "state": state_encoded, - } - - auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}" - - logger.info( - "Generated Microsoft Teams OAuth URL for user %s, space %s", - user.id, - space_id, - ) - return {"auth_url": auth_url} - - except Exception as e: - logger.error( - "Failed to initiate Microsoft Teams OAuth: %s", str(e), exc_info=True - ) - raise HTTPException( - status_code=500, - detail=f"Failed to initiate Microsoft Teams OAuth: {e!s}", - ) from e - - -@router.get("/auth/teams/connector/callback") -async def teams_callback( - code: str | None = None, - error: str | None = None, - error_description: str | None = None, - state: str | None = None, - session: AsyncSession = Depends(get_async_session), -): - """ - Handle Microsoft Teams OAuth callback. - - Args: - code: Authorization code from Microsoft (if user granted access) - error: Error code from Microsoft (if user denied access or error occurred) - error_description: Human-readable error description - state: State parameter containing user/space info - session: Database session - - Returns: - Redirect response to frontend - """ - try: - # Handle OAuth errors (e.g., user denied access) - if error: - error_msg = error_description or error - logger.warning("Microsoft Teams OAuth error: %s", error_msg) - redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=teams_auth_failed&message={error_msg}" - return RedirectResponse(url=redirect_url) - - # Validate required parameters - if not code or not state: - raise HTTPException( - status_code=400, detail="Missing required OAuth parameters" - ) - - # Verify and decode state parameter - state_manager = get_state_manager() - try: - data = state_manager.validate_state(state) - space_id = data["space_id"] - user_id = UUID(data["user_id"]) - except (HTTPException, ValueError, KeyError) as e: - logger.error("Invalid OAuth state: %s", str(e)) - redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=invalid_state" - return RedirectResponse(url=redirect_url) - - # Exchange authorization code for access token - token_data = { - "client_id": config.TEAMS_CLIENT_ID, - "client_secret": config.TEAMS_CLIENT_SECRET, - "code": code, - "redirect_uri": config.TEAMS_REDIRECT_URI, - "grant_type": "authorization_code", - } - - async with httpx.AsyncClient() as client: - token_response = await client.post( - TOKEN_URL, - data=token_data, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - timeout=30.0, - ) - - if token_response.status_code != 200: - error_detail = token_response.text - try: - error_json = token_response.json() - error_detail = error_json.get("error_description", error_detail) - except Exception: - pass - raise HTTPException( - status_code=400, detail=f"Token exchange failed: {error_detail}" - ) - - token_json = token_response.json() - - # Extract tokens from response - access_token = token_json.get("access_token") - refresh_token = token_json.get("refresh_token") - - if not access_token: - raise HTTPException( - status_code=400, detail="No access token received from Microsoft" - ) - - # Encrypt sensitive tokens before storing - token_encryption = 
get_token_encryption() - - # Calculate expiration time (UTC, tz-aware) - expires_at = None - if token_json.get("expires_in"): - now_utc = datetime.now(UTC) - expires_at = now_utc + timedelta(seconds=int(token_json["expires_in"])) - - # Fetch user info from Microsoft Graph API - user_info = {} - tenant_info = {} - try: - async with httpx.AsyncClient() as client: - # Get user profile - user_response = await client.get( - "https://graph.microsoft.com/v1.0/me", - headers={"Authorization": f"Bearer {access_token}"}, - timeout=30.0, - ) - if user_response.status_code == 200: - user_data = user_response.json() - user_info = { - "user_id": user_data.get("id"), - "user_name": user_data.get("displayName"), - "user_email": user_data.get("mail") - or user_data.get("userPrincipalName"), - } - - # Get organization/tenant info - org_response = await client.get( - "https://graph.microsoft.com/v1.0/organization", - headers={"Authorization": f"Bearer {access_token}"}, - timeout=30.0, - ) - if org_response.status_code == 200: - org_data = org_response.json() - if org_data.get("value") and len(org_data["value"]) > 0: - org = org_data["value"][0] - tenant_info = { - "tenant_id": org.get("id"), - "tenant_name": org.get("displayName"), - } - except Exception as e: - logger.warning( - "Failed to fetch user/tenant info from Microsoft Graph: %s", str(e) - ) - - # Store the encrypted tokens and user/tenant info in connector config - connector_config = { - "access_token": token_encryption.encrypt_token(access_token), - "refresh_token": token_encryption.encrypt_token(refresh_token) - if refresh_token - else None, - "token_type": token_json.get("token_type", "Bearer"), - "expires_in": token_json.get("expires_in"), - "expires_at": expires_at.isoformat() if expires_at else None, - "scope": token_json.get("scope"), - "tenant_id": tenant_info.get("tenant_id"), - "tenant_name": tenant_info.get("tenant_name"), - "user_id": user_info.get("user_id"), - # Mark that token is encrypted for backward compatibility - "_token_encrypted": True, - } - - # Extract unique identifier from connector credentials - connector_identifier = extract_identifier_from_credentials( - SearchSourceConnectorType.TEAMS_CONNECTOR, connector_config - ) - - # Check for duplicate connector (same tenant already connected) - is_duplicate = await check_duplicate_connector( - session, - SearchSourceConnectorType.TEAMS_CONNECTOR, - space_id, - user_id, - connector_identifier, - ) - - if is_duplicate: - logger.warning( - "Duplicate Microsoft Teams connector for user %s, space %s, tenant %s", - user_id, - space_id, - tenant_info.get("tenant_name"), - ) - redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=duplicate_connector&message=This Microsoft Teams tenant is already connected to this space" - return RedirectResponse(url=redirect_url) - - # Generate unique connector name - connector_name = await generate_unique_connector_name( - session, - SearchSourceConnectorType.TEAMS_CONNECTOR, - space_id, - user_id, - connector_identifier, - ) - - # Create new connector - new_connector = SearchSourceConnector( - name=connector_name, - connector_type=SearchSourceConnectorType.TEAMS_CONNECTOR, - is_indexable=True, - config=connector_config, - search_space_id=space_id, - user_id=user_id, - ) - - try: - session.add(new_connector) - await session.commit() - await session.refresh(new_connector) - - logger.info( - "Successfully created Microsoft Teams connector %s for user %s", - new_connector.id, - user_id, - ) - - # Redirect to frontend with success - redirect_url = 
f"{config.NEXT_FRONTEND_URL}/dashboard?success=teams_connected&connector_id={new_connector.id}" - return RedirectResponse(url=redirect_url) - - except IntegrityError as e: - await session.rollback() - logger.error("Database integrity error creating Teams connector: %s", str(e)) - redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=connector_creation_failed" - return RedirectResponse(url=redirect_url) - - except HTTPException: - raise - except (IntegrityError, ValueError) as e: - logger.error("Teams OAuth callback error: %s", str(e), exc_info=True) - redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=teams_auth_error" - return RedirectResponse(url=redirect_url) - - -async def refresh_teams_token( - session: AsyncSession, connector: SearchSourceConnector -) -> SearchSourceConnector: - """ - Refresh Microsoft Teams OAuth tokens. - - Args: - session: Database session - connector: The connector to refresh - - Returns: - Updated connector with refreshed tokens - - Raises: - HTTPException: If token refresh fails - """ - logger.info( - "Refreshing Microsoft Teams OAuth tokens for connector %s", connector.id - ) - - credentials = TeamsAuthCredentialsBase.from_dict(connector.config) - - # Decrypt tokens if they are encrypted - token_encryption = get_token_encryption() - is_encrypted = connector.config.get("_token_encrypted", False) - refresh_token = credentials.refresh_token - - if is_encrypted and refresh_token: - try: - refresh_token = token_encryption.decrypt_token(refresh_token) - except Exception as e: - logger.error("Failed to decrypt refresh token: %s", str(e)) - raise HTTPException( - status_code=500, detail="Failed to decrypt stored refresh token" - ) from e - - if not refresh_token: - raise HTTPException( - status_code=400, - detail=f"No refresh token available for connector {connector.id}", - ) - - # Microsoft uses oauth2/v2.0/token for token refresh - refresh_data = { - "client_id": config.TEAMS_CLIENT_ID, - "client_secret": config.TEAMS_CLIENT_SECRET, - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "scope": " ".join(SCOPES), - } - - async with httpx.AsyncClient() as client: - token_response = await client.post( - TOKEN_URL, - data=refresh_data, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - timeout=30.0, - ) - - if token_response.status_code != 200: - error_detail = token_response.text - try: - error_json = token_response.json() - error_detail = error_json.get("error_description", error_detail) - except Exception: - pass - raise HTTPException( - status_code=400, detail=f"Token refresh failed: {error_detail}" - ) - - token_json = token_response.json() - - # Extract new tokens - access_token = token_json.get("access_token") - new_refresh_token = token_json.get("refresh_token") - - if not access_token: - raise HTTPException( - status_code=400, detail="No access token received from Microsoft refresh" - ) - - # Calculate expiration time (UTC, tz-aware) - expires_at = None - expires_in = token_json.get("expires_in") - if expires_in: - now_utc = datetime.now(UTC) - expires_at = now_utc + timedelta(seconds=int(expires_in)) - - # Update credentials object with encrypted tokens - credentials.access_token = token_encryption.encrypt_token(access_token) - if new_refresh_token: - credentials.refresh_token = token_encryption.encrypt_token(new_refresh_token) - credentials.expires_in = expires_in - credentials.expires_at = expires_at - credentials.scope = token_json.get("scope") - - # Preserve tenant/user info - if not credentials.tenant_id: - 
credentials.tenant_id = connector.config.get("tenant_id") - if not credentials.tenant_name: - credentials.tenant_name = connector.config.get("tenant_name") - if not credentials.user_id: - credentials.user_id = connector.config.get("user_id") - - # Update connector config with encrypted tokens - credentials_dict = credentials.to_dict() - credentials_dict["_token_encrypted"] = True - connector.config = credentials_dict - - await session.commit() - await session.refresh(connector) - - logger.info( - "Successfully refreshed Microsoft Teams tokens for connector %s", connector.id - ) - - return connector diff --git a/surfsense_backend/app/schemas/teams_auth_credentials.py b/surfsense_backend/app/schemas/teams_auth_credentials.py deleted file mode 100644 index 41688b102..000000000 --- a/surfsense_backend/app/schemas/teams_auth_credentials.py +++ /dev/null @@ -1,79 +0,0 @@ -""" -Microsoft Teams OAuth credentials schema. -""" - -from datetime import UTC, datetime - -from pydantic import BaseModel, field_validator - - -class TeamsAuthCredentialsBase(BaseModel): - """Microsoft Teams OAuth credentials.""" - - access_token: str - refresh_token: str | None = None - token_type: str = "Bearer" - expires_in: int | None = None - expires_at: datetime | None = None - scope: str | None = None - tenant_id: str | None = None - tenant_name: str | None = None - user_id: str | None = None - - @property - def is_expired(self) -> bool: - """Check if the credentials have expired.""" - if self.expires_at is None: - return False - return self.expires_at <= datetime.now(UTC) - - @property - def is_refreshable(self) -> bool: - """Check if the credentials can be refreshed.""" - return self.refresh_token is not None - - def to_dict(self) -> dict: - """Convert credentials to dictionary for storage.""" - return { - "access_token": self.access_token, - "refresh_token": self.refresh_token, - "token_type": self.token_type, - "expires_in": self.expires_in, - "expires_at": self.expires_at.isoformat() if self.expires_at else None, - "scope": self.scope, - "tenant_id": self.tenant_id, - "tenant_name": self.tenant_name, - "user_id": self.user_id, - } - - @classmethod - def from_dict(cls, data: dict) -> "TeamsAuthCredentialsBase": - """Create credentials from dictionary.""" - expires_at = None - if data.get("expires_at"): - expires_at = datetime.fromisoformat(data["expires_at"]) - - return cls( - access_token=data.get("access_token", ""), - refresh_token=data.get("refresh_token"), - token_type=data.get("token_type", "Bearer"), - expires_in=data.get("expires_in"), - expires_at=expires_at, - scope=data.get("scope"), - tenant_id=data.get("tenant_id"), - tenant_name=data.get("tenant_name"), - user_id=data.get("user_id"), - ) - - @field_validator("expires_at", mode="before") - @classmethod - def ensure_aware_utc(cls, v): - """Ensure datetime is timezone-aware (UTC).""" - if isinstance(v, str): - if v.endswith("Z"): - return datetime.fromisoformat(v.replace("Z", "+00:00")) - dt = datetime.fromisoformat(v) - return dt if dt.tzinfo else dt.replace(tzinfo=UTC) - if isinstance(v, datetime): - return v if v.tzinfo else v.replace(tzinfo=UTC) - return v diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 832aee4cc..4e874729c 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -2269,80 +2269,6 @@ class ConnectorService: return result_object, discord_docs - async def search_teams( - self, - user_query: 
str, - search_space_id: int, - top_k: int = 20, - start_date: datetime | None = None, - end_date: datetime | None = None, - ) -> tuple: - """ - Search for Microsoft Teams messages and return both the source information and langchain documents. - - Uses combined chunk-level and document-level hybrid search with RRF fusion. - - Args: - user_query: The user's query - search_space_id: The search space ID to search in - top_k: Maximum number of results to return - start_date: Optional start date for filtering documents by updated_at - end_date: Optional end date for filtering documents by updated_at - - Returns: - tuple: (sources_info, langchain_documents) - """ - teams_docs = await self._combined_rrf_search( - query_text=user_query, - search_space_id=search_space_id, - document_type="TEAMS_CONNECTOR", - top_k=top_k, - start_date=start_date, - end_date=end_date, - ) - - # Early return if no results - if not teams_docs: - return { - "id": 53, - "name": "Microsoft Teams", - "type": "TEAMS_CONNECTOR", - "sources": [], - }, [] - - def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: - team_name = metadata.get("team_name", "Unknown Team") - channel_name = metadata.get("channel_name", "Unknown Channel") - message_date = metadata.get("start_date", "") - title = f"Teams: {team_name} - {channel_name}" - if message_date: - title += f" ({message_date})" - return title - - def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: - team_id = metadata.get("team_id", "") - channel_id = metadata.get("channel_id", "") - if team_id and channel_id: - return f"https://teams.microsoft.com/l/channel/{channel_id}/General?groupId={team_id}" - return "" - - sources_list = self._build_chunk_sources_from_documents( - teams_docs, - title_fn=_title_fn, - url_fn=_url_fn, - description_fn=lambda chunk, _doc_info, _metadata: chunk.get("content", ""), - ) - - # Create result object - result_object = { - "id": 53, - "name": "Microsoft Teams", - "type": "TEAMS_CONNECTOR", - "sources": sources_list, - } - - return result_object, teams_docs - async def search_luma( self, user_query: str, diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 1d1cbe361..3cae1bbdb 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -564,49 +564,6 @@ async def _index_discord_messages( ) -@celery_app.task(name="index_teams_messages", bind=True) -def index_teams_messages_task( - self, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str, - end_date: str, -): - """Celery task to index Microsoft Teams messages.""" - import asyncio - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - loop.run_until_complete( - _index_teams_messages( - connector_id, search_space_id, user_id, start_date, end_date - ) - ) - finally: - loop.close() - - -async def _index_teams_messages( - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str, - end_date: str, -): - """Index Microsoft Teams messages with new session.""" - from app.routes.search_source_connectors_routes import ( - run_teams_indexing, - ) - - async with get_celery_session_maker()() as session: - await run_teams_indexing( - session, connector_id, search_space_id, user_id, start_date, end_date - ) - - @celery_app.task(name="index_luma_events", bind=True) def index_luma_events_task( self, diff --git 
a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py deleted file mode 100644 index c1e778768..000000000 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ /dev/null @@ -1,473 +0,0 @@ -""" -Microsoft Teams connector indexer. -""" - -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.ext.asyncio import AsyncSession - -from app.config import config -from app.connectors.teams_history import TeamsHistory -from app.db import Document, DocumentType, SearchSourceConnectorType -from app.services.task_logging_service import TaskLoggingService -from app.utils.document_converters import ( - create_document_chunks, - generate_content_hash, - generate_unique_identifier_hash, -) - -from .base import ( - build_document_metadata_markdown, - calculate_date_range, - check_document_by_unique_identifier, - get_connector_by_id, - get_current_timestamp, - logger, - update_connector_last_indexed, -) - - -async def index_teams_messages( - session: AsyncSession, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str | None = None, - end_date: str | None = None, - update_last_indexed: bool = True, -) -> tuple[int, str | None]: - """ - Index Microsoft Teams messages from all accessible teams and channels. - - Args: - session: Database session - connector_id: ID of the Teams connector - search_space_id: ID of the search space to store documents in - user_id: ID of the user - start_date: Start date for indexing (YYYY-MM-DD format) - end_date: End date for indexing (YYYY-MM-DD format) - update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - - Returns: - Tuple containing (number of documents indexed, error message or None) - """ - task_logger = TaskLoggingService(session, search_space_id) - - # Log task start - log_entry = await task_logger.log_task_start( - task_name="teams_messages_indexing", - source="connector_indexing_task", - message=f"Starting Microsoft Teams messages indexing for connector {connector_id}", - metadata={ - "connector_id": connector_id, - "user_id": str(user_id), - "start_date": start_date, - "end_date": end_date, - }, - ) - - try: - # Get the connector - await task_logger.log_task_progress( - log_entry, - f"Retrieving Teams connector {connector_id} from database", - {"stage": "connector_retrieval"}, - ) - - connector = await get_connector_by_id( - session, connector_id, SearchSourceConnectorType.TEAMS_CONNECTOR - ) - - if not connector: - await task_logger.log_task_failure( - log_entry, - f"Connector with ID {connector_id} not found or is not a Teams connector", - "Connector not found", - {"error_type": "ConnectorNotFound"}, - ) - return ( - 0, - f"Connector with ID {connector_id} not found or is not a Teams connector", - ) - - # Initialize Teams client with auto-refresh support - await task_logger.log_task_progress( - log_entry, - f"Initializing Teams client for connector {connector_id}", - {"stage": "client_initialization"}, - ) - - teams_client = TeamsHistory(session=session, connector_id=connector_id) - - # Handle 'undefined' string from frontend (treat as None) - if start_date == "undefined" or start_date == "": - start_date = None - if end_date == "undefined" or end_date == "": - end_date = None - - # Calculate date range - await task_logger.log_task_progress( - log_entry, - "Calculating date range for Teams indexing", - { - "stage": "date_calculation", - "provided_start_date": start_date, - "provided_end_date": end_date, - }, 
- ) - - start_date_str, end_date_str = calculate_date_range( - connector, start_date, end_date, default_days_back=365 - ) - - logger.info( - "Indexing Teams messages from %s to %s", start_date_str, end_date_str - ) - - await task_logger.log_task_progress( - log_entry, - f"Fetching Teams from {start_date_str} to {end_date_str}", - { - "stage": "fetch_teams", - "start_date": start_date_str, - "end_date": end_date_str, - }, - ) - - # Get all teams - try: - teams = await teams_client.get_all_teams() - except Exception as e: - await task_logger.log_task_failure( - log_entry, - f"Failed to get Teams for connector {connector_id}", - str(e), - {"error_type": "TeamsFetchError"}, - ) - return 0, f"Failed to get Teams: {e!s}" - - if not teams: - await task_logger.log_task_success( - log_entry, - f"No Teams found for connector {connector_id}", - {"teams_found": 0}, - ) - return 0, "No Teams found" - - # Track the number of documents indexed - documents_indexed = 0 - documents_skipped = 0 - skipped_channels = [] - - await task_logger.log_task_progress( - log_entry, - f"Starting to process {len(teams)} Teams", - {"stage": "process_teams", "total_teams": len(teams)}, - ) - - # Convert date strings to datetime objects for filtering - from datetime import datetime, timezone - - start_datetime = None - end_datetime = None - if start_date_str: - # Parse as naive datetime and make it timezone-aware (UTC) - start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc) - if end_date_str: - # Parse as naive datetime, set to end of day, and make it timezone-aware (UTC) - end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - - # Process each team - for team in teams: - team_id = team.get("id") - team_name = team.get("displayName", "Unknown Team") - - try: - # Get channels for this team - channels = await teams_client.get_channels_for_team(team_id) - - if not channels: - logger.info("No channels found in team %s", team_name) - continue - - # Process each channel in the team - for channel in channels: - channel_id = channel.get("id") - channel_name = channel.get("displayName", "Unknown Channel") - - try: - # Get messages for this channel - messages = await teams_client.get_messages_from_channel( - team_id, - channel_id, - start_datetime, - end_datetime, - include_replies=True, - ) - - if not messages: - logger.info( - "No messages found in channel %s of team %s for the specified date range.", - channel_name, - team_name, - ) - documents_skipped += 1 - continue - - # Process each message - for msg in messages: - # Skip deleted messages or empty content - if msg.get("deletedDateTime"): - continue - - # Extract message details - message_id = msg.get("id", "") - created_datetime = msg.get("createdDateTime", "") - from_user = msg.get("from", {}) - user_name = from_user.get("user", {}).get( - "displayName", "Unknown User" - ) - user_email = from_user.get("user", {}).get( - "userPrincipalName", "Unknown Email" - ) - - # Extract message content - body = msg.get("body", {}) - content_type = body.get("contentType", "text") - msg_text = body.get("content", "") - - # Skip empty messages - if not msg_text or msg_text.strip() == "": - continue - - # Format document metadata - metadata_sections = [ - ( - "METADATA", - [ - f"TEAM_NAME: {team_name}", - f"TEAM_ID: {team_id}", - f"CHANNEL_NAME: {channel_name}", - f"CHANNEL_ID: {channel_id}", - f"MESSAGE_TIMESTAMP: {created_datetime}", - f"MESSAGE_USER_NAME: {user_name}", - 
f"MESSAGE_USER_EMAIL: {user_email}", - f"CONTENT_TYPE: {content_type}", - ], - ), - ( - "CONTENT", - [ - f"FORMAT: {content_type}", - "TEXT_START", - msg_text, - "TEXT_END", - ], - ), - ] - - # Build the document string - combined_document_string = build_document_metadata_markdown( - metadata_sections - ) - - # Generate unique identifier hash for this Teams message - unique_identifier = f"{team_id}_{channel_id}_{message_id}" - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.TEAMS_CONNECTOR, - unique_identifier, - search_space_id, - ) - - # Generate content hash - content_hash = generate_content_hash( - combined_document_string, search_space_id - ) - - # Check if document with this unique identifier already exists - existing_document = ( - await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - ) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - logger.info( - "Document for Teams message %s in channel %s unchanged. Skipping.", - message_id, - channel_name, - ) - documents_skipped += 1 - continue - else: - # Content has changed - update the existing document - logger.info( - "Content changed for Teams message %s in channel %s. Updating document.", - message_id, - channel_name, - ) - - # Update chunks and embedding - chunks = await create_document_chunks( - combined_document_string - ) - doc_embedding = config.embedding_model_instance.embed( - combined_document_string - ) - - # Update existing document - existing_document.content = combined_document_string - existing_document.content_hash = content_hash - existing_document.embedding = doc_embedding - existing_document.document_metadata = { - "team_name": team_name, - "team_id": team_id, - "channel_name": channel_name, - "channel_id": channel_id, - "start_date": start_date_str, - "end_date": end_date_str, - "message_count": len(messages), - "indexed_at": datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ), - } - - # Delete old chunks and add new ones - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info( - "Successfully updated Teams message %s", message_id - ) - continue - - # Document doesn't exist - create new one - # Process chunks - chunks = await create_document_chunks( - combined_document_string - ) - doc_embedding = config.embedding_model_instance.embed( - combined_document_string - ) - - # Create and store new document - document = Document( - search_space_id=search_space_id, - title=f"Teams - {team_name} - {channel_name}", - document_type=DocumentType.TEAMS_CONNECTOR, - document_metadata={ - "team_name": team_name, - "team_id": team_id, - "channel_name": channel_name, - "channel_id": channel_id, - "start_date": start_date_str, - "end_date": end_date_str, - "message_count": len(messages), - "indexed_at": datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ), - }, - content=combined_document_string, - embedding=doc_embedding, - chunks=chunks, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - updated_at=get_current_timestamp(), - ) - - session.add(document) - documents_indexed += 1 - - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - "Committing batch: %s Teams messages processed so far", - documents_indexed, - ) - await session.commit() - - logger.info( - "Successfully indexed channel %s in team %s with %s messages", - channel_name, - team_name, - len(messages), - ) - - except Exception as e: - logger.error( - "Error processing channel %s in team %s: %s", - channel_name, - team_name, - str(e), - ) - skipped_channels.append( - f"{team_name}/{channel_name} (processing error)" - ) - documents_skipped += 1 - continue - - except Exception as e: - logger.error("Error processing team %s: %s", team_name, str(e)) - continue - - # Update the last_indexed_at timestamp for the connector only if requested - # and if we successfully indexed at least one document - total_processed = documents_indexed - if total_processed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) - - # Final commit for any remaining documents not yet committed in batches - logger.info( - "Final commit: Total %s Teams messages processed", documents_indexed - ) - await session.commit() - - # Prepare result message - result_message = None - if skipped_channels: - result_message = f"Processed {total_processed} messages. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}" - else: - result_message = f"Processed {total_processed} messages." - - # Log success - await task_logger.log_task_success( - log_entry, - f"Successfully completed Teams indexing for connector {connector_id}", - { - "messages_processed": total_processed, - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "skipped_channels_count": len(skipped_channels), - "result_message": result_message, - }, - ) - - logger.info( - "Teams indexing completed: %s new messages, %s skipped", - documents_indexed, - documents_skipped, - ) - return total_processed, result_message - - except SQLAlchemyError as db_error: - await session.rollback() - await task_logger.log_task_failure( - log_entry, - f"Database error during Teams indexing for connector {connector_id}", - str(db_error), - {"error_type": "SQLAlchemyError"}, - ) - logger.error("Database error: %s", str(db_error)) - return 0, f"Database error: {db_error!s}" - except Exception as e: - await session.rollback() - await task_logger.log_task_failure( - log_entry, - f"Failed to index Teams messages for connector {connector_id}", - str(e), - {"error_type": type(e).__name__}, - ) - logger.error("Failed to index Teams messages: %s", str(e)) - return 0, f"Failed to index Teams messages: {e!s}"
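Note on the deleted indexer above: its dedup scheme hinges on two hashes per message. A unique-identifier hash over (document type, team/channel/message IDs, search space) pins down which message a document represents, while a content hash detects edits. A minimal sketch of that pattern, assuming SHA-256 over joined parts — the real generate_unique_identifier_hash / generate_content_hash helpers live in app.utils.document_converters and may hash differently:

import hashlib


def sha256_of(*parts: str) -> str:
    """Hex digest over the parts, joined with an unambiguous separator."""
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()


# Sample values standing in for one Teams message.
team_id, channel_id, message_id = "team-1", "chan-9", "msg-42"
search_space_id = 7
combined_document_string = "METADATA ... CONTENT ..."

# Identity hash: which message this document represents; stable across re-runs.
unique_identifier_hash = sha256_of(
    "TEAMS_CONNECTOR", f"{team_id}_{channel_id}_{message_id}", str(search_space_id)
)

# Content hash: what the document currently says; changes when the message is edited.
content_hash = sha256_of(combined_document_string, str(search_space_id))

# Re-run decision, as implemented in the deleted indexer:
#   no document with this identity hash -> insert a new Document
#   identity hit, same content hash     -> skip (documents_skipped += 1)
#   identity hit, content hash differs  -> update content, chunks, embedding

This is why re-indexing an unchanged channel is cheap: each message short-circuits at the content-hash comparison instead of re-embedding.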
diff --git a/surfsense_backend/app/utils/connector_naming.py b/surfsense_backend/app/utils/connector_naming.py index 731f419d6..f9f1fdd21 100644 --- a/surfsense_backend/app/utils/connector_naming.py +++ b/surfsense_backend/app/utils/connector_naming.py @@ -20,7 +20,6 @@ BASE_NAME_FOR_TYPE = { SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: "Google Drive", SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: "Google Calendar", SearchSourceConnectorType.SLACK_CONNECTOR: "Slack", - SearchSourceConnectorType.TEAMS_CONNECTOR: "Microsoft Teams", SearchSourceConnectorType.NOTION_CONNECTOR: "Notion", SearchSourceConnectorType.LINEAR_CONNECTOR: "Linear", SearchSourceConnectorType.JIRA_CONNECTOR: "Jira", @@ -54,9 +53,6 @@ def extract_identifier_from_credentials( if connector_type == SearchSourceConnectorType.SLACK_CONNECTOR: return credentials.get("team_name") - if connector_type == SearchSourceConnectorType.TEAMS_CONNECTOR: - return credentials.get("tenant_name") - if connector_type == SearchSourceConnectorType.NOTION_CONNECTOR: return credentials.get("workspace_name")
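For context on the two hunks above: connector_naming.py derives a human-readable connector name from a per-type base label plus a workspace identifier pulled out of the OAuth credentials. A minimal sketch of that convention, with the if-chain restated as a lookup table and a hypothetical make_connector_name composer — the diff shows the lookup data, not how the real module combines it:

BASE_NAME_FOR_TYPE = {
    "SLACK_CONNECTOR": "Slack",
    "NOTION_CONNECTOR": "Notion",
}

# Which credential field names the workspace, per connector type.
IDENTIFIER_KEY_FOR_TYPE = {
    "SLACK_CONNECTOR": "team_name",
    "NOTION_CONNECTOR": "workspace_name",
}


def extract_identifier_from_credentials(connector_type: str, credentials: dict) -> str | None:
    """Pull a human-readable workspace identifier out of OAuth credentials."""
    key = IDENTIFIER_KEY_FOR_TYPE.get(connector_type)
    return credentials.get(key) if key else None


def make_connector_name(connector_type: str, credentials: dict) -> str:
    # Hypothetical composition, shown only to illustrate why both tables exist.
    base = BASE_NAME_FOR_TYPE.get(connector_type, connector_type)
    identifier = extract_identifier_from_credentials(connector_type, credentials)
    return f"{base} ({identifier})" if identifier else base


print(make_connector_name("SLACK_CONNECTOR", {"team_name": "Acme"}))  # -> Slack (Acme)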
diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py index 219641933..c95f407a4 100644 --- a/surfsense_backend/app/utils/periodic_scheduler.py +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -19,7 +19,6 @@ logger = logging.getLogger(__name__) # Mapping of connector types to their corresponding Celery task names CONNECTOR_TASK_MAP = { SearchSourceConnectorType.SLACK_CONNECTOR: "index_slack_messages", - SearchSourceConnectorType.TEAMS_CONNECTOR: "index_teams_messages", SearchSourceConnectorType.NOTION_CONNECTOR: "index_notion_pages", SearchSourceConnectorType.GITHUB_CONNECTOR: "index_github_repos", SearchSourceConnectorType.LINEAR_CONNECTOR: "index_linear_issues",
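CONNECTOR_TASK_MAP is the periodic scheduler's whole dispatch surface: a connector type resolves to a Celery task name, and the task is enqueued by name. A sketch of that flow, using Celery's standard send_task API; the function and argument names below are hypothetical, since the scheduler's actual call site is not part of this hunk:

from celery import Celery

celery_app = Celery("surfsense", broker="redis://localhost:6379/0")  # broker URL illustrative

CONNECTOR_TASK_MAP = {
    "SLACK_CONNECTOR": "index_slack_messages",
    "NOTION_CONNECTOR": "index_notion_pages",
}


def schedule_periodic_index(connector_type: str, connector_id: int, search_space_id: int) -> None:
    """Enqueue the indexing task registered for this connector type, if any."""
    task_name = CONNECTOR_TASK_MAP.get(connector_type)
    if task_name is None:
        return  # no periodic indexer for this type (e.g. pure search APIs)
    # send_task resolves the task by its registered name, so the scheduler
    # never needs to import the task function itself.
    celery_app.send_task(
        task_name,
        kwargs={"connector_id": connector_id, "search_space_id": search_space_id},
    )

In this sketch, removing the TEAMS_CONNECTOR entry is enough to stop scheduling: an unmapped type is skipped rather than raising.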
diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/teams-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/teams-config.tsx deleted file mode 100644 index ac08a6c03..000000000 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/teams-config.tsx +++ /dev/null @@ -1,29 +0,0 @@ -"use client"; - -import { Info } from "lucide-react"; -import type { FC } from "react"; -import type { ConnectorConfigProps } from "../index"; - -export interface TeamsConfigProps extends ConnectorConfigProps { - onNameChange?: (name: string) => void; -} - -export const TeamsConfig: FC<TeamsConfigProps> = () => { - return ( - <div className="space-y-4"> - <div className="rounded-md border p-4"> - <div className="flex items-start gap-3"> - <Info className="mt-0.5 h-4 w-4 text-muted-foreground" /> - <div className="space-y-1"> - <p className="text-sm font-medium">Microsoft Teams Access</p> - <p className="text-sm text-muted-foreground"> - SurfSense will index messages from Teams channels that you have access to. The app can - only read messages from teams and channels where you are a member. Make sure you're a - member of the teams you want to index before connecting. - </p> - </div> - </div> - </div> - </div> - ); -};
diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx index 267e85115..2575b3a69 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx @@ -17,7 +17,6 @@ import { LumaConfig } from "./components/luma-config"; import { SearxngConfig } from "./components/searxng-config"; import { SlackConfig } from "./components/slack-config"; import { TavilyApiConfig } from "./components/tavily-api-config"; -import { TeamsConfig } from "./components/teams-config"; import { WebcrawlerConfig } from "./components/webcrawler-config"; export interface ConnectorConfigProps { @@ -53,8 +52,6 @@ export function getConnectorConfigComponent( return SlackConfig; case "DISCORD_CONNECTOR": return DiscordConfig; - case "TEAMS_CONNECTOR": - return TeamsConfig; case "CONFLUENCE_CONNECTOR": return ConfluenceConfig; case "BOOKSTACK_CONNECTOR": diff --git a/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts b/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts index 23982e6f3..287bc30f4 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts @@ -51,13 +51,6 @@ export const OAUTH_CONNECTORS = [ connectorType: EnumConnectorName.SLACK_CONNECTOR, authEndpoint: "/api/v1/auth/slack/connector/add/", }, - { - id: "teams-connector", - title: "Microsoft Teams", - description: "Search Teams messages", - connectorType: EnumConnectorName.TEAMS_CONNECTOR, - authEndpoint: "/api/v1/auth/teams/connector/add/", - }, { id: "discord-connector", title: "Discord", diff --git a/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts b/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts index 433a51e8c..a0b271eb6 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts @@ -11,7 +11,6 @@ export const CONNECTOR_TO_DOCUMENT_TYPE: Record<string, string> = { // Direct mappings (connector type matches document type) SLACK_CONNECTOR: "SLACK_CONNECTOR", - TEAMS_CONNECTOR: "TEAMS_CONNECTOR", NOTION_CONNECTOR: "NOTION_CONNECTOR", GITHUB_CONNECTOR: "GITHUB_CONNECTOR", LINEAR_CONNECTOR: "LINEAR_CONNECTOR", diff --git a/surfsense_web/contracts/enums/connector.ts b/surfsense_web/contracts/enums/connector.ts index fc65585e2..ae80cf871 100644 --- a/surfsense_web/contracts/enums/connector.ts +++ b/surfsense_web/contracts/enums/connector.ts @@ -4,7 +4,6 @@ export enum EnumConnectorName { LINKUP_API = "LINKUP_API", BAIDU_SEARCH_API = "BAIDU_SEARCH_API", SLACK_CONNECTOR = "SLACK_CONNECTOR", - TEAMS_CONNECTOR = "TEAMS_CONNECTOR", NOTION_CONNECTOR = "NOTION_CONNECTOR", GITHUB_CONNECTOR = "GITHUB_CONNECTOR", LINEAR_CONNECTOR = "LINEAR_CONNECTOR",
diff --git a/surfsense_web/contracts/enums/connectorIcons.tsx b/surfsense_web/contracts/enums/connectorIcons.tsx index befe132f9..22bc734aa 100644 --- a/surfsense_web/contracts/enums/connectorIcons.tsx +++ b/surfsense_web/contracts/enums/connectorIcons.tsx @@ -31,8 +31,6 @@ export const getConnectorIcon = (connectorType: EnumConnectorName | string, clas return <Baidu className={className} />; case EnumConnectorName.SLACK_CONNECTOR: return <Slack className={className} />; - case EnumConnectorName.TEAMS_CONNECTOR: - return <MicrosoftTeams className={className} />; case EnumConnectorName.NOTION_CONNECTOR: return <Notion className={className} />; case EnumConnectorName.DISCORD_CONNECTOR: diff --git a/surfsense_web/contracts/types/connector.types.ts b/surfsense_web/contracts/types/connector.types.ts index f864ae16f..5b67297ae 100644 --- a/surfsense_web/contracts/types/connector.types.ts +++ b/surfsense_web/contracts/types/connector.types.ts @@ -8,7 +8,6 @@ export const searchSourceConnectorTypeEnum = z.enum([ "LINKUP_API", "BAIDU_SEARCH_API", "SLACK_CONNECTOR", - "TEAMS_CONNECTOR", "NOTION_CONNECTOR", "GITHUB_CONNECTOR", "LINEAR_CONNECTOR",