diff --git a/.vscode/settings.json b/.vscode/settings.json index f134660b6..42d09dcad 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ { - "biome.configurationPath": "./surfsense_web/biome.json" + "biome.configurationPath": "./surfsense_web/biome.json", + "python-envs.pythonProjects": [] } \ No newline at end of file diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index 2c2fec48b..6ac7c55de 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -76,6 +76,11 @@ SLACK_CLIENT_ID=your_slack_client_id_here SLACK_CLIENT_SECRET=your_slack_client_secret_here SLACK_REDIRECT_URI=http://localhost:8000/api/v1/auth/slack/connector/callback +# Teams OAuth Configuration +TEAMS_CLIENT_ID=your_teams_client_id_here +TEAMS_CLIENT_SECRET=your_teams_client_secret_here +TEAMS_REDIRECT_URI=http://localhost:8000/api/v1/auth/teams/connector/callback + # Embedding Model # Examples: # # Get sentence transformers embeddings diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index e76e69e94..448e2c253 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -117,6 +117,11 @@ class Config: DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI") DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN") + # Microsoft Teams OAuth + TEAMS_CLIENT_ID = os.getenv("TEAMS_CLIENT_ID") + TEAMS_CLIENT_SECRET = os.getenv("TEAMS_CLIENT_SECRET") + TEAMS_REDIRECT_URI = os.getenv("TEAMS_REDIRECT_URI") + # ClickUp OAuth CLICKUP_CLIENT_ID = os.getenv("CLICKUP_CLIENT_ID") CLICKUP_CLIENT_SECRET = os.getenv("CLICKUP_CLIENT_SECRET") diff --git a/surfsense_backend/app/connectors/teams_connector.py b/surfsense_backend/app/connectors/teams_connector.py new file mode 100644 index 000000000..e11a2aad0 --- /dev/null +++ b/surfsense_backend/app/connectors/teams_connector.py @@ -0,0 +1,323 @@ +""" +Microsoft Teams Connector + +A module for interacting with Microsoft Teams Graph API to retrieve teams, channels, and message history. + +Supports OAuth-based authentication with token refresh. +""" + +import logging +from datetime import datetime +from typing import Any + +import httpx +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.config import config +from app.db import SearchSourceConnector +from app.routes.teams_add_connector_route import refresh_teams_token +from app.schemas.teams_auth_credentials import TeamsAuthCredentialsBase +from app.utils.oauth_security import TokenEncryption + +logger = logging.getLogger(__name__) + + +class TeamsConnector: + """Class for retrieving teams, channels, and message history from Microsoft Teams.""" + + # Microsoft Graph API endpoints + GRAPH_API_BASE = "https://graph.microsoft.com/v1.0" + + def __init__( + self, + access_token: str | None = None, + session: AsyncSession | None = None, + connector_id: int | None = None, + credentials: TeamsAuthCredentialsBase | None = None, + ): + """ + Initialize the TeamsConnector with an access token or OAuth credentials. + + Args: + access_token: Microsoft Graph API access token (optional, for backward compatibility) + session: Database session for token refresh (optional) + connector_id: Connector ID for token refresh (optional) + credentials: Teams OAuth credentials (optional, will be loaded from DB if not provided) + """ + self._session = session + self._connector_id = connector_id + self._credentials = credentials + self._access_token = access_token + + async def _get_valid_token(self) -> str: + """ + Get valid Microsoft Teams access token, refreshing if needed. + + Returns: + Valid access token + + Raises: + ValueError: If credentials are missing or invalid + Exception: If token refresh fails + """ + # If we have a direct token (backward compatibility), use it + if ( + self._access_token + and self._session is None + and self._connector_id is None + and self._credentials is None + ): + return self._access_token + + # Load credentials from DB if not provided + if self._credentials is None: + if not self._session or not self._connector_id: + raise ValueError( + "Cannot load credentials: session and connector_id required" + ) + + result = await self._session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == self._connector_id + ) + ) + connector = result.scalars().first() + + if not connector: + raise ValueError(f"Connector {self._connector_id} not found") + + config_data = connector.config.copy() + + # Decrypt credentials if they are encrypted + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + try: + token_encryption = TokenEncryption(config.SECRET_KEY) + + # Decrypt sensitive fields + if config_data.get("access_token"): + config_data["access_token"] = token_encryption.decrypt_token( + config_data["access_token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + + logger.info( + "Decrypted Teams credentials for connector %s", + self._connector_id, + ) + except Exception as e: + logger.error( + "Failed to decrypt Teams credentials for connector %s: %s", + self._connector_id, + str(e), + ) + raise ValueError( + f"Failed to decrypt Teams credentials: {e!s}" + ) from e + + try: + self._credentials = TeamsAuthCredentialsBase.from_dict(config_data) + except Exception as e: + raise ValueError(f"Invalid Teams credentials: {e!s}") from e + + # Check if token is expired and refreshable + if self._credentials.is_expired and self._credentials.is_refreshable: + try: + logger.info( + "Teams token expired for connector %s, refreshing...", + self._connector_id, + ) + + # Get connector for refresh + result = await self._session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == self._connector_id + ) + ) + connector = result.scalars().first() + + if not connector: + raise RuntimeError( + f"Connector {self._connector_id} not found; cannot refresh token." + ) + + # Refresh token + connector = await refresh_teams_token(self._session, connector) + + # Reload credentials after refresh + config_data = connector.config.copy() + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + token_encryption = TokenEncryption(config.SECRET_KEY) + if config_data.get("access_token"): + config_data["access_token"] = token_encryption.decrypt_token( + config_data["access_token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + + self._credentials = TeamsAuthCredentialsBase.from_dict(config_data) + + logger.info( + "Successfully refreshed Teams token for connector %s", + self._connector_id, + ) + except Exception as e: + logger.error( + "Failed to refresh Teams token for connector %s: %s", + self._connector_id, + str(e), + ) + raise ValueError( + f"Failed to refresh Teams OAuth credentials: {e!s}" + ) from e + + return self._credentials.access_token + + async def get_joined_teams(self) -> list[dict[str, Any]]: + """ + Get list of all teams the user is a member of. + + Returns: + List of team objects with id, display_name, etc. + """ + access_token = await self._get_valid_token() + + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.GRAPH_API_BASE}/me/joinedTeams", + headers={"Authorization": f"Bearer {access_token}"}, + timeout=30.0, + ) + + if response.status_code != 200: + raise ValueError( + f"Failed to get joined teams: {response.status_code} - {response.text}" + ) + + data = response.json() + return data.get("value", []) + + async def get_team_channels(self, team_id: str) -> list[dict[str, Any]]: + """ + Get list of all channels in a team. + + Args: + team_id: The team ID + + Returns: + List of channel objects + """ + access_token = await self._get_valid_token() + + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.GRAPH_API_BASE}/teams/{team_id}/channels", + headers={"Authorization": f"Bearer {access_token}"}, + timeout=30.0, + ) + + if response.status_code != 200: + raise ValueError( + f"Failed to get channels for team {team_id}: {response.status_code} - {response.text}" + ) + + data = response.json() + return data.get("value", []) + + async def get_channel_messages( + self, + team_id: str, + channel_id: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> list[dict[str, Any]]: + """ + Get messages from a specific channel with optional date filtering. + + Args: + team_id: The team ID + channel_id: The channel ID + start_date: Optional start date for filtering messages + end_date: Optional end date for filtering messages + + Returns: + List of message objects + """ + access_token = await self._get_valid_token() + + async with httpx.AsyncClient() as client: + url = f"{self.GRAPH_API_BASE}/teams/{team_id}/channels/{channel_id}/messages" + + # Build query parameters for date filtering if needed + params = {} + if start_date or end_date: + filter_parts = [] + if start_date: + filter_parts.append( + f"createdDateTime ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}" + ) + if end_date: + filter_parts.append( + f"createdDateTime le {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}" + ) + if filter_parts: + params["$filter"] = " and ".join(filter_parts) + + response = await client.get( + url, + headers={"Authorization": f"Bearer {access_token}"}, + params=params, + timeout=30.0, + ) + + if response.status_code != 200: + raise ValueError( + f"Failed to get messages from channel {channel_id}: {response.status_code} - {response.text}" + ) + + data = response.json() + return data.get("value", []) + + async def get_message_replies( + self, team_id: str, channel_id: str, message_id: str + ) -> list[dict[str, Any]]: + """ + Get replies to a specific message. + + Args: + team_id: The team ID + channel_id: The channel ID + message_id: The message ID + + Returns: + List of reply message objects + """ + access_token = await self._get_valid_token() + + async with httpx.AsyncClient() as client: + url = f"{self.GRAPH_API_BASE}/teams/{team_id}/channels/{channel_id}/messages/{message_id}/replies" + + response = await client.get( + url, + headers={"Authorization": f"Bearer {access_token}"}, + timeout=30.0, + ) + + if response.status_code != 200: + logger.warning( + "Failed to get replies for message %s: %s - %s", + message_id, + response.status_code, + response.text, + ) + return [] + + data = response.json() + return data.get("value", []) diff --git a/surfsense_backend/app/connectors/teams_history.py b/surfsense_backend/app/connectors/teams_history.py new file mode 100644 index 000000000..314ee6304 --- /dev/null +++ b/surfsense_backend/app/connectors/teams_history.py @@ -0,0 +1,254 @@ +""" +Microsoft Teams History Module + +A module for retrieving conversation history from Microsoft Teams channels. +Allows fetching team lists, channel lists, and message history with date range filtering. +""" + +import logging +from datetime import datetime +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.connectors.teams_connector import TeamsConnector +from app.schemas.teams_auth_credentials import TeamsAuthCredentialsBase + +logger = logging.getLogger(__name__) + + +class TeamsHistory: + """Class for retrieving conversation history from Microsoft Teams channels.""" + + def __init__( + self, + access_token: str | None = None, + session: AsyncSession | None = None, + connector_id: int | None = None, + credentials: TeamsAuthCredentialsBase | None = None, + ): + """ + Initialize the TeamsHistory class. + + Args: + access_token: Microsoft Graph API access token (optional, for backward compatibility) + session: Database session for token refresh (optional) + connector_id: Connector ID for token refresh (optional) + credentials: Teams OAuth credentials (optional, will be loaded from DB if not provided) + """ + self.connector = TeamsConnector( + access_token=access_token, + session=session, + connector_id=connector_id, + credentials=credentials, + ) + + async def get_all_teams(self) -> list[dict[str, Any]]: + """ + Get list of all teams the user has access to. + + Returns: + List of team objects containing team metadata. + """ + try: + teams = await self.connector.get_joined_teams() + logger.info("Retrieved %s teams", len(teams)) + return teams + except Exception as e: + logger.error("Error fetching teams: %s", str(e)) + raise + + async def get_channels_for_team(self, team_id: str) -> list[dict[str, Any]]: + """ + Get list of all channels in a specific team. + + Args: + team_id: The ID of the team + + Returns: + List of channel objects containing channel metadata. + """ + try: + channels = await self.connector.get_team_channels(team_id) + logger.info("Retrieved %s channels for team %s", len(channels), team_id) + return channels + except Exception as e: + logger.error("Error fetching channels for team %s: %s", team_id, str(e)) + raise + + async def get_messages_from_channel( + self, + team_id: str, + channel_id: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + include_replies: bool = True, + ) -> list[dict[str, Any]]: + """ + Get messages from a specific channel with optional date filtering. + + Args: + team_id: The ID of the team + channel_id: The ID of the channel + start_date: Optional start date for filtering messages + end_date: Optional end date for filtering messages + include_replies: Whether to include reply messages (default: True) + + Returns: + List of message objects with content and metadata. + """ + try: + messages = await self.connector.get_channel_messages( + team_id, channel_id, start_date, end_date + ) + + logger.info( + "Retrieved %s messages from channel %s in team %s", + len(messages), + channel_id, + team_id, + ) + + # Fetch replies if requested + if include_replies: + all_messages = [] + for message in messages: + all_messages.append(message) + # Get replies for this message + try: + replies = await self.connector.get_message_replies( + team_id, channel_id, message.get("id") + ) + all_messages.extend(replies) + except Exception: + logger.warning( + "Failed to get replies for message %s", + message.get("id"), + exc_info=True, + ) + # Continue without replies for this message + + logger.info( + "Total messages including replies: %s for channel %s", + len(all_messages), + channel_id, + ) + return all_messages + + return messages + + except Exception as e: + logger.error( + "Error fetching messages from channel %s in team %s: %s", + channel_id, + team_id, + str(e), + ) + raise + + async def get_all_messages_from_team( + self, + team_id: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + include_replies: bool = True, + ) -> dict[str, list[dict[str, Any]]]: + """ + Get all messages from all channels in a team. + + Args: + team_id: The ID of the team + start_date: Optional start date for filtering messages + end_date: Optional end date for filtering messages + include_replies: Whether to include reply messages (default: True) + + Returns: + Dictionary mapping channel IDs to lists of messages. + """ + try: + channels = await self.get_channels_for_team(team_id) + all_channel_messages = {} + + for channel in channels: + channel_id = channel.get("id") + channel_name = channel.get("displayName", "Unknown") + + try: + messages = await self.get_messages_from_channel( + team_id, channel_id, start_date, end_date, include_replies + ) + all_channel_messages[channel_id] = messages + logger.info( + "Fetched %s messages from channel '%s' (%s)", + len(messages), + channel_name, + channel_id, + ) + except Exception: + logger.error( + "Failed to fetch messages from channel '%s' (%s)", + channel_name, + channel_id, + exc_info=True, + ) + all_channel_messages[channel_id] = [] + + return all_channel_messages + + except Exception as e: + logger.error("Error fetching messages from team %s: %s", team_id, str(e)) + raise + + async def get_all_messages( + self, + start_date: datetime | None = None, + end_date: datetime | None = None, + include_replies: bool = True, + ) -> dict[str, dict[str, list[dict[str, Any]]]]: + """ + Get all messages from all teams and channels the user has access to. + + Args: + start_date: Optional start date for filtering messages + end_date: Optional end date for filtering messages + include_replies: Whether to include reply messages (default: True) + + Returns: + Nested dictionary: team_id -> channel_id -> list of messages. + """ + try: + teams = await self.get_all_teams() + all_messages = {} + + for team in teams: + team_id = team.get("id") + team_name = team.get("displayName", "Unknown") + + try: + team_messages = await self.get_all_messages_from_team( + team_id, start_date, end_date, include_replies + ) + all_messages[team_id] = team_messages + total_messages = sum( + len(messages) for messages in team_messages.values() + ) + logger.info( + "Fetched %s total messages from team '%s' (%s)", + total_messages, + team_name, + team_id, + ) + except Exception: + logger.error( + "Failed to fetch messages from team '%s' (%s)", + team_name, + team_id, + exc_info=True, + ) + all_messages[team_id] = {} + + return all_messages + + except Exception as e: + logger.error("Error fetching all messages: %s", str(e)) + raise diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index fbd53bd06..d54254f9c 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -36,6 +36,7 @@ class DocumentType(str, Enum): CRAWLED_URL = "CRAWLED_URL" FILE = "FILE" SLACK_CONNECTOR = "SLACK_CONNECTOR" + TEAMS_CONNECTOR = "TEAMS_CONNECTOR" NOTION_CONNECTOR = "NOTION_CONNECTOR" YOUTUBE_VIDEO = "YOUTUBE_VIDEO" GITHUB_CONNECTOR = "GITHUB_CONNECTOR" @@ -62,6 +63,7 @@ class SearchSourceConnectorType(str, Enum): LINKUP_API = "LINKUP_API" BAIDU_SEARCH_API = "BAIDU_SEARCH_API" # Baidu AI Search API for Chinese web search SLACK_CONNECTOR = "SLACK_CONNECTOR" + TEAMS_CONNECTOR = "TEAMS_CONNECTOR" NOTION_CONNECTOR = "NOTION_CONNECTOR" GITHUB_CONNECTOR = "GITHUB_CONNECTOR" LINEAR_CONNECTOR = "LINEAR_CONNECTOR" diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index 47d540e7d..b4e94c732 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -31,6 +31,7 @@ from .rbac_routes import router as rbac_router from .search_source_connectors_routes import router as search_source_connectors_router from .search_spaces_routes import router as search_spaces_router from .slack_add_connector_route import router as slack_add_connector_router +from .teams_add_connector_route import router as teams_add_connector_router router = APIRouter() @@ -50,6 +51,7 @@ router.include_router(linear_add_connector_router) router.include_router(luma_add_connector_router) router.include_router(notion_add_connector_router) router.include_router(slack_add_connector_router) +router.include_router(teams_add_connector_router) router.include_router(discord_add_connector_router) router.include_router(jira_add_connector_router) router.include_router(confluence_add_connector_router) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 58a50a6f8..337e1af85 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1188,6 +1188,69 @@ async def run_discord_indexing( logger.error(f"Error in background Discord indexing task: {e!s}") +async def run_teams_indexing_with_new_session( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """ + Create a new session and run the Microsoft Teams indexing task. + This prevents session leaks by creating a dedicated session for the background task. + """ + async with async_session_maker() as session: + await run_teams_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + + +async def run_teams_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """ + Background task to run Microsoft Teams indexing. + Args: + session: Database session + connector_id: ID of the Teams connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + try: + from app.tasks.connector_indexers.teams_indexer import index_teams_messages + + # Index Teams messages without updating last_indexed_at (we'll do it separately) + documents_processed, error_or_warning = await index_teams_messages( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + update_last_indexed=False, # Don't update timestamp in the indexing function + ) + + # Only update last_indexed_at if indexing was successful (either new docs or updated docs) + if documents_processed > 0: + await update_connector_last_indexed(session, connector_id) + logger.info( + f"Teams indexing completed successfully: {documents_processed} documents processed" + ) + else: + logger.error( + f"Teams indexing failed or no documents processed: {error_or_warning}" + ) + except Exception as e: + logger.error(f"Error in background Teams indexing task: {e!s}") + + # Add new helper functions for Jira indexing async def run_jira_indexing_with_new_session( connector_id: int, diff --git a/surfsense_backend/app/routes/teams_add_connector_route.py b/surfsense_backend/app/routes/teams_add_connector_route.py new file mode 100644 index 000000000..a84db47c9 --- /dev/null +++ b/surfsense_backend/app/routes/teams_add_connector_route.py @@ -0,0 +1,473 @@ +""" +Microsoft Teams Connector OAuth Routes. + +Handles OAuth 2.0 authentication flow for Microsoft Teams connector using Microsoft Graph API. +""" + +import logging +from datetime import UTC, datetime, timedelta +from uuid import UUID + +import httpx +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import RedirectResponse +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import config +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + User, + get_async_session, +) +from app.schemas.teams_auth_credentials import TeamsAuthCredentialsBase +from app.users import current_active_user +from app.utils.connector_naming import ( + check_duplicate_connector, + extract_identifier_from_credentials, + generate_unique_connector_name, +) +from app.utils.oauth_security import OAuthStateManager, TokenEncryption + +logger = logging.getLogger(__name__) + +router = APIRouter() + +# Microsoft identity platform endpoints +AUTHORIZATION_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize" +TOKEN_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/token" + +# OAuth scopes for Microsoft Teams (Graph API) +SCOPES = [ + "offline_access", # Required for refresh tokens + "User.Read", # Read user profile + "Team.ReadBasic.All", # Read basic team information + "Channel.ReadBasic.All", # Read basic channel information + "ChannelMessage.Read.All", # Read messages in channels +] + +# Initialize security utilities +_state_manager = None +_token_encryption = None + + +def get_state_manager() -> OAuthStateManager: + """Get or create OAuth state manager instance.""" + global _state_manager + if _state_manager is None: + if not config.SECRET_KEY: + raise ValueError("SECRET_KEY must be set for OAuth security") + _state_manager = OAuthStateManager(config.SECRET_KEY) + return _state_manager + + +def get_token_encryption() -> TokenEncryption: + """Get or create token encryption instance.""" + global _token_encryption + if _token_encryption is None: + if not config.SECRET_KEY: + raise ValueError("SECRET_KEY must be set for token encryption") + _token_encryption = TokenEncryption(config.SECRET_KEY) + return _token_encryption + + +@router.get("/auth/teams/connector/add") +async def connect_teams(space_id: int, user: User = Depends(current_active_user)): + """ + Initiate Microsoft Teams OAuth flow. + + Args: + space_id: The search space ID + user: Current authenticated user + + Returns: + Authorization URL for redirect + """ + try: + if not space_id: + raise HTTPException(status_code=400, detail="space_id is required") + + if not config.TEAMS_CLIENT_ID: + raise HTTPException( + status_code=500, detail="Microsoft Teams OAuth not configured." + ) + + if not config.SECRET_KEY: + raise HTTPException( + status_code=500, detail="SECRET_KEY not configured for OAuth security." + ) + + # Generate secure state parameter with HMAC signature + state_manager = get_state_manager() + state_encoded = state_manager.generate_secure_state(space_id, user.id) + + # Build authorization URL + from urllib.parse import urlencode + + auth_params = { + "client_id": config.TEAMS_CLIENT_ID, + "response_type": "code", + "redirect_uri": config.TEAMS_REDIRECT_URI, + "response_mode": "query", + "scope": " ".join(SCOPES), + "state": state_encoded, + } + + auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}" + + logger.info( + "Generated Microsoft Teams OAuth URL for user %s, space %s", + user.id, + space_id, + ) + return {"auth_url": auth_url} + + except Exception as e: + logger.error( + "Failed to initiate Microsoft Teams OAuth: %s", str(e), exc_info=True + ) + raise HTTPException( + status_code=500, + detail=f"Failed to initiate Microsoft Teams OAuth: {e!s}", + ) from e + + +@router.get("/auth/teams/connector/callback") +async def teams_callback( + code: str | None = None, + error: str | None = None, + error_description: str | None = None, + state: str | None = None, + session: AsyncSession = Depends(get_async_session), +): + """ + Handle Microsoft Teams OAuth callback. + + Args: + code: Authorization code from Microsoft (if user granted access) + error: Error code from Microsoft (if user denied access or error occurred) + error_description: Human-readable error description + state: State parameter containing user/space info + session: Database session + + Returns: + Redirect response to frontend + """ + try: + # Handle OAuth errors (e.g., user denied access) + if error: + error_msg = error_description or error + logger.warning("Microsoft Teams OAuth error: %s", error_msg) + redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=teams_auth_failed&message={error_msg}" + return RedirectResponse(url=redirect_url) + + # Validate required parameters + if not code or not state: + raise HTTPException( + status_code=400, detail="Missing required OAuth parameters" + ) + + # Verify and decode state parameter + state_manager = get_state_manager() + try: + data = state_manager.validate_state(state) + space_id = data["space_id"] + user_id = UUID(data["user_id"]) + except (HTTPException, ValueError, KeyError) as e: + logger.error("Invalid OAuth state: %s", str(e)) + redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=invalid_state" + return RedirectResponse(url=redirect_url) + + # Exchange authorization code for access token + token_data = { + "client_id": config.TEAMS_CLIENT_ID, + "client_secret": config.TEAMS_CLIENT_SECRET, + "code": code, + "redirect_uri": config.TEAMS_REDIRECT_URI, + "grant_type": "authorization_code", + } + + async with httpx.AsyncClient() as client: + token_response = await client.post( + TOKEN_URL, + data=token_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=30.0, + ) + + if token_response.status_code != 200: + error_detail = token_response.text + try: + error_json = token_response.json() + error_detail = error_json.get("error_description", error_detail) + except Exception: + pass + raise HTTPException( + status_code=400, detail=f"Token exchange failed: {error_detail}" + ) + + token_json = token_response.json() + + # Extract tokens from response + access_token = token_json.get("access_token") + refresh_token = token_json.get("refresh_token") + + if not access_token: + raise HTTPException( + status_code=400, detail="No access token received from Microsoft" + ) + + # Encrypt sensitive tokens before storing + token_encryption = get_token_encryption() + + # Calculate expiration time (UTC, tz-aware) + expires_at = None + if token_json.get("expires_in"): + now_utc = datetime.now(UTC) + expires_at = now_utc + timedelta(seconds=int(token_json["expires_in"])) + + # Fetch user info from Microsoft Graph API + user_info = {} + tenant_info = {} + try: + async with httpx.AsyncClient() as client: + # Get user profile + user_response = await client.get( + "https://graph.microsoft.com/v1.0/me", + headers={"Authorization": f"Bearer {access_token}"}, + timeout=30.0, + ) + if user_response.status_code == 200: + user_data = user_response.json() + user_info = { + "user_id": user_data.get("id"), + "user_name": user_data.get("displayName"), + "user_email": user_data.get("mail") + or user_data.get("userPrincipalName"), + } + + # Get organization/tenant info + org_response = await client.get( + "https://graph.microsoft.com/v1.0/organization", + headers={"Authorization": f"Bearer {access_token}"}, + timeout=30.0, + ) + if org_response.status_code == 200: + org_data = org_response.json() + if org_data.get("value") and len(org_data["value"]) > 0: + org = org_data["value"][0] + tenant_info = { + "tenant_id": org.get("id"), + "tenant_name": org.get("displayName"), + } + except Exception as e: + logger.warning( + "Failed to fetch user/tenant info from Microsoft Graph: %s", str(e) + ) + + # Store the encrypted tokens and user/tenant info in connector config + connector_config = { + "access_token": token_encryption.encrypt_token(access_token), + "refresh_token": token_encryption.encrypt_token(refresh_token) + if refresh_token + else None, + "token_type": token_json.get("token_type", "Bearer"), + "expires_in": token_json.get("expires_in"), + "expires_at": expires_at.isoformat() if expires_at else None, + "scope": token_json.get("scope"), + "tenant_id": tenant_info.get("tenant_id"), + "tenant_name": tenant_info.get("tenant_name"), + "user_id": user_info.get("user_id"), + # Mark that token is encrypted for backward compatibility + "_token_encrypted": True, + } + + # Extract unique identifier from connector credentials + connector_identifier = extract_identifier_from_credentials( + SearchSourceConnectorType.TEAMS_CONNECTOR, connector_config + ) + + # Check for duplicate connector (same tenant already connected) + is_duplicate = await check_duplicate_connector( + session, + SearchSourceConnectorType.TEAMS_CONNECTOR, + space_id, + user_id, + connector_identifier, + ) + + if is_duplicate: + logger.warning( + "Duplicate Microsoft Teams connector for user %s, space %s, tenant %s", + user_id, + space_id, + tenant_info.get("tenant_name"), + ) + redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=duplicate_connector&message=This Microsoft Teams tenant is already connected to this space" + return RedirectResponse(url=redirect_url) + + # Generate unique connector name + connector_name = await generate_unique_connector_name( + session, + SearchSourceConnectorType.TEAMS_CONNECTOR, + space_id, + connector_config, + ) + + # Create new connector + new_connector = SearchSourceConnector( + connector_type=SearchSourceConnectorType.TEAMS_CONNECTOR, + config=connector_config, + is_enabled=True, + search_space_id=space_id, + user_id=user_id, + connector_name=connector_name, + ) + + try: + session.add(new_connector) + await session.commit() + await session.refresh(new_connector) + + logger.info( + "Successfully created Microsoft Teams connector %s for user %s", + new_connector.id, + user_id, + ) + + # Redirect to frontend with success + redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?success=teams_connected&connector_id={new_connector.id}" + return RedirectResponse(url=redirect_url) + + except IntegrityError as e: + await session.rollback() + logger.error("Database integrity error creating Teams connector: %s", str(e)) + redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=connector_creation_failed" + return RedirectResponse(url=redirect_url) + + except HTTPException: + raise + except (IntegrityError, ValueError) as e: + logger.error("Teams OAuth callback error: %s", str(e), exc_info=True) + redirect_url = f"{config.NEXT_FRONTEND_URL}/dashboard?error=teams_auth_error" + return RedirectResponse(url=redirect_url) + + +async def refresh_teams_token( + session: AsyncSession, connector: SearchSourceConnector +) -> SearchSourceConnector: + """ + Refresh Microsoft Teams OAuth tokens. + + Args: + session: Database session + connector: The connector to refresh + + Returns: + Updated connector with refreshed tokens + + Raises: + HTTPException: If token refresh fails + """ + logger.info( + "Refreshing Microsoft Teams OAuth tokens for connector %s", connector.id + ) + + credentials = TeamsAuthCredentialsBase.from_dict(connector.config) + + # Decrypt tokens if they are encrypted + token_encryption = get_token_encryption() + is_encrypted = connector.config.get("_token_encrypted", False) + refresh_token = credentials.refresh_token + + if is_encrypted and refresh_token: + try: + refresh_token = token_encryption.decrypt_token(refresh_token) + except Exception as e: + logger.error("Failed to decrypt refresh token: %s", str(e)) + raise HTTPException( + status_code=500, detail="Failed to decrypt stored refresh token" + ) from e + + if not refresh_token: + raise HTTPException( + status_code=400, + detail=f"No refresh token available for connector {connector.id}", + ) + + # Microsoft uses oauth2/v2.0/token for token refresh + refresh_data = { + "client_id": config.TEAMS_CLIENT_ID, + "client_secret": config.TEAMS_CLIENT_SECRET, + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "scope": " ".join(SCOPES), + } + + async with httpx.AsyncClient() as client: + token_response = await client.post( + TOKEN_URL, + data=refresh_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=30.0, + ) + + if token_response.status_code != 200: + error_detail = token_response.text + try: + error_json = token_response.json() + error_detail = error_json.get("error_description", error_detail) + except Exception: + pass + raise HTTPException( + status_code=400, detail=f"Token refresh failed: {error_detail}" + ) + + token_json = token_response.json() + + # Extract new tokens + access_token = token_json.get("access_token") + new_refresh_token = token_json.get("refresh_token") + + if not access_token: + raise HTTPException( + status_code=400, detail="No access token received from Microsoft refresh" + ) + + # Calculate expiration time (UTC, tz-aware) + expires_at = None + expires_in = token_json.get("expires_in") + if expires_in: + now_utc = datetime.now(UTC) + expires_at = now_utc + timedelta(seconds=int(expires_in)) + + # Update credentials object with encrypted tokens + credentials.access_token = token_encryption.encrypt_token(access_token) + if new_refresh_token: + credentials.refresh_token = token_encryption.encrypt_token(new_refresh_token) + credentials.expires_in = expires_in + credentials.expires_at = expires_at + credentials.scope = token_json.get("scope") + + # Preserve tenant/user info + if not credentials.tenant_id: + credentials.tenant_id = connector.config.get("tenant_id") + if not credentials.tenant_name: + credentials.tenant_name = connector.config.get("tenant_name") + if not credentials.user_id: + credentials.user_id = connector.config.get("user_id") + + # Update connector config with encrypted tokens + credentials_dict = credentials.to_dict() + credentials_dict["_token_encrypted"] = True + connector.config = credentials_dict + + await session.commit() + await session.refresh(connector) + + logger.info( + "Successfully refreshed Microsoft Teams tokens for connector %s", connector.id + ) + + return connector diff --git a/surfsense_backend/app/schemas/teams_auth_credentials.py b/surfsense_backend/app/schemas/teams_auth_credentials.py new file mode 100644 index 000000000..41688b102 --- /dev/null +++ b/surfsense_backend/app/schemas/teams_auth_credentials.py @@ -0,0 +1,79 @@ +""" +Microsoft Teams OAuth credentials schema. +""" + +from datetime import UTC, datetime + +from pydantic import BaseModel, field_validator + + +class TeamsAuthCredentialsBase(BaseModel): + """Microsoft Teams OAuth credentials.""" + + access_token: str + refresh_token: str | None = None + token_type: str = "Bearer" + expires_in: int | None = None + expires_at: datetime | None = None + scope: str | None = None + tenant_id: str | None = None + tenant_name: str | None = None + user_id: str | None = None + + @property + def is_expired(self) -> bool: + """Check if the credentials have expired.""" + if self.expires_at is None: + return False + return self.expires_at <= datetime.now(UTC) + + @property + def is_refreshable(self) -> bool: + """Check if the credentials can be refreshed.""" + return self.refresh_token is not None + + def to_dict(self) -> dict: + """Convert credentials to dictionary for storage.""" + return { + "access_token": self.access_token, + "refresh_token": self.refresh_token, + "token_type": self.token_type, + "expires_in": self.expires_in, + "expires_at": self.expires_at.isoformat() if self.expires_at else None, + "scope": self.scope, + "tenant_id": self.tenant_id, + "tenant_name": self.tenant_name, + "user_id": self.user_id, + } + + @classmethod + def from_dict(cls, data: dict) -> "TeamsAuthCredentialsBase": + """Create credentials from dictionary.""" + expires_at = None + if data.get("expires_at"): + expires_at = datetime.fromisoformat(data["expires_at"]) + + return cls( + access_token=data.get("access_token", ""), + refresh_token=data.get("refresh_token"), + token_type=data.get("token_type", "Bearer"), + expires_in=data.get("expires_in"), + expires_at=expires_at, + scope=data.get("scope"), + tenant_id=data.get("tenant_id"), + tenant_name=data.get("tenant_name"), + user_id=data.get("user_id"), + ) + + @field_validator("expires_at", mode="before") + @classmethod + def ensure_aware_utc(cls, v): + """Ensure datetime is timezone-aware (UTC).""" + if isinstance(v, str): + if v.endswith("Z"): + return datetime.fromisoformat(v.replace("Z", "+00:00")) + dt = datetime.fromisoformat(v) + return dt if dt.tzinfo else dt.replace(tzinfo=UTC) + if isinstance(v, datetime): + return v if v.tzinfo else v.replace(tzinfo=UTC) + return v diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 4e874729c..832aee4cc 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -2269,6 +2269,80 @@ class ConnectorService: return result_object, discord_docs + async def search_teams( + self, + user_query: str, + search_space_id: int, + top_k: int = 20, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> tuple: + """ + Search for Microsoft Teams messages and return both the source information and langchain documents. + + Uses combined chunk-level and document-level hybrid search with RRF fusion. + + Args: + user_query: The user's query + search_space_id: The search space ID to search in + top_k: Maximum number of results to return + start_date: Optional start date for filtering documents by updated_at + end_date: Optional end date for filtering documents by updated_at + + Returns: + tuple: (sources_info, langchain_documents) + """ + teams_docs = await self._combined_rrf_search( + query_text=user_query, + search_space_id=search_space_id, + document_type="TEAMS_CONNECTOR", + top_k=top_k, + start_date=start_date, + end_date=end_date, + ) + + # Early return if no results + if not teams_docs: + return { + "id": 53, + "name": "Microsoft Teams", + "type": "TEAMS_CONNECTOR", + "sources": [], + }, [] + + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + team_name = metadata.get("team_name", "Unknown Team") + channel_name = metadata.get("channel_name", "Unknown Channel") + message_date = metadata.get("start_date", "") + title = f"Teams: {team_name} - {channel_name}" + if message_date: + title += f" ({message_date})" + return title + + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + team_id = metadata.get("team_id", "") + channel_id = metadata.get("channel_id", "") + if team_id and channel_id: + return f"https://teams.microsoft.com/l/channel/{channel_id}/General?groupId={team_id}" + return "" + + sources_list = self._build_chunk_sources_from_documents( + teams_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=lambda chunk, _doc_info, _metadata: chunk.get("content", ""), + ) + + # Create result object + result_object = { + "id": 53, + "name": "Microsoft Teams", + "type": "TEAMS_CONNECTOR", + "sources": sources_list, + } + + return result_object, teams_docs + async def search_luma( self, user_query: str, diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 3cae1bbdb..1d1cbe361 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -564,6 +564,49 @@ async def _index_discord_messages( ) +@celery_app.task(name="index_teams_messages", bind=True) +def index_teams_messages_task( + self, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Celery task to index Microsoft Teams messages.""" + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _index_teams_messages( + connector_id, search_space_id, user_id, start_date, end_date + ) + ) + finally: + loop.close() + + +async def _index_teams_messages( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Index Microsoft Teams messages with new session.""" + from app.routes.search_source_connectors_routes import ( + run_teams_indexing, + ) + + async with get_celery_session_maker()() as session: + await run_teams_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + + @celery_app.task(name="index_luma_events", bind=True) def index_luma_events_task( self, diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py new file mode 100644 index 000000000..4fb4d719d --- /dev/null +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -0,0 +1,471 @@ +""" +Microsoft Teams connector indexer. +""" + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import config +from app.connectors.teams_history import TeamsHistory +from app.db import Document, DocumentType, SearchSourceConnectorType +from app.services.task_logging_service import TaskLoggingService +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_unique_identifier_hash, +) + +from .base import ( + build_document_metadata_markdown, + calculate_date_range, + check_document_by_unique_identifier, + get_connector_by_id, + get_current_timestamp, + logger, + update_connector_last_indexed, +) + + +async def index_teams_messages( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str | None = None, + end_date: str | None = None, + update_last_indexed: bool = True, +) -> tuple[int, str | None]: + """ + Index Microsoft Teams messages from all accessible teams and channels. + + Args: + session: Database session + connector_id: ID of the Teams connector + search_space_id: ID of the search space to store documents in + user_id: ID of the user + start_date: Start date for indexing (YYYY-MM-DD format) + end_date: End date for indexing (YYYY-MM-DD format) + update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + + Returns: + Tuple containing (number of documents indexed, error message or None) + """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="teams_messages_indexing", + source="connector_indexing_task", + message=f"Starting Microsoft Teams messages indexing for connector {connector_id}", + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, + ) + + try: + # Get the connector + await task_logger.log_task_progress( + log_entry, + f"Retrieving Teams connector {connector_id} from database", + {"stage": "connector_retrieval"}, + ) + + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.TEAMS_CONNECTOR + ) + + if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a Teams connector", + "Connector not found", + {"error_type": "ConnectorNotFound"}, + ) + return ( + 0, + f"Connector with ID {connector_id} not found or is not a Teams connector", + ) + + # Initialize Teams client with auto-refresh support + await task_logger.log_task_progress( + log_entry, + f"Initializing Teams client for connector {connector_id}", + {"stage": "client_initialization"}, + ) + + teams_client = TeamsHistory(session=session, connector_id=connector_id) + + # Handle 'undefined' string from frontend (treat as None) + if start_date == "undefined" or start_date == "": + start_date = None + if end_date == "undefined" or end_date == "": + end_date = None + + # Calculate date range + await task_logger.log_task_progress( + log_entry, + "Calculating date range for Teams indexing", + { + "stage": "date_calculation", + "provided_start_date": start_date, + "provided_end_date": end_date, + }, + ) + + start_date_str, end_date_str = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) + + logger.info( + "Indexing Teams messages from %s to %s", start_date_str, end_date_str + ) + + await task_logger.log_task_progress( + log_entry, + f"Fetching Teams from {start_date_str} to {end_date_str}", + { + "stage": "fetch_teams", + "start_date": start_date_str, + "end_date": end_date_str, + }, + ) + + # Get all teams + try: + teams = await teams_client.get_all_teams() + except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to get Teams for connector {connector_id}", + str(e), + {"error_type": "TeamsFetchError"}, + ) + return 0, f"Failed to get Teams: {e!s}" + + if not teams: + await task_logger.log_task_success( + log_entry, + f"No Teams found for connector {connector_id}", + {"teams_found": 0}, + ) + return 0, "No Teams found" + + # Track the number of documents indexed + documents_indexed = 0 + documents_skipped = 0 + skipped_channels = [] + + await task_logger.log_task_progress( + log_entry, + f"Starting to process {len(teams)} Teams", + {"stage": "process_teams", "total_teams": len(teams)}, + ) + + # Convert date strings to datetime objects for filtering + from datetime import datetime + + start_datetime = None + end_datetime = None + if start_date_str: + start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d") + if end_date_str: + end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d") + + # Process each team + for team in teams: + team_id = team.get("id") + team_name = team.get("displayName", "Unknown Team") + + try: + # Get channels for this team + channels = await teams_client.get_channels_for_team(team_id) + + if not channels: + logger.info("No channels found in team %s", team_name) + continue + + # Process each channel in the team + for channel in channels: + channel_id = channel.get("id") + channel_name = channel.get("displayName", "Unknown Channel") + + try: + # Get messages for this channel + messages = await teams_client.get_messages_from_channel( + team_id, + channel_id, + start_datetime, + end_datetime, + include_replies=True, + ) + + if not messages: + logger.info( + "No messages found in channel %s of team %s for the specified date range.", + channel_name, + team_name, + ) + documents_skipped += 1 + continue + + # Process each message + for msg in messages: + # Skip deleted messages or empty content + if msg.get("deletedDateTime"): + continue + + # Extract message details + message_id = msg.get("id", "") + created_datetime = msg.get("createdDateTime", "") + from_user = msg.get("from", {}) + user_name = from_user.get("user", {}).get( + "displayName", "Unknown User" + ) + user_email = from_user.get("user", {}).get( + "userPrincipalName", "Unknown Email" + ) + + # Extract message content + body = msg.get("body", {}) + content_type = body.get("contentType", "text") + msg_text = body.get("content", "") + + # Skip empty messages + if not msg_text or msg_text.strip() == "": + continue + + # Format document metadata + metadata_sections = [ + ( + "METADATA", + [ + f"TEAM_NAME: {team_name}", + f"TEAM_ID: {team_id}", + f"CHANNEL_NAME: {channel_name}", + f"CHANNEL_ID: {channel_id}", + f"MESSAGE_TIMESTAMP: {created_datetime}", + f"MESSAGE_USER_NAME: {user_name}", + f"MESSAGE_USER_EMAIL: {user_email}", + f"CONTENT_TYPE: {content_type}", + ], + ), + ( + "CONTENT", + [ + f"FORMAT: {content_type}", + "TEXT_START", + msg_text, + "TEXT_END", + ], + ), + ] + + # Build the document string + combined_document_string = build_document_metadata_markdown( + metadata_sections + ) + + # Generate unique identifier hash for this Teams message + unique_identifier = f"{team_id}_{channel_id}_{message_id}" + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.TEAMS_CONNECTOR, + unique_identifier, + search_space_id, + ) + + # Generate content hash + content_hash = generate_content_hash( + combined_document_string, search_space_id + ) + + # Check if document with this unique identifier already exists + existing_document = ( + await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + ) + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + "Document for Teams message %s in channel %s unchanged. Skipping.", + message_id, + channel_name, + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + "Content changed for Teams message %s in channel %s. Updating document.", + message_id, + channel_name, + ) + + # Update chunks and embedding + chunks = await create_document_chunks( + combined_document_string + ) + doc_embedding = config.embedding_model_instance.embed( + combined_document_string + ) + + # Update existing document + existing_document.content = combined_document_string + existing_document.content_hash = content_hash + existing_document.embedding = doc_embedding + existing_document.document_metadata = { + "team_name": team_name, + "team_id": team_id, + "channel_name": channel_name, + "channel_id": channel_id, + "start_date": start_date_str, + "end_date": end_date_str, + "message_count": len(messages), + "indexed_at": datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + + # Delete old chunks and add new ones + existing_document.chunks = chunks + existing_document.updated_at = get_current_timestamp() + + documents_indexed += 1 + logger.info( + "Successfully updated Teams message %s", message_id + ) + continue + + # Document doesn't exist - create new one + # Process chunks + chunks = await create_document_chunks( + combined_document_string + ) + doc_embedding = config.embedding_model_instance.embed( + combined_document_string + ) + + # Create and store new document + document = Document( + search_space_id=search_space_id, + title=f"Teams - {team_name} - {channel_name}", + document_type=DocumentType.TEAMS_CONNECTOR, + document_metadata={ + "team_name": team_name, + "team_id": team_id, + "channel_name": channel_name, + "channel_id": channel_id, + "start_date": start_date_str, + "end_date": end_date_str, + "message_count": len(messages), + "indexed_at": datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ), + }, + content=combined_document_string, + embedding=doc_embedding, + chunks=chunks, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + updated_at=get_current_timestamp(), + ) + + session.add(document) + documents_indexed += 1 + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info( + "Committing batch: %s Teams messages processed so far", + documents_indexed, + ) + await session.commit() + + logger.info( + "Successfully indexed channel %s in team %s with %s messages", + channel_name, + team_name, + len(messages), + ) + + except Exception as e: + logger.error( + "Error processing channel %s in team %s: %s", + channel_name, + team_name, + str(e), + ) + skipped_channels.append( + f"{team_name}/{channel_name} (processing error)" + ) + documents_skipped += 1 + continue + + except Exception as e: + logger.error("Error processing team %s: %s", team_name, str(e)) + continue + + # Update the last_indexed_at timestamp for the connector only if requested + # and if we successfully indexed at least one document + total_processed = documents_indexed + if total_processed > 0: + await update_connector_last_indexed(session, connector, update_last_indexed) + + # Final commit for any remaining documents not yet committed in batches + logger.info( + "Final commit: Total %s Teams messages processed", documents_indexed + ) + await session.commit() + + # Prepare result message + result_message = None + if skipped_channels: + result_message = f"Processed {total_processed} messages. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}" + else: + result_message = f"Processed {total_processed} messages." + + # Log success + await task_logger.log_task_success( + log_entry, + f"Successfully completed Teams indexing for connector {connector_id}", + { + "messages_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "skipped_channels_count": len(skipped_channels), + "result_message": result_message, + }, + ) + + logger.info( + "Teams indexing completed: %s new messages, %s skipped", + documents_indexed, + documents_skipped, + ) + return total_processed, result_message + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Teams indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error("Database error: %s", str(db_error)) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Teams messages for connector {connector_id}", + str(e), + {"error_type": type(e).__name__}, + ) + logger.error("Failed to index Teams messages: %s", str(e)) + return 0, f"Failed to index Teams messages: {e!s}" diff --git a/surfsense_backend/app/utils/connector_naming.py b/surfsense_backend/app/utils/connector_naming.py index f9f1fdd21..731f419d6 100644 --- a/surfsense_backend/app/utils/connector_naming.py +++ b/surfsense_backend/app/utils/connector_naming.py @@ -20,6 +20,7 @@ BASE_NAME_FOR_TYPE = { SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: "Google Drive", SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: "Google Calendar", SearchSourceConnectorType.SLACK_CONNECTOR: "Slack", + SearchSourceConnectorType.TEAMS_CONNECTOR: "Microsoft Teams", SearchSourceConnectorType.NOTION_CONNECTOR: "Notion", SearchSourceConnectorType.LINEAR_CONNECTOR: "Linear", SearchSourceConnectorType.JIRA_CONNECTOR: "Jira", @@ -53,6 +54,9 @@ def extract_identifier_from_credentials( if connector_type == SearchSourceConnectorType.SLACK_CONNECTOR: return credentials.get("team_name") + if connector_type == SearchSourceConnectorType.TEAMS_CONNECTOR: + return credentials.get("tenant_name") + if connector_type == SearchSourceConnectorType.NOTION_CONNECTOR: return credentials.get("workspace_name") diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py index c95f407a4..219641933 100644 --- a/surfsense_backend/app/utils/periodic_scheduler.py +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -19,6 +19,7 @@ logger = logging.getLogger(__name__) # Mapping of connector types to their corresponding Celery task names CONNECTOR_TASK_MAP = { SearchSourceConnectorType.SLACK_CONNECTOR: "index_slack_messages", + SearchSourceConnectorType.TEAMS_CONNECTOR: "index_teams_messages", SearchSourceConnectorType.NOTION_CONNECTOR: "index_notion_pages", SearchSourceConnectorType.GITHUB_CONNECTOR: "index_github_repos", SearchSourceConnectorType.LINEAR_CONNECTOR: "index_linear_issues", diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/teams-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/teams-config.tsx new file mode 100644 index 000000000..ac08a6c03 --- /dev/null +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/teams-config.tsx @@ -0,0 +1,29 @@ +"use client"; + +import { Info } from "lucide-react"; +import type { FC } from "react"; +import type { ConnectorConfigProps } from "../index"; + +export interface TeamsConfigProps extends ConnectorConfigProps { + onNameChange?: (name: string) => void; +} + +export const TeamsConfig: FC = () => { + return ( +
+
+
+ +
+
+

Microsoft Teams Access

+

+ SurfSense will index messages from Teams channels that you have access to. The app can + only read messages from teams and channels where you are a member. Make sure you're a + member of the teams you want to index before connecting. +

+
+
+
+ ); +}; diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx index 2575b3a69..267e85115 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx @@ -17,6 +17,7 @@ import { LumaConfig } from "./components/luma-config"; import { SearxngConfig } from "./components/searxng-config"; import { SlackConfig } from "./components/slack-config"; import { TavilyApiConfig } from "./components/tavily-api-config"; +import { TeamsConfig } from "./components/teams-config"; import { WebcrawlerConfig } from "./components/webcrawler-config"; export interface ConnectorConfigProps { @@ -52,6 +53,8 @@ export function getConnectorConfigComponent( return SlackConfig; case "DISCORD_CONNECTOR": return DiscordConfig; + case "TEAMS_CONNECTOR": + return TeamsConfig; case "CONFLUENCE_CONNECTOR": return ConfluenceConfig; case "BOOKSTACK_CONNECTOR": diff --git a/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts b/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts index 287bc30f4..23982e6f3 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts @@ -51,6 +51,13 @@ export const OAUTH_CONNECTORS = [ connectorType: EnumConnectorName.SLACK_CONNECTOR, authEndpoint: "/api/v1/auth/slack/connector/add/", }, + { + id: "teams-connector", + title: "Microsoft Teams", + description: "Search Teams messages", + connectorType: EnumConnectorName.TEAMS_CONNECTOR, + authEndpoint: "/api/v1/auth/teams/connector/add/", + }, { id: "discord-connector", title: "Discord", diff --git a/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts b/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts index a0b271eb6..433a51e8c 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts @@ -11,6 +11,7 @@ export const CONNECTOR_TO_DOCUMENT_TYPE: Record = { // Direct mappings (connector type matches document type) SLACK_CONNECTOR: "SLACK_CONNECTOR", + TEAMS_CONNECTOR: "TEAMS_CONNECTOR", NOTION_CONNECTOR: "NOTION_CONNECTOR", GITHUB_CONNECTOR: "GITHUB_CONNECTOR", LINEAR_CONNECTOR: "LINEAR_CONNECTOR", diff --git a/surfsense_web/contracts/enums/connector.ts b/surfsense_web/contracts/enums/connector.ts index ae80cf871..fc65585e2 100644 --- a/surfsense_web/contracts/enums/connector.ts +++ b/surfsense_web/contracts/enums/connector.ts @@ -4,6 +4,7 @@ export enum EnumConnectorName { LINKUP_API = "LINKUP_API", BAIDU_SEARCH_API = "BAIDU_SEARCH_API", SLACK_CONNECTOR = "SLACK_CONNECTOR", + TEAMS_CONNECTOR = "TEAMS_CONNECTOR", NOTION_CONNECTOR = "NOTION_CONNECTOR", GITHUB_CONNECTOR = "GITHUB_CONNECTOR", LINEAR_CONNECTOR = "LINEAR_CONNECTOR", diff --git a/surfsense_web/contracts/enums/connectorIcons.tsx b/surfsense_web/contracts/enums/connectorIcons.tsx index 22bc734aa..befe132f9 100644 --- a/surfsense_web/contracts/enums/connectorIcons.tsx +++ b/surfsense_web/contracts/enums/connectorIcons.tsx @@ -31,6 +31,8 @@ export const getConnectorIcon = (connectorType: EnumConnectorName | string, clas return Baidu; case EnumConnectorName.SLACK_CONNECTOR: return Slack; + case EnumConnectorName.TEAMS_CONNECTOR: + return Microsoft Teams; case EnumConnectorName.NOTION_CONNECTOR: return Notion; case EnumConnectorName.DISCORD_CONNECTOR: