From 18035b3728b472318fc135d12a962729d3a71081 Mon Sep 17 00:00:00 2001 From: Manoj Aggarwal Date: Fri, 9 Jan 2026 13:20:47 -0800 Subject: [PATCH] Add MS Teams connector --- .../versions/59_add_teams_connector_enums.py | 160 ++++++++++++++++++ .../agents/new_chat/tools/knowledge_base.py | 2 + .../app/connectors/teams_connector.py | 51 ++++-- .../routes/search_source_connectors_routes.py | 22 ++- .../app/routes/teams_add_connector_route.py | 7 +- .../tasks/connector_indexers/teams_indexer.py | 8 +- .../contracts/types/connector.types.ts | 1 + 7 files changed, 222 insertions(+), 29 deletions(-) create mode 100644 surfsense_backend/alembic/versions/59_add_teams_connector_enums.py diff --git a/surfsense_backend/alembic/versions/59_add_teams_connector_enums.py b/surfsense_backend/alembic/versions/59_add_teams_connector_enums.py new file mode 100644 index 000000000..f13fbe9e5 --- /dev/null +++ b/surfsense_backend/alembic/versions/59_add_teams_connector_enums.py @@ -0,0 +1,160 @@ +"""Add TEAMS_CONNECTOR to SearchSourceConnectorType and DocumentType enums + +Revision ID: 59 +Revises: 58 +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "59" +down_revision: str | None = "58" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +# Define the ENUM type name and the new value +CONNECTOR_ENUM = "searchsourceconnectortype" +CONNECTOR_NEW_VALUE = "TEAMS_CONNECTOR" +DOCUMENT_ENUM = "documenttype" +DOCUMENT_NEW_VALUE = "TEAMS_CONNECTOR" + + +def upgrade() -> None: + """Upgrade schema - add TEAMS_CONNECTOR to connector and document enum safely.""" + # Add TEAMS_CONNECTOR to searchsourceconnectortype only if not exists + op.execute( + f""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_enum + WHERE enumlabel = '{CONNECTOR_NEW_VALUE}' + AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{CONNECTOR_ENUM}') + ) THEN + ALTER TYPE {CONNECTOR_ENUM} ADD VALUE '{CONNECTOR_NEW_VALUE}'; + END IF; + END$$; + """ + ) + + # Add TEAMS_CONNECTOR to documenttype only if not exists + op.execute( + f""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_enum + WHERE enumlabel = '{DOCUMENT_NEW_VALUE}' + AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{DOCUMENT_ENUM}') + ) THEN + ALTER TYPE {DOCUMENT_ENUM} ADD VALUE '{DOCUMENT_NEW_VALUE}'; + END IF; + END$$; + """ + ) + + +def downgrade() -> None: + """Downgrade schema - remove TEAMS_CONNECTOR from connector and document enum.""" + + # Old enum name + old_connector_enum_name = f"{CONNECTOR_ENUM}_old" + old_document_enum_name = f"{DOCUMENT_ENUM}_old" + + # All connector values except TEAMS_CONNECTOR + old_connector_values = ( + "SERPER_API", + "TAVILY_API", + "SEARXNG_API", + "LINKUP_API", + "BAIDU_SEARCH_API", + "SLACK_CONNECTOR", + "NOTION_CONNECTOR", + "GITHUB_CONNECTOR", + "LINEAR_CONNECTOR", + "DISCORD_CONNECTOR", + "JIRA_CONNECTOR", + "CONFLUENCE_CONNECTOR", + "CLICKUP_CONNECTOR", + "GOOGLE_CALENDAR_CONNECTOR", + "GOOGLE_GMAIL_CONNECTOR", + "GOOGLE_DRIVE_CONNECTOR", + "AIRTABLE_CONNECTOR", + "LUMA_CONNECTOR", + "ELASTICSEARCH_CONNECTOR", + "WEBCRAWLER_CONNECTOR", + ) + + # All document values except TEAMS_CONNECTOR + old_document_values = ( + "EXTENSION", + "CRAWLED_URL", + "FILE", + "SLACK_CONNECTOR", + "NOTION_CONNECTOR", + "YOUTUBE_VIDEO", + "GITHUB_CONNECTOR", + "LINEAR_CONNECTOR", + "DISCORD_CONNECTOR", + "JIRA_CONNECTOR", + "CONFLUENCE_CONNECTOR", + "CLICKUP_CONNECTOR", + "GOOGLE_CALENDAR_CONNECTOR", + "GOOGLE_GMAIL_CONNECTOR", + "GOOGLE_DRIVE_FILE", + "AIRTABLE_CONNECTOR", + "LUMA_CONNECTOR", + "ELASTICSEARCH_CONNECTOR", + "BOOKSTACK_CONNECTOR", + "CIRCLEBACK", + "NOTE", + ) + + old_connector_values_sql = ", ".join([f"'{v}'" for v in old_connector_values]) + old_document_values_sql = ", ".join([f"'{v}'" for v in old_document_values]) + + # Table and column names + connector_table_name = "search_source_connectors" + connector_column_name = "connector_type" + document_table_name = "documents" + document_column_name = "document_type" + + # Connector Enum Downgrade Steps + # 1. Rename the current connector enum type + op.execute(f"ALTER TYPE {CONNECTOR_ENUM} RENAME TO {old_connector_enum_name}") + + # 2. Create the new connector enum type with the old values + op.execute(f"CREATE TYPE {CONNECTOR_ENUM} AS ENUM({old_connector_values_sql})") + + # 3. Alter the column to use the new connector enum type + op.execute( + f""" + ALTER TABLE {connector_table_name} + ALTER COLUMN {connector_column_name} TYPE {CONNECTOR_ENUM} + USING {connector_column_name}::text::{CONNECTOR_ENUM} + """ + ) + + # 4. Drop the old connector enum type + op.execute(f"DROP TYPE {old_connector_enum_name}") + + # Document Enum Downgrade Steps + # 1. Rename the current document enum type + op.execute(f"ALTER TYPE {DOCUMENT_ENUM} RENAME TO {old_document_enum_name}") + + # 2. Create the new document enum type with the old values + op.execute(f"CREATE TYPE {DOCUMENT_ENUM} AS ENUM({old_document_values_sql})") + + # 3. Alter the column to use the new document enum type + op.execute( + f""" + ALTER TABLE {document_table_name} + ALTER COLUMN {document_column_name} TYPE {DOCUMENT_ENUM} + USING {document_column_name}::text::{DOCUMENT_ENUM} + """ + ) + + # 4. Drop the old document enum type + op.execute(f"DROP TYPE {old_document_enum_name}") diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py index a3cdad359..e91d865fa 100644 --- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py +++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py @@ -26,6 +26,7 @@ _ALL_CONNECTORS: list[str] = [ "EXTENSION", "FILE", "SLACK_CONNECTOR", + "TEAMS_CONNECTOR", "NOTION_CONNECTOR", "YOUTUBE_VIDEO", "GITHUB_CONNECTOR", @@ -573,6 +574,7 @@ def create_search_knowledge_base_tool( - FILE: "User-uploaded documents (PDFs, Word, etc.)" (personal files) - NOTE: "SurfSense Notes" (notes created inside SurfSense) - SLACK_CONNECTOR: "Slack conversations and shared content" (personal workspace communications) + - TEAMS_CONNECTOR: "Microsoft Teams messages and conversations" (personal Teams communications) - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management) - YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos) - GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions) diff --git a/surfsense_backend/app/connectors/teams_connector.py b/surfsense_backend/app/connectors/teams_connector.py index e11a2aad0..29c2db127 100644 --- a/surfsense_backend/app/connectors/teams_connector.py +++ b/surfsense_backend/app/connectors/teams_connector.py @@ -7,7 +7,7 @@ Supports OAuth-based authentication with token refresh. """ import logging -from datetime import datetime +from datetime import datetime, timezone from typing import Any import httpx @@ -255,25 +255,11 @@ class TeamsConnector: async with httpx.AsyncClient() as client: url = f"{self.GRAPH_API_BASE}/teams/{team_id}/channels/{channel_id}/messages" - # Build query parameters for date filtering if needed - params = {} - if start_date or end_date: - filter_parts = [] - if start_date: - filter_parts.append( - f"createdDateTime ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}" - ) - if end_date: - filter_parts.append( - f"createdDateTime le {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}" - ) - if filter_parts: - params["$filter"] = " and ".join(filter_parts) - + # Note: The Graph API for channel messages doesn't support $filter parameter + # We fetch all messages and filter them client-side response = await client.get( url, headers={"Authorization": f"Bearer {access_token}"}, - params=params, timeout=30.0, ) @@ -283,7 +269,36 @@ class TeamsConnector: ) data = response.json() - return data.get("value", []) + messages = data.get("value", []) + + # Filter messages by date if needed (client-side filtering) + if start_date or end_date: + # Make sure comparison dates are timezone-aware (UTC) + if start_date and start_date.tzinfo is None: + start_date = start_date.replace(tzinfo=timezone.utc) + if end_date and end_date.tzinfo is None: + end_date = end_date.replace(tzinfo=timezone.utc) + + filtered_messages = [] + for message in messages: + created_at_str = message.get("createdDateTime") + if not created_at_str: + continue + + # Parse the ISO 8601 datetime string (already timezone-aware) + created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00')) + + # Check if message is within date range + if start_date and created_at < start_date: + continue + if end_date and created_at > end_date: + continue + + filtered_messages.append(message) + + return filtered_messages + + return messages async def get_message_replies( self, team_id: str, channel_id: str, message_id: str diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 337e1af85..73a593186 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -558,6 +558,7 @@ async def index_connector_content( Currently supports: - SLACK_CONNECTOR: Indexes messages from all accessible Slack channels + - TEAMS_CONNECTOR: Indexes messages from all accessible Microsoft Teams channels - NOTION_CONNECTOR: Indexes pages from all accessible Notion pages - GITHUB_CONNECTOR: Indexes code and documentation from GitHub repositories - LINEAR_CONNECTOR: Indexes issues and comments from Linear @@ -631,6 +632,19 @@ async def index_connector_content( ) response_message = "Slack indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.TEAMS_CONNECTOR: + from app.tasks.celery_tasks.connector_tasks import ( + index_teams_messages_task, + ) + + logger.info( + f"Triggering Teams indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + index_teams_messages_task.delay( + connector_id, search_space_id, str(user.id), indexing_from, indexing_to + ) + response_message = "Teams indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR: from app.tasks.celery_tasks.connector_tasks import index_notion_pages_task @@ -1237,16 +1251,14 @@ async def run_teams_indexing( update_last_indexed=False, # Don't update timestamp in the indexing function ) - # Only update last_indexed_at if indexing was successful (either new docs or updated docs) - if documents_processed > 0: + # Update last_indexed_at if indexing was successful (regardless of new/skipped docs) + if error_or_warning is None: await update_connector_last_indexed(session, connector_id) logger.info( f"Teams indexing completed successfully: {documents_processed} documents processed" ) else: - logger.error( - f"Teams indexing failed or no documents processed: {error_or_warning}" - ) + logger.error(f"Teams indexing failed: {error_or_warning}") except Exception as e: logger.error(f"Error in background Teams indexing task: {e!s}") diff --git a/surfsense_backend/app/routes/teams_add_connector_route.py b/surfsense_backend/app/routes/teams_add_connector_route.py index a84db47c9..ce014be0d 100644 --- a/surfsense_backend/app/routes/teams_add_connector_route.py +++ b/surfsense_backend/app/routes/teams_add_connector_route.py @@ -312,17 +312,18 @@ async def teams_callback( session, SearchSourceConnectorType.TEAMS_CONNECTOR, space_id, - connector_config, + user_id, + connector_identifier, ) # Create new connector new_connector = SearchSourceConnector( + name=connector_name, connector_type=SearchSourceConnectorType.TEAMS_CONNECTOR, + is_indexable=True, config=connector_config, - is_enabled=True, search_space_id=space_id, user_id=user_id, - connector_name=connector_name, ) try: diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 4fb4d719d..c1e778768 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -165,14 +165,16 @@ async def index_teams_messages( ) # Convert date strings to datetime objects for filtering - from datetime import datetime + from datetime import datetime, timezone start_datetime = None end_datetime = None if start_date_str: - start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d") + # Parse as naive datetime and make it timezone-aware (UTC) + start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc) if end_date_str: - end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d") + # Parse as naive datetime, set to end of day, and make it timezone-aware (UTC) + end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) # Process each team for team in teams: diff --git a/surfsense_web/contracts/types/connector.types.ts b/surfsense_web/contracts/types/connector.types.ts index 5b67297ae..f864ae16f 100644 --- a/surfsense_web/contracts/types/connector.types.ts +++ b/surfsense_web/contracts/types/connector.types.ts @@ -8,6 +8,7 @@ export const searchSourceConnectorTypeEnum = z.enum([ "LINKUP_API", "BAIDU_SEARCH_API", "SLACK_CONNECTOR", + "TEAMS_CONNECTOR", "NOTION_CONNECTOR", "GITHUB_CONNECTOR", "LINEAR_CONNECTOR",