feat: add extract_identifier_from_credentials to connector naming

This commit is contained in:
CREDO23 2026-01-07 08:15:48 +02:00
parent d979c156f8
commit 4c6a782cec

View file

@ -1,4 +1,18 @@
from app.db import SearchSourceConnectorType """
Connector Naming Utilities.
Provides functions for generating unique, user-friendly connector names.
"""
from typing import Any
from urllib.parse import urlparse
from uuid import UUID
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
# Friendly display names for connector types # Friendly display names for connector types
BASE_NAME_FOR_TYPE = { BASE_NAME_FOR_TYPE = {
@ -7,54 +21,131 @@ BASE_NAME_FOR_TYPE = {
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: "Google Calendar", SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: "Google Calendar",
SearchSourceConnectorType.SLACK_CONNECTOR: "Slack", SearchSourceConnectorType.SLACK_CONNECTOR: "Slack",
SearchSourceConnectorType.NOTION_CONNECTOR: "Notion", SearchSourceConnectorType.NOTION_CONNECTOR: "Notion",
SearchSourceConnectorType.GITHUB_CONNECTOR: "GitHub",
SearchSourceConnectorType.LINEAR_CONNECTOR: "Linear", SearchSourceConnectorType.LINEAR_CONNECTOR: "Linear",
SearchSourceConnectorType.JIRA_CONNECTOR: "Jira", SearchSourceConnectorType.JIRA_CONNECTOR: "Jira",
SearchSourceConnectorType.DISCORD_CONNECTOR: "Discord", SearchSourceConnectorType.DISCORD_CONNECTOR: "Discord",
SearchSourceConnectorType.CONFLUENCE_CONNECTOR: "Confluence", SearchSourceConnectorType.CONFLUENCE_CONNECTOR: "Confluence",
SearchSourceConnectorType.AIRTABLE_CONNECTOR: "Airtable", SearchSourceConnectorType.AIRTABLE_CONNECTOR: "Airtable",
SearchSourceConnectorType.LUMA_CONNECTOR: "Luma",
# Add other connectors as needed, fallback below
} }
def get_base_name_for_type(connector_type: SearchSourceConnectorType) -> str: def get_base_name_for_type(connector_type: SearchSourceConnectorType) -> str:
"""Get a friendly display name for a connector type."""
return BASE_NAME_FOR_TYPE.get(connector_type, connector_type.replace("_", " ").title()) return BASE_NAME_FOR_TYPE.get(connector_type, connector_type.replace("_", " ").title())
def generate_unique_connector_name(connector_type: SearchSourceConnectorType, identifier: str | None) -> str: def extract_identifier_from_credentials(
connector_type: SearchSourceConnectorType,
credentials: dict[str, Any],
) -> str | None:
"""
Extract a unique identifier from connector credentials.
Args:
connector_type: The type of connector
credentials: The connector credentials dict
Returns:
Identifier string (workspace name, email, etc.) or None
"""
if connector_type == SearchSourceConnectorType.SLACK_CONNECTOR:
return credentials.get("team_name")
if connector_type == SearchSourceConnectorType.NOTION_CONNECTOR:
return credentials.get("workspace_name")
if connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
return credentials.get("guild_name")
if connector_type in (
SearchSourceConnectorType.JIRA_CONNECTOR,
SearchSourceConnectorType.CONFLUENCE_CONNECTOR,
):
base_url = credentials.get("base_url", "")
if base_url:
try:
parsed = urlparse(base_url)
hostname = parsed.netloc or parsed.path
if ".atlassian.net" in hostname:
return hostname.replace(".atlassian.net", "")
return hostname
except Exception:
pass
return None
# Google, Linear, Airtable require API calls - return None
return None
def generate_connector_name_with_identifier(
connector_type: SearchSourceConnectorType,
identifier: str | None,
) -> str:
"""
Generate a connector name with an identifier.
Args:
connector_type: The type of connector
identifier: User identifier (email, workspace name, etc.)
Returns:
Name like "Gmail - john@example.com" or just "Gmail" if no identifier
"""
base = get_base_name_for_type(connector_type) base = get_base_name_for_type(connector_type)
if identifier: if identifier:
return f"{base} - {identifier}" return f"{base} - {identifier}"
return base return base
def extract_identifier_from_credentials(connector_type: SearchSourceConnectorType, credentials: dict) -> str | None: async def count_connectors_of_type(
if connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: session: AsyncSession,
return credentials.get("email") or credentials.get("user_email") connector_type: SearchSourceConnectorType,
if connector_type == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: search_space_id: int,
return credentials.get("email") user_id: UUID,
if connector_type == SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: ) -> int:
return credentials.get("email") """Count existing connectors of a type for a user in a search space."""
if connector_type == SearchSourceConnectorType.SLACK_CONNECTOR: result = await session.execute(
return credentials.get("team_name") or credentials.get("team_id") select(func.count(SearchSourceConnector.id)).where(
if connector_type == SearchSourceConnectorType.NOTION_CONNECTOR: SearchSourceConnector.connector_type == connector_type,
return credentials.get("workspace_name") SearchSourceConnector.search_space_id == search_space_id,
if connector_type == SearchSourceConnectorType.GITHUB_CONNECTOR: SearchSourceConnector.user_id == user_id,
return credentials.get("username") )
if connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR: )
return credentials.get("workspace_name") return result.scalar() or 0
if connector_type == SearchSourceConnectorType.JIRA_CONNECTOR:
return credentials.get("base_url") or credentials.get("cloud_id")
if connector_type == SearchSourceConnectorType.CONFLUENCE_CONNECTOR: async def generate_unique_connector_name(
return credentials.get("base_url") or credentials.get("cloud_id") session: AsyncSession,
if connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR: connector_type: SearchSourceConnectorType,
return credentials.get("guild_name") search_space_id: int,
if connector_type == SearchSourceConnectorType.AIRTABLE_CONNECTOR: user_id: UUID,
return credentials.get("base_name") identifier: str | None = None,
if connector_type == SearchSourceConnectorType.LUMA_CONNECTOR: ) -> str:
return credentials.get("account_name") """
for key in ["email", "username", "workspace_name", "team_name", "base_url", "guild_name", "site_name", "account_name"]: Generate a unique connector name.
if credentials.get(key):
return credentials.get(key) If an identifier is provided (email, workspace name, etc.), uses it with base name.
return None Otherwise, falls back to counting existing connectors for uniqueness.
Args:
session: Database session
connector_type: The type of connector
search_space_id: The search space ID
user_id: The user ID
identifier: Optional user identifier (email, workspace name, etc.)
Returns:
Unique name like "Gmail - john@example.com" or "Gmail (2)"
"""
base = get_base_name_for_type(connector_type)
if identifier:
return f"{base} - {identifier}"
# Fallback: use counter for uniqueness
count = await count_connectors_of_type(session, connector_type, search_space_id, user_id)
if count == 0:
return base
return f"{base} ({count + 1})"