From 0f5bf93f687fc13f0c06094c6aa22aed0a837eda Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Tue, 6 Jan 2026 14:36:51 +0530 Subject: [PATCH] feat: implement JiraHistoryConnector for OAuth and legacy authentication - Introduced JiraHistoryConnector to handle OAuth-based authentication and automatic token refresh for Jira API access. - Refactored Jira indexing logic to utilize the new connector, simplifying credential management and enhancing token refresh capabilities. - Removed legacy token handling code from the Jira indexer, streamlining the integration process. - Ensured compatibility with both OAuth 2.0 and legacy API token methods for improved flexibility. --- .../app/connectors/jira_history.py | 320 ++++++++++++++++++ .../tasks/connector_indexers/jira_indexer.py | 155 ++------- 2 files changed, 351 insertions(+), 124 deletions(-) create mode 100644 surfsense_backend/app/connectors/jira_history.py diff --git a/surfsense_backend/app/connectors/jira_history.py b/surfsense_backend/app/connectors/jira_history.py new file mode 100644 index 000000000..3e8c69104 --- /dev/null +++ b/surfsense_backend/app/connectors/jira_history.py @@ -0,0 +1,320 @@ +""" +Jira OAuth Connector. + +Handles OAuth-based authentication and token refresh for Jira API access. +Supports both OAuth 2.0 (preferred) and legacy API token authentication. +""" + +import logging +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.config import config +from app.connectors.jira_connector import JiraConnector +from app.db import SearchSourceConnector +from app.routes.jira_add_connector_route import refresh_jira_token +from app.schemas.atlassian_auth_credentials import AtlassianAuthCredentialsBase +from app.utils.oauth_security import TokenEncryption + +logger = logging.getLogger(__name__) + + +class JiraHistoryConnector: + """ + Jira connector with OAuth support and automatic token refresh. + + This connector uses OAuth 2.0 access tokens to authenticate with the + Jira API. It automatically refreshes expired tokens when needed. + Also supports legacy API token authentication for backward compatibility. + """ + + def __init__( + self, + session: AsyncSession, + connector_id: int, + credentials: AtlassianAuthCredentialsBase | None = None, + ): + """ + Initialize the JiraHistoryConnector with auto-refresh capability. + + Args: + session: Database session for updating connector + connector_id: Connector ID for direct updates + credentials: Jira OAuth credentials (optional, will be loaded from DB if not provided) + """ + self._session = session + self._connector_id = connector_id + self._credentials = credentials + self._cloud_id: str | None = None + self._base_url: str | None = None + self._jira_client: JiraConnector | None = None + self._use_oauth = True + self._legacy_email: str | None = None + self._legacy_api_token: str | None = None + + async def _get_valid_token(self) -> str: + """ + Get valid Jira access token, refreshing if needed. + + Returns: + Valid access token + + Raises: + ValueError: If credentials are missing or invalid + Exception: If token refresh fails + """ + # Load credentials from DB if not provided + if self._credentials is None: + result = await self._session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == self._connector_id + ) + ) + connector = result.scalars().first() + + if not connector: + raise ValueError(f"Connector {self._connector_id} not found") + + config_data = connector.config.copy() + + # Check if using OAuth or legacy API token + is_oauth = config_data.get("_token_encrypted", False) or config_data.get("access_token") + + if is_oauth: + # OAuth 2.0 authentication + if not config.SECRET_KEY: + raise ValueError( + "SECRET_KEY not configured but tokens are marked as encrypted" + ) + + try: + token_encryption = TokenEncryption(config.SECRET_KEY) + + # Decrypt access_token + if config_data.get("access_token"): + config_data["access_token"] = token_encryption.decrypt_token( + config_data["access_token"] + ) + logger.info( + f"Decrypted Jira access token for connector {self._connector_id}" + ) + + # Decrypt refresh_token if present + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + logger.info( + f"Decrypted Jira refresh token for connector {self._connector_id}" + ) + except Exception as e: + logger.error( + f"Failed to decrypt Jira credentials for connector {self._connector_id}: {e!s}" + ) + raise ValueError( + f"Failed to decrypt Jira credentials: {e!s}" + ) from e + + try: + self._credentials = AtlassianAuthCredentialsBase.from_dict(config_data) + self._cloud_id = config_data.get("cloud_id") + self._base_url = config_data.get("base_url") + self._use_oauth = True + except Exception as e: + raise ValueError(f"Invalid Jira OAuth credentials: {e!s}") from e + else: + # Legacy API token authentication + self._legacy_email = config_data.get("JIRA_EMAIL") + self._legacy_api_token = config_data.get("JIRA_API_TOKEN") + self._base_url = config_data.get("JIRA_BASE_URL") + self._use_oauth = False + + if not self._legacy_email or not self._legacy_api_token or not self._base_url: + raise ValueError("Jira credentials not found in connector config") + + # Check if token is expired and refreshable (only for OAuth) + if self._use_oauth and self._credentials.is_expired and self._credentials.is_refreshable: + try: + logger.info( + f"Jira token expired for connector {self._connector_id}, refreshing..." + ) + + # Get connector for refresh + result = await self._session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == self._connector_id + ) + ) + connector = result.scalars().first() + + if not connector: + raise RuntimeError( + f"Connector {self._connector_id} not found; cannot refresh token." + ) + + # Refresh token + connector = await refresh_jira_token(self._session, connector) + + # Reload credentials after refresh + config_data = connector.config.copy() + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + token_encryption = TokenEncryption(config.SECRET_KEY) + if config_data.get("access_token"): + config_data["access_token"] = token_encryption.decrypt_token( + config_data["access_token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + + self._credentials = AtlassianAuthCredentialsBase.from_dict(config_data) + self._cloud_id = config_data.get("cloud_id") + self._base_url = config_data.get("base_url") + + # Invalidate cached client so it's recreated with new token + self._jira_client = None + + logger.info( + f"Successfully refreshed Jira token for connector {self._connector_id}" + ) + except Exception as e: + logger.error( + f"Failed to refresh Jira token for connector {self._connector_id}: {e!s}" + ) + raise Exception( + f"Failed to refresh Jira OAuth credentials: {e!s}" + ) from e + + if self._use_oauth: + return self._credentials.access_token + else: + # For legacy auth, return empty string (not used for token-based auth) + return "" + + async def _get_jira_client(self) -> JiraConnector: + """ + Get or create JiraConnector with valid credentials. + + Returns: + JiraConnector instance + """ + if self._jira_client is None: + if self._use_oauth: + # Ensure we have valid token (will refresh if needed) + await self._get_valid_token() + + self._jira_client = JiraConnector( + base_url=self._base_url, + access_token=self._credentials.access_token, + cloud_id=self._cloud_id, + ) + else: + # Legacy API token authentication + self._jira_client = JiraConnector( + base_url=self._base_url, + email=self._legacy_email, + api_token=self._legacy_api_token, + ) + else: + # If OAuth, refresh token if expired before returning client + if self._use_oauth: + await self._get_valid_token() + # Update client with new token if it was refreshed + if self._credentials: + self._jira_client.set_oauth_credentials( + base_url=self._base_url or "", + access_token=self._credentials.access_token, + cloud_id=self._cloud_id, + ) + + return self._jira_client + + async def get_issues_by_date_range( + self, + start_date: str, + end_date: str, + include_comments: bool = True, + project_key: str | None = None, + ) -> tuple[list[dict[str, Any]], str | None]: + """ + Fetch issues within a date range. + This method wraps JiraConnector.get_issues_by_date_range() with automatic token refresh. + + Args: + start_date: Start date in YYYY-MM-DD format + end_date: End date in YYYY-MM-DD format (inclusive) + include_comments: Whether to include comments in the response + project_key: Optional project key to filter issues + + Returns: + Tuple containing (issues list, error message or None) + """ + # Ensure token is valid (will refresh if needed) + if self._use_oauth: + await self._get_valid_token() + + # Get client with valid credentials + client = await self._get_jira_client() + + # JiraConnector methods are synchronous, so we call them directly + # Token refresh has already been handled above + return client.get_issues_by_date_range( + start_date=start_date, + end_date=end_date, + include_comments=include_comments, + project_key=project_key, + ) + + def format_issue(self, issue: dict[str, Any]) -> dict[str, Any]: + """ + Format an issue for easier consumption. + Wraps JiraConnector.format_issue(). + + Args: + issue: The issue object from Jira API + + Returns: + Formatted issue dictionary + """ + # This is a synchronous method that doesn't need token refresh + # since it just formats data that's already been fetched + if self._jira_client is None: + # Create a minimal client just for formatting (doesn't need credentials) + self._jira_client = JiraConnector() + return self._jira_client.format_issue(issue) + + def format_issue_to_markdown(self, issue: dict[str, Any]) -> str: + """ + Convert an issue to markdown format. + Wraps JiraConnector.format_issue_to_markdown(). + + Args: + issue: The issue object (either raw or formatted) + + Returns: + Markdown string representation of the issue + """ + # This is a synchronous method that doesn't need token refresh + # since it just formats data that's already been fetched + if self._jira_client is None: + # Create a minimal client just for formatting (doesn't need credentials) + self._jira_client = JiraConnector() + return self._jira_client.format_issue_to_markdown(issue) + + async def close(self): + """Close any resources (currently no-op for JiraConnector).""" + # JiraConnector doesn't maintain persistent connections, so nothing to close + self._jira_client = None + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() + diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index cd7dabeaf..47ad0986f 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -8,7 +8,7 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.config import config -from app.connectors.jira_connector import JiraConnector +from app.connectors.jira_history import JiraHistoryConnector from app.db import Document, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService @@ -83,130 +83,21 @@ async def index_jira_issues( ) return 0, f"Connector with ID {connector_id} not found" - # Get the Jira credentials from the connector config - # Support both OAuth (preferred) and legacy API token authentication - config_data = connector.config.copy() - is_oauth = config_data.get("_token_encrypted", False) or config_data.get("access_token") + # Initialize Jira client with internal refresh capability + # Token refresh will happen automatically when needed + await task_logger.log_task_progress( + log_entry, + f"Initializing Jira client for connector {connector_id}", + {"stage": "client_initialization"}, + ) - if is_oauth: - # OAuth 2.0 authentication - from app.utils.oauth_security import TokenEncryption + logger.info(f"Initializing Jira client for connector {connector_id}") - if not config.SECRET_KEY: - await task_logger.log_task_failure( - log_entry, - f"SECRET_KEY not configured but tokens are marked as encrypted for connector {connector_id}", - "Missing SECRET_KEY for token decryption", - {"error_type": "MissingSecretKey"}, - ) - return 0, "SECRET_KEY not configured but tokens are marked as encrypted" - - try: - token_encryption = TokenEncryption(config.SECRET_KEY) - - # Decrypt access_token - if config_data.get("access_token"): - config_data["access_token"] = token_encryption.decrypt_token( - config_data["access_token"] - ) - logger.info( - f"Decrypted Jira access token for connector {connector_id}" - ) - - # Decrypt refresh_token if present - if config_data.get("refresh_token"): - config_data["refresh_token"] = token_encryption.decrypt_token( - config_data["refresh_token"] - ) - logger.info( - f"Decrypted Jira refresh token for connector {connector_id}" - ) - except Exception as e: - await task_logger.log_task_failure( - log_entry, - f"Failed to decrypt Jira tokens for connector {connector_id}: {e!s}", - "Token decryption failed", - {"error_type": "TokenDecryptionError"}, - ) - return 0, f"Failed to decrypt Jira tokens: {e!s}" - - try: - from app.schemas.atlassian_auth_credentials import AtlassianAuthCredentialsBase - credentials = AtlassianAuthCredentialsBase.from_dict(config_data) - except Exception as e: - await task_logger.log_task_failure( - log_entry, - f"Invalid Jira OAuth credentials in connector {connector_id}", - str(e), - {"error_type": "InvalidCredentials"}, - ) - return 0, f"Invalid Jira OAuth credentials: {e!s}" - - # Check if credentials are expired and refresh if needed - if credentials.is_expired: - await task_logger.log_task_progress( - log_entry, - f"Jira credentials expired for connector {connector_id}, refreshing token", - {"stage": "token_refresh"}, - ) - - from app.routes.jira_add_connector_route import refresh_jira_token - - try: - connector = await refresh_jira_token(session, connector) - # Re-fetch credentials after refresh - config_data = connector.config.copy() - if config_data.get("access_token"): - config_data["access_token"] = token_encryption.decrypt_token( - config_data["access_token"] - ) - credentials = AtlassianAuthCredentialsBase.from_dict(config_data) - except Exception as e: - await task_logger.log_task_failure( - log_entry, - f"Failed to refresh Jira token for connector {connector_id}: {e!s}", - "Token refresh failed", - {"error_type": "TokenRefreshError"}, - ) - return 0, f"Failed to refresh Jira token: {e!s}" - - # Initialize Jira client with OAuth credentials - await task_logger.log_task_progress( - log_entry, - f"Initializing Jira client with OAuth for connector {connector_id}", - {"stage": "client_initialization"}, - ) - - jira_client = JiraConnector( - base_url=credentials.base_url, - access_token=credentials.access_token, - cloud_id=credentials.cloud_id, - ) - else: - # Legacy API token authentication - jira_email = config_data.get("JIRA_EMAIL") - jira_api_token = config_data.get("JIRA_API_TOKEN") - jira_base_url = config_data.get("JIRA_BASE_URL") - - if not jira_email or not jira_api_token or not jira_base_url: - await task_logger.log_task_failure( - log_entry, - f"Jira credentials not found in connector config for connector {connector_id}", - "Missing Jira credentials", - {"error_type": "MissingCredentials"}, - ) - return 0, "Jira credentials not found in connector config" - - # Initialize Jira client with legacy credentials - await task_logger.log_task_progress( - log_entry, - f"Initializing Jira client with API token for connector {connector_id}", - {"stage": "client_initialization"}, - ) - - jira_client = JiraConnector( - base_url=jira_base_url, email=jira_email, api_token=jira_api_token - ) + # Create connector with session and connector_id for internal refresh + # Token refresh will happen automatically when needed + jira_client = JiraHistoryConnector( + session=session, connector_id=connector_id + ) # Calculate date range # Handle "undefined" strings from frontend @@ -231,7 +122,7 @@ async def index_jira_issues( # Get issues within date range try: - issues, error = jira_client.get_issues_by_date_range( + issues, error = await jira_client.get_issues_by_date_range( start_date=start_date_str, end_date=end_date_str, include_comments=True ) @@ -504,6 +395,10 @@ async def index_jira_issues( logger.info( f"JIRA indexing completed: {documents_indexed} new issues, {documents_skipped} skipped" ) + + # Clean up the connector + await jira_client.close() + return ( total_processed, None, @@ -518,6 +413,12 @@ async def index_jira_issues( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) + # Clean up the connector in case of error + if "jira_client" in locals(): + try: + await jira_client.close() + except Exception: + pass return 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() @@ -528,4 +429,10 @@ async def index_jira_issues( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True) + # Clean up the connector in case of error + if "jira_client" in locals(): + try: + await jira_client.close() + except Exception: + pass return 0, f"Failed to index JIRA issues: {e!s}" \ No newline at end of file