Add MS Teams connector

This commit is contained in:
Manoj Aggarwal 2026-01-09 13:20:47 -08:00
parent fa35b71522
commit 18035b3728
7 changed files with 222 additions and 29 deletions

View file

@@ -0,0 +1,160 @@
"""Add TEAMS_CONNECTOR to SearchSourceConnectorType and DocumentType enums
Revision ID: 59
Revises: 58
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "59"
down_revision: str | None = "58"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
# Define the ENUM type name and the new value
CONNECTOR_ENUM = "searchsourceconnectortype"
CONNECTOR_NEW_VALUE = "TEAMS_CONNECTOR"
DOCUMENT_ENUM = "documenttype"
DOCUMENT_NEW_VALUE = "TEAMS_CONNECTOR"
def upgrade() -> None:
    """Upgrade schema - add TEAMS_CONNECTOR to connector and document enum safely."""
    # Both enums receive the same new label via an identical guarded
    # ALTER TYPE, so drive the two statements from one table of targets.
    # The pg_enum existence check makes re-running this migration a no-op.
    enum_targets = (
        (CONNECTOR_ENUM, CONNECTOR_NEW_VALUE),
        (DOCUMENT_ENUM, DOCUMENT_NEW_VALUE),
    )
    for enum_name, new_label in enum_targets:
        op.execute(
            f"""
            DO $$
            BEGIN
                IF NOT EXISTS (
                    SELECT 1 FROM pg_enum
                    WHERE enumlabel = '{new_label}'
                    AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{enum_name}')
                ) THEN
                    ALTER TYPE {enum_name} ADD VALUE '{new_label}';
                END IF;
            END$$;
            """
        )
def _drop_enum_value(
    enum_name: str,
    removed_value: str,
    table_name: str,
    column_name: str,
    remaining_values: Sequence[str],
) -> None:
    """Recreate ``enum_name`` without ``removed_value`` and re-point the column.

    PostgreSQL cannot drop a single label from an enum type, so the standard
    workaround is used: rename the current type, create a fresh type holding
    only the surviving labels, re-cast the column through ``text``, then drop
    the renamed type.

    Rows still holding ``removed_value`` are deleted first. Without that
    cleanup the ``USING {column}::text::{enum}`` cast raises
    ``invalid input value for enum`` and the whole downgrade aborts.
    """
    old_enum_name = f"{enum_name}_old"
    values_sql = ", ".join(f"'{v}'" for v in remaining_values)
    # 0. Remove rows that reference the label being dropped — they cannot be
    #    represented in the recreated enum.
    op.execute(f"DELETE FROM {table_name} WHERE {column_name} = '{removed_value}'")
    # 1. Rename the current enum type out of the way.
    op.execute(f"ALTER TYPE {enum_name} RENAME TO {old_enum_name}")
    # 2. Create the replacement enum type with the surviving values.
    op.execute(f"CREATE TYPE {enum_name} AS ENUM({values_sql})")
    # 3. Re-point the column at the new type (cast must go through text).
    op.execute(
        f"""
        ALTER TABLE {table_name}
        ALTER COLUMN {column_name} TYPE {enum_name}
        USING {column_name}::text::{enum_name}
        """
    )
    # 4. Drop the renamed (old) enum type.
    op.execute(f"DROP TYPE {old_enum_name}")


def downgrade() -> None:
    """Downgrade schema - remove TEAMS_CONNECTOR from connector and document enum."""
    # All connector values except TEAMS_CONNECTOR
    old_connector_values = (
        "SERPER_API",
        "TAVILY_API",
        "SEARXNG_API",
        "LINKUP_API",
        "BAIDU_SEARCH_API",
        "SLACK_CONNECTOR",
        "NOTION_CONNECTOR",
        "GITHUB_CONNECTOR",
        "LINEAR_CONNECTOR",
        "DISCORD_CONNECTOR",
        "JIRA_CONNECTOR",
        "CONFLUENCE_CONNECTOR",
        "CLICKUP_CONNECTOR",
        "GOOGLE_CALENDAR_CONNECTOR",
        "GOOGLE_GMAIL_CONNECTOR",
        "GOOGLE_DRIVE_CONNECTOR",
        "AIRTABLE_CONNECTOR",
        "LUMA_CONNECTOR",
        "ELASTICSEARCH_CONNECTOR",
        "WEBCRAWLER_CONNECTOR",
    )
    # All document values except TEAMS_CONNECTOR
    old_document_values = (
        "EXTENSION",
        "CRAWLED_URL",
        "FILE",
        "SLACK_CONNECTOR",
        "NOTION_CONNECTOR",
        "YOUTUBE_VIDEO",
        "GITHUB_CONNECTOR",
        "LINEAR_CONNECTOR",
        "DISCORD_CONNECTOR",
        "JIRA_CONNECTOR",
        "CONFLUENCE_CONNECTOR",
        "CLICKUP_CONNECTOR",
        "GOOGLE_CALENDAR_CONNECTOR",
        "GOOGLE_GMAIL_CONNECTOR",
        "GOOGLE_DRIVE_FILE",
        "AIRTABLE_CONNECTOR",
        "LUMA_CONNECTOR",
        "ELASTICSEARCH_CONNECTOR",
        "BOOKSTACK_CONNECTOR",
        "CIRCLEBACK",
        "NOTE",
    )
    # Connector enum: purge TEAMS_CONNECTOR rows, then rebuild the type.
    _drop_enum_value(
        CONNECTOR_ENUM,
        CONNECTOR_NEW_VALUE,
        "search_source_connectors",
        "connector_type",
        old_connector_values,
    )
    # Document enum: same ritual for the documents table.
    _drop_enum_value(
        DOCUMENT_ENUM,
        DOCUMENT_NEW_VALUE,
        "documents",
        "document_type",
        old_document_values,
    )

View file

@@ -26,6 +26,7 @@ _ALL_CONNECTORS: list[str] = [
"EXTENSION", "EXTENSION",
"FILE", "FILE",
"SLACK_CONNECTOR", "SLACK_CONNECTOR",
"TEAMS_CONNECTOR",
"NOTION_CONNECTOR", "NOTION_CONNECTOR",
"YOUTUBE_VIDEO", "YOUTUBE_VIDEO",
"GITHUB_CONNECTOR", "GITHUB_CONNECTOR",
@@ -573,6 +574,7 @@ def create_search_knowledge_base_tool(
- FILE: "User-uploaded documents (PDFs, Word, etc.)" (personal files) - FILE: "User-uploaded documents (PDFs, Word, etc.)" (personal files)
- NOTE: "SurfSense Notes" (notes created inside SurfSense) - NOTE: "SurfSense Notes" (notes created inside SurfSense)
- SLACK_CONNECTOR: "Slack conversations and shared content" (personal workspace communications) - SLACK_CONNECTOR: "Slack conversations and shared content" (personal workspace communications)
- TEAMS_CONNECTOR: "Microsoft Teams messages and conversations" (personal Teams communications)
- NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management) - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management)
- YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos) - YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos)
- GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions) - GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions)

View file

@@ -7,7 +7,7 @@ Supports OAuth-based authentication with token refresh.
""" """
import logging import logging
from datetime import datetime from datetime import datetime, timezone
from typing import Any from typing import Any
import httpx import httpx
@@ -255,25 +255,11 @@ class TeamsConnector:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
url = f"{self.GRAPH_API_BASE}/teams/{team_id}/channels/{channel_id}/messages" url = f"{self.GRAPH_API_BASE}/teams/{team_id}/channels/{channel_id}/messages"
# Build query parameters for date filtering if needed # Note: The Graph API for channel messages doesn't support $filter parameter
params = {} # We fetch all messages and filter them client-side
if start_date or end_date:
filter_parts = []
if start_date:
filter_parts.append(
f"createdDateTime ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"
)
if end_date:
filter_parts.append(
f"createdDateTime le {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"
)
if filter_parts:
params["$filter"] = " and ".join(filter_parts)
response = await client.get( response = await client.get(
url, url,
headers={"Authorization": f"Bearer {access_token}"}, headers={"Authorization": f"Bearer {access_token}"},
params=params,
timeout=30.0, timeout=30.0,
) )
@@ -283,7 +269,36 @@
) )
data = response.json() data = response.json()
return data.get("value", []) messages = data.get("value", [])
# Filter messages by date if needed (client-side filtering)
if start_date or end_date:
# Make sure comparison dates are timezone-aware (UTC)
if start_date and start_date.tzinfo is None:
start_date = start_date.replace(tzinfo=timezone.utc)
if end_date and end_date.tzinfo is None:
end_date = end_date.replace(tzinfo=timezone.utc)
filtered_messages = []
for message in messages:
created_at_str = message.get("createdDateTime")
if not created_at_str:
continue
# Parse the ISO 8601 datetime string (already timezone-aware)
created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
# Check if message is within date range
if start_date and created_at < start_date:
continue
if end_date and created_at > end_date:
continue
filtered_messages.append(message)
return filtered_messages
return messages
async def get_message_replies( async def get_message_replies(
self, team_id: str, channel_id: str, message_id: str self, team_id: str, channel_id: str, message_id: str

View file

@@ -558,6 +558,7 @@ async def index_connector_content(
Currently supports: Currently supports:
- SLACK_CONNECTOR: Indexes messages from all accessible Slack channels - SLACK_CONNECTOR: Indexes messages from all accessible Slack channels
- TEAMS_CONNECTOR: Indexes messages from all accessible Microsoft Teams channels
- NOTION_CONNECTOR: Indexes pages from all accessible Notion pages - NOTION_CONNECTOR: Indexes pages from all accessible Notion pages
- GITHUB_CONNECTOR: Indexes code and documentation from GitHub repositories - GITHUB_CONNECTOR: Indexes code and documentation from GitHub repositories
- LINEAR_CONNECTOR: Indexes issues and comments from Linear - LINEAR_CONNECTOR: Indexes issues and comments from Linear
@@ -631,6 +632,19 @@ async def index_connector_content(
) )
response_message = "Slack indexing started in the background." response_message = "Slack indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.TEAMS_CONNECTOR:
from app.tasks.celery_tasks.connector_tasks import (
index_teams_messages_task,
)
logger.info(
f"Triggering Teams indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
index_teams_messages_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
)
response_message = "Teams indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR: elif connector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR:
from app.tasks.celery_tasks.connector_tasks import index_notion_pages_task from app.tasks.celery_tasks.connector_tasks import index_notion_pages_task
@@ -1237,16 +1251,14 @@ async def run_teams_indexing(
update_last_indexed=False, # Don't update timestamp in the indexing function update_last_indexed=False, # Don't update timestamp in the indexing function
) )
# Only update last_indexed_at if indexing was successful (either new docs or updated docs) # Update last_indexed_at if indexing was successful (regardless of new/skipped docs)
if documents_processed > 0: if error_or_warning is None:
await update_connector_last_indexed(session, connector_id) await update_connector_last_indexed(session, connector_id)
logger.info( logger.info(
f"Teams indexing completed successfully: {documents_processed} documents processed" f"Teams indexing completed successfully: {documents_processed} documents processed"
) )
else: else:
logger.error( logger.error(f"Teams indexing failed: {error_or_warning}")
f"Teams indexing failed or no documents processed: {error_or_warning}"
)
except Exception as e: except Exception as e:
logger.error(f"Error in background Teams indexing task: {e!s}") logger.error(f"Error in background Teams indexing task: {e!s}")

View file

@@ -312,17 +312,18 @@ async def teams_callback(
session, session,
SearchSourceConnectorType.TEAMS_CONNECTOR, SearchSourceConnectorType.TEAMS_CONNECTOR,
space_id, space_id,
connector_config, user_id,
connector_identifier,
) )
# Create new connector # Create new connector
new_connector = SearchSourceConnector( new_connector = SearchSourceConnector(
name=connector_name,
connector_type=SearchSourceConnectorType.TEAMS_CONNECTOR, connector_type=SearchSourceConnectorType.TEAMS_CONNECTOR,
is_indexable=True,
config=connector_config, config=connector_config,
is_enabled=True,
search_space_id=space_id, search_space_id=space_id,
user_id=user_id, user_id=user_id,
connector_name=connector_name,
) )
try: try:

View file

@@ -165,14 +165,16 @@ async def index_teams_messages(
) )
# Convert date strings to datetime objects for filtering # Convert date strings to datetime objects for filtering
from datetime import datetime from datetime import datetime, timezone
start_datetime = None start_datetime = None
end_datetime = None end_datetime = None
if start_date_str: if start_date_str:
start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d") # Parse as naive datetime and make it timezone-aware (UTC)
start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
if end_date_str: if end_date_str:
end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d") # Parse as naive datetime, set to end of day, and make it timezone-aware (UTC)
end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59, tzinfo=timezone.utc)
# Process each team # Process each team
for team in teams: for team in teams:

View file

@@ -8,6 +8,7 @@ export const searchSourceConnectorTypeEnum = z.enum([
"LINKUP_API", "LINKUP_API",
"BAIDU_SEARCH_API", "BAIDU_SEARCH_API",
"SLACK_CONNECTOR", "SLACK_CONNECTOR",
"TEAMS_CONNECTOR",
"NOTION_CONNECTOR", "NOTION_CONNECTOR",
"GITHUB_CONNECTOR", "GITHUB_CONNECTOR",
"LINEAR_CONNECTOR", "LINEAR_CONNECTOR",