From b6710ae9aff43023fe1df6b51b1049ee5be72da3 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 2 Jun 2026 16:38:00 +0200 Subject: [PATCH] refactor(jira): remove dead legacy indexing and write tools (mcp-only now) --- .../agents/new_chat/tools/jira/__init__.py | 11 - .../new_chat/tools/jira/create_issue.py | 248 ------- .../new_chat/tools/jira/delete_issue.py | 210 ------ .../new_chat/tools/jira/update_issue.py | 255 ------- .../app/connectors/jira_connector.py | 648 ------------------ .../app/connectors/jira_history.py | 350 ---------- .../app/services/jira/__init__.py | 13 - .../app/services/jira/kb_sync_service.py | 257 ------- .../services/jira/tool_metadata_service.py | 332 --------- .../tasks/connector_indexers/jira_indexer.py | 364 ---------- .../connector_indexers/test_jira_parallel.py | 387 ----------- 11 files changed, 3075 deletions(-) delete mode 100644 surfsense_backend/app/agents/new_chat/tools/jira/__init__.py delete mode 100644 surfsense_backend/app/agents/new_chat/tools/jira/create_issue.py delete mode 100644 surfsense_backend/app/agents/new_chat/tools/jira/delete_issue.py delete mode 100644 surfsense_backend/app/agents/new_chat/tools/jira/update_issue.py delete mode 100644 surfsense_backend/app/connectors/jira_connector.py delete mode 100644 surfsense_backend/app/connectors/jira_history.py delete mode 100644 surfsense_backend/app/services/jira/__init__.py delete mode 100644 surfsense_backend/app/services/jira/kb_sync_service.py delete mode 100644 surfsense_backend/app/services/jira/tool_metadata_service.py delete mode 100644 surfsense_backend/app/tasks/connector_indexers/jira_indexer.py delete mode 100644 surfsense_backend/tests/unit/connector_indexers/test_jira_parallel.py diff --git a/surfsense_backend/app/agents/new_chat/tools/jira/__init__.py b/surfsense_backend/app/agents/new_chat/tools/jira/__init__.py deleted file mode 100644 index 768738118..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/jira/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Jira tools for creating, updating, and deleting issues.""" - -from .create_issue import create_create_jira_issue_tool -from .delete_issue import create_delete_jira_issue_tool -from .update_issue import create_update_jira_issue_tool - -__all__ = [ - "create_create_jira_issue_tool", - "create_delete_jira_issue_tool", - "create_update_jira_issue_tool", -] diff --git a/surfsense_backend/app/agents/new_chat/tools/jira/create_issue.py b/surfsense_backend/app/agents/new_chat/tools/jira/create_issue.py deleted file mode 100644 index 0b04f1642..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/jira/create_issue.py +++ /dev/null @@ -1,248 +0,0 @@ -import asyncio -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified - -from app.agents.new_chat.tools.hitl import request_approval -from app.connectors.jira_history import JiraHistoryConnector -from app.db import async_session_maker -from app.services.jira import JiraToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_create_jira_issue_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """Factory function to create the create_jira_issue tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker`. This is critical for the compiled-agent - cache: the compiled graph (and therefore this closure) is reused - across HTTP requests, so capturing a per-request session here would - surface stale/closed sessions on cache hits. Per-call sessions also - keep the request's outer transaction free of long-running Jira API - blocking. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Jira connector - user_id: User ID for fetching user-specific context - connector_id: Optional specific connector ID (if known) - - Returns: - Configured create_jira_issue tool - """ - del db_session # per-call session — see docstring - - @tool - async def create_jira_issue( - project_key: str, - summary: str, - issue_type: str = "Task", - description: str | None = None, - priority: str | None = None, - ) -> dict[str, Any]: - """Create a new issue in Jira. - - Use this tool when the user explicitly asks to create a new Jira issue/ticket. - - Args: - project_key: The Jira project key (e.g. "PROJ", "ENG"). - summary: Short, descriptive issue title. - issue_type: Issue type (default "Task"). Others: "Bug", "Story", "Epic". - description: Optional description body for the issue. - priority: Optional priority name (e.g. "High", "Medium", "Low"). - - Returns: - Dictionary with status, issue_key, and message. - - IMPORTANT: - - If status is "rejected", the user declined. Do NOT retry. - - If status is "insufficient_permissions", inform user to re-authenticate. - """ - logger.info( - f"create_jira_issue called: project_key='{project_key}', summary='{summary}'" - ) - - if search_space_id is None or user_id is None: - return {"status": "error", "message": "Jira tool not properly configured."} - - try: - async with async_session_maker() as db_session: - metadata_service = JiraToolMetadataService(db_session) - context = await metadata_service.get_creation_context( - search_space_id, user_id - ) - - if "error" in context: - return {"status": "error", "message": context["error"]} - - accounts = context.get("accounts", []) - if accounts and all(a.get("auth_expired") for a in accounts): - return { - "status": "auth_error", - "message": "All connected Jira accounts need re-authentication.", - "connector_type": "jira", - } - - result = request_approval( - action_type="jira_issue_creation", - tool_name="create_jira_issue", - params={ - "project_key": project_key, - "summary": summary, - "issue_type": issue_type, - "description": description, - "priority": priority, - "connector_id": connector_id, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_project_key = result.params.get("project_key", project_key) - final_summary = result.params.get("summary", summary) - final_issue_type = result.params.get("issue_type", issue_type) - final_description = result.params.get("description", description) - final_priority = result.params.get("priority", priority) - final_connector_id = result.params.get("connector_id", connector_id) - - if not final_summary or not final_summary.strip(): - return { - "status": "error", - "message": "Issue summary cannot be empty.", - } - if not final_project_key: - return {"status": "error", "message": "A project must be selected."} - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - actual_connector_id = final_connector_id - if actual_connector_id is None: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.JIRA_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Jira connector found.", - } - actual_connector_id = connector.id - else: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == actual_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.JIRA_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Jira connector is invalid.", - } - - try: - jira_history = JiraHistoryConnector( - session=db_session, connector_id=actual_connector_id - ) - jira_client = await jira_history._get_jira_client() - api_result = await asyncio.to_thread( - jira_client.create_issue, - project_key=final_project_key, - summary=final_summary, - issue_type=final_issue_type, - description=final_description, - priority=final_priority, - ) - except Exception as api_err: - if "status code 403" in str(api_err).lower(): - try: - _conn = connector - _conn.config = {**_conn.config, "auth_expired": True} - flag_modified(_conn, "config") - await db_session.commit() - except Exception: - pass - return { - "status": "insufficient_permissions", - "connector_id": actual_connector_id, - "message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - issue_key = api_result.get("key", "") - issue_url = ( - f"{jira_history._base_url}/browse/{issue_key}" - if jira_history._base_url and issue_key - else "" - ) - - kb_message_suffix = "" - try: - from app.services.jira import JiraKBSyncService - - kb_service = JiraKBSyncService(db_session) - kb_result = await kb_service.sync_after_create( - issue_id=issue_key, - issue_identifier=issue_key, - issue_title=final_summary, - description=final_description, - state="To Do", - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after create failed: {kb_err}") - kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." - - return { - "status": "success", - "issue_key": issue_key, - "issue_url": issue_url, - "message": f"Jira issue {issue_key} created successfully.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error(f"Error creating Jira issue: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while creating the issue.", - } - - return create_jira_issue diff --git a/surfsense_backend/app/agents/new_chat/tools/jira/delete_issue.py b/surfsense_backend/app/agents/new_chat/tools/jira/delete_issue.py deleted file mode 100644 index c41aedad9..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/jira/delete_issue.py +++ /dev/null @@ -1,210 +0,0 @@ -import asyncio -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified - -from app.agents.new_chat.tools.hitl import request_approval -from app.connectors.jira_history import JiraHistoryConnector -from app.db import async_session_maker -from app.services.jira import JiraToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_delete_jira_issue_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """Factory function to create the delete_jira_issue tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker`. This is critical for the compiled-agent - cache: the compiled graph (and therefore this closure) is reused - across HTTP requests, so capturing a per-request session here would - surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Jira connector - user_id: User ID for fetching user-specific context - connector_id: Optional specific connector ID (if known) - - Returns: - Configured delete_jira_issue tool - """ - del db_session # per-call session — see docstring - - @tool - async def delete_jira_issue( - issue_title_or_key: str, - delete_from_kb: bool = False, - ) -> dict[str, Any]: - """Delete a Jira issue. - - Use this tool when the user asks to delete or remove a Jira issue. - - Args: - issue_title_or_key: The issue key (e.g. "PROJ-42") or title. - delete_from_kb: Whether to also remove from the knowledge base. - - Returns: - Dictionary with status, message, and deleted_from_kb. - - IMPORTANT: - - If status is "rejected", do NOT retry. - - If status is "not_found", relay the message to the user. - - If status is "insufficient_permissions", inform user to re-authenticate. - """ - logger.info( - f"delete_jira_issue called: issue_title_or_key='{issue_title_or_key}'" - ) - - if search_space_id is None or user_id is None: - return {"status": "error", "message": "Jira tool not properly configured."} - - try: - async with async_session_maker() as db_session: - metadata_service = JiraToolMetadataService(db_session) - context = await metadata_service.get_deletion_context( - search_space_id, user_id, issue_title_or_key - ) - - if "error" in context: - error_msg = context["error"] - if context.get("auth_expired"): - return { - "status": "auth_error", - "message": error_msg, - "connector_id": context.get("connector_id"), - "connector_type": "jira", - } - if "not found" in error_msg.lower(): - return {"status": "not_found", "message": error_msg} - return {"status": "error", "message": error_msg} - - issue_data = context["issue"] - issue_key = issue_data["issue_id"] - document_id = issue_data["document_id"] - connector_id_from_context = context.get("account", {}).get("id") - - result = request_approval( - action_type="jira_issue_deletion", - tool_name="delete_jira_issue", - params={ - "issue_key": issue_key, - "connector_id": connector_id_from_context, - "delete_from_kb": delete_from_kb, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_issue_key = result.params.get("issue_key", issue_key) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_delete_from_kb = result.params.get( - "delete_from_kb", delete_from_kb - ) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this issue.", - } - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.JIRA_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Jira connector is invalid.", - } - - try: - jira_history = JiraHistoryConnector( - session=db_session, connector_id=final_connector_id - ) - jira_client = await jira_history._get_jira_client() - await asyncio.to_thread(jira_client.delete_issue, final_issue_key) - except Exception as api_err: - if "status code 403" in str(api_err).lower(): - try: - connector.config = { - **connector.config, - "auth_expired": True, - } - flag_modified(connector, "config") - await db_session.commit() - except Exception: - pass - return { - "status": "insufficient_permissions", - "connector_id": final_connector_id, - "message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - deleted_from_kb = False - if final_delete_from_kb and document_id: - try: - from app.db import Document - - doc_result = await db_session.execute( - select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - if document: - await db_session.delete(document) - await db_session.commit() - deleted_from_kb = True - except Exception as e: - logger.error(f"Failed to delete document from KB: {e}") - await db_session.rollback() - - message = f"Jira issue {final_issue_key} deleted successfully." - if deleted_from_kb: - message += " Also removed from the knowledge base." - - return { - "status": "success", - "issue_key": final_issue_key, - "deleted_from_kb": deleted_from_kb, - "message": message, - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error(f"Error deleting Jira issue: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while deleting the issue.", - } - - return delete_jira_issue diff --git a/surfsense_backend/app/agents/new_chat/tools/jira/update_issue.py b/surfsense_backend/app/agents/new_chat/tools/jira/update_issue.py deleted file mode 100644 index 0fd7b28b3..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/jira/update_issue.py +++ /dev/null @@ -1,255 +0,0 @@ -import asyncio -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified - -from app.agents.new_chat.tools.hitl import request_approval -from app.connectors.jira_history import JiraHistoryConnector -from app.db import async_session_maker -from app.services.jira import JiraToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_update_jira_issue_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """Factory function to create the update_jira_issue tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker`. This is critical for the compiled-agent - cache: the compiled graph (and therefore this closure) is reused - across HTTP requests, so capturing a per-request session here would - surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Jira connector - user_id: User ID for fetching user-specific context - connector_id: Optional specific connector ID (if known) - - Returns: - Configured update_jira_issue tool - """ - del db_session # per-call session — see docstring - - @tool - async def update_jira_issue( - issue_title_or_key: str, - new_summary: str | None = None, - new_description: str | None = None, - new_priority: str | None = None, - ) -> dict[str, Any]: - """Update an existing Jira issue. - - Use this tool when the user asks to modify, edit, or update a Jira issue. - - Args: - issue_title_or_key: The issue key (e.g. "PROJ-42") or title to identify the issue. - new_summary: Optional new title/summary for the issue. - new_description: Optional new description. - new_priority: Optional new priority name. - - Returns: - Dictionary with status and message. - - IMPORTANT: - - If status is "rejected", do NOT retry. - - If status is "not_found", relay the message and ask user to verify. - - If status is "insufficient_permissions", inform user to re-authenticate. - """ - logger.info( - f"update_jira_issue called: issue_title_or_key='{issue_title_or_key}'" - ) - - if search_space_id is None or user_id is None: - return {"status": "error", "message": "Jira tool not properly configured."} - - try: - async with async_session_maker() as db_session: - metadata_service = JiraToolMetadataService(db_session) - context = await metadata_service.get_update_context( - search_space_id, user_id, issue_title_or_key - ) - - if "error" in context: - error_msg = context["error"] - if context.get("auth_expired"): - return { - "status": "auth_error", - "message": error_msg, - "connector_id": context.get("connector_id"), - "connector_type": "jira", - } - if "not found" in error_msg.lower(): - return {"status": "not_found", "message": error_msg} - return {"status": "error", "message": error_msg} - - issue_data = context["issue"] - issue_key = issue_data["issue_id"] - document_id = issue_data.get("document_id") - connector_id_from_context = context.get("account", {}).get("id") - - result = request_approval( - action_type="jira_issue_update", - tool_name="update_jira_issue", - params={ - "issue_key": issue_key, - "document_id": document_id, - "new_summary": new_summary, - "new_description": new_description, - "new_priority": new_priority, - "connector_id": connector_id_from_context, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_issue_key = result.params.get("issue_key", issue_key) - final_summary = result.params.get("new_summary", new_summary) - final_description = result.params.get( - "new_description", new_description - ) - final_priority = result.params.get("new_priority", new_priority) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_document_id = result.params.get("document_id", document_id) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this issue.", - } - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.JIRA_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Jira connector is invalid.", - } - - fields: dict[str, Any] = {} - if final_summary: - fields["summary"] = final_summary - if final_description is not None: - fields["description"] = { - "type": "doc", - "version": 1, - "content": [ - { - "type": "paragraph", - "content": [ - {"type": "text", "text": final_description} - ], - } - ], - } - if final_priority: - fields["priority"] = {"name": final_priority} - - if not fields: - return {"status": "error", "message": "No changes specified."} - - try: - jira_history = JiraHistoryConnector( - session=db_session, connector_id=final_connector_id - ) - jira_client = await jira_history._get_jira_client() - await asyncio.to_thread( - jira_client.update_issue, final_issue_key, fields - ) - except Exception as api_err: - if "status code 403" in str(api_err).lower(): - try: - connector.config = { - **connector.config, - "auth_expired": True, - } - flag_modified(connector, "config") - await db_session.commit() - except Exception: - pass - return { - "status": "insufficient_permissions", - "connector_id": final_connector_id, - "message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - issue_url = ( - f"{jira_history._base_url}/browse/{final_issue_key}" - if jira_history._base_url and final_issue_key - else "" - ) - - kb_message_suffix = "" - if final_document_id: - try: - from app.services.jira import JiraKBSyncService - - kb_service = JiraKBSyncService(db_session) - kb_result = await kb_service.sync_after_update( - document_id=final_document_id, - issue_id=final_issue_key, - user_id=user_id, - search_space_id=search_space_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = ( - " The knowledge base will be updated in the next sync." - ) - except Exception as kb_err: - logger.warning(f"KB sync after update failed: {kb_err}") - kb_message_suffix = ( - " The knowledge base will be updated in the next sync." - ) - - return { - "status": "success", - "issue_key": final_issue_key, - "issue_url": issue_url, - "message": f"Jira issue {final_issue_key} updated successfully.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error(f"Error updating Jira issue: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while updating the issue.", - } - - return update_jira_issue diff --git a/surfsense_backend/app/connectors/jira_connector.py b/surfsense_backend/app/connectors/jira_connector.py deleted file mode 100644 index 772b9404c..000000000 --- a/surfsense_backend/app/connectors/jira_connector.py +++ /dev/null @@ -1,648 +0,0 @@ -""" -Jira Connector Module - -A module for retrieving data from Jira. -Allows fetching issue lists and their comments, projects and more. -Supports both OAuth 2.0 (preferred) and legacy API token authentication. -""" - -import base64 -from datetime import datetime -from typing import Any - -import requests - - -class JiraConnector: - """Class for retrieving data from Jira.""" - - def __init__( - self, - base_url: str | None = None, - access_token: str | None = None, - cloud_id: str | None = None, - email: str | None = None, - api_token: str | None = None, - ): - """ - Initialize the JiraConnector class. - - Args: - base_url: Jira instance base URL (e.g., 'https://yourcompany.atlassian.net') - access_token: OAuth 2.0 access token (preferred method) - cloud_id: Atlassian cloud ID (used with OAuth for API URL construction) - email: Jira account email address (legacy method, used with api_token) - api_token: Jira API token (legacy method, used with email) - """ - self.base_url = base_url.rstrip("/") if base_url else None - self.access_token = access_token - self.cloud_id = cloud_id - self.email = email - self.api_token = api_token - self.api_version = "3" # Jira Cloud API version - self._use_oauth = access_token is not None - - def set_oauth_credentials( - self, base_url: str, access_token: str, cloud_id: str | None = None - ) -> None: - """ - Set OAuth 2.0 credentials (preferred method). - - Args: - base_url: Jira instance base URL - access_token: OAuth 2.0 access token - cloud_id: Atlassian cloud ID (optional, used for API URL construction) - """ - self.base_url = base_url.rstrip("/") - self.access_token = access_token - self.cloud_id = cloud_id - self._use_oauth = True - - def set_credentials(self, base_url: str, email: str, api_token: str) -> None: - """ - Set the Jira credentials (legacy method using API token). - - Args: - base_url: Jira instance base URL - email: Jira account email address - api_token: Jira API token - """ - self.base_url = base_url.rstrip("/") - self.email = email - self.api_token = api_token - self._use_oauth = False - - def set_email(self, email: str) -> None: - """ - Set the Jira account email (legacy method). - - Args: - email: Jira account email address - """ - self.email = email - self._use_oauth = False - - def set_api_token(self, api_token: str) -> None: - """ - Set the Jira API token (legacy method). - - Args: - api_token: Jira API token - """ - self.api_token = api_token - self._use_oauth = False - - def get_headers(self) -> dict[str, str]: - """ - Get headers for Jira API requests. - - Uses OAuth Bearer token if available, otherwise falls back to Basic Auth. - - Returns: - Dictionary of headers - - Raises: - ValueError: If credentials have not been set - """ - if self._use_oauth: - # OAuth 2.0 authentication - if not self.base_url or not self.access_token: - raise ValueError( - "Jira OAuth credentials not initialized. Call set_oauth_credentials() first." - ) - - return { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.access_token}", - "Accept": "application/json", - } - else: - # Legacy Basic Auth - if not all([self.base_url, self.email, self.api_token]): - raise ValueError( - "Jira credentials not initialized. Call set_credentials() first." - ) - - # Create Basic Auth header using email:api_token - auth_str = f"{self.email}:{self.api_token}" - auth_bytes = auth_str.encode("utf-8") - auth_header = "Basic " + base64.b64encode(auth_bytes).decode("ascii") - - return { - "Content-Type": "application/json", - "Authorization": auth_header, - "Accept": "application/json", - } - - def make_api_request( - self, - endpoint: str, - params: dict[str, Any] | None = None, - method: str = "GET", - json_payload: dict[str, Any] | None = None, - ) -> dict[str, Any]: - """ - Make a request to the Jira API. - - Args: - endpoint: API endpoint (without base URL) - params: Query parameters for the request (optional) - method: HTTP method (GET or POST) - json_payload: JSON payload for POST requests (optional) - - Returns: - Response data from the API - - Raises: - ValueError: If credentials have not been set - Exception: If the API request fails - """ - headers = self.get_headers() - - # Construct API URL based on authentication method - if self._use_oauth and self.cloud_id: - # Use Atlassian API gateway with cloud_id for OAuth - url = f"https://api.atlassian.com/ex/jira/{self.cloud_id}/rest/api/{self.api_version}/{endpoint}" - else: - # Use direct base URL (works for both OAuth and legacy) - url = f"{self.base_url}/rest/api/{self.api_version}/{endpoint}" - - method_upper = method.upper() - if method_upper == "POST": - response = requests.post( - url, headers=headers, json=json_payload, timeout=500 - ) - elif method_upper == "PUT": - response = requests.put( - url, headers=headers, json=json_payload, timeout=500 - ) - elif method_upper == "DELETE": - response = requests.delete(url, headers=headers, params=params, timeout=500) - else: - response = requests.get(url, headers=headers, params=params, timeout=500) - - if response.status_code in (200, 201, 204): - if response.status_code == 204 or not response.text: - return {"status": "success"} - return response.json() - else: - raise Exception( - f"API request failed with status code {response.status_code}: {response.text}" - ) - - def get_all_projects(self) -> dict[str, Any]: - """ - Fetch all projects from Jira. - - Returns: - List of project objects - - Raises: - ValueError: If credentials have not been set - Exception: If the API request fails - """ - return self.make_api_request("project/search") - - def get_all_issues(self, project_key: str | None = None) -> list[dict[str, Any]]: - """ - Fetch all issues from Jira. - - Args: - project_key: Optional project key to filter issues (e.g., 'PROJ') - - Returns: - List of issue objects - - Raises: - ValueError: If credentials have not been set - Exception: If the API request fails - """ - jql = "ORDER BY created DESC" - if project_key: - jql = f'project = "{project_key}" ' + jql - - fields = [ - "summary", - "description", - "status", - "assignee", - "reporter", - "created", - "updated", - "priority", - "issuetype", - "project", - ] - - all_issues = [] - start_at = 0 - max_results = 100 - - all_issues = [] - start_at = 0 - - while True: - json_payload = { - "jql": jql, - "fields": fields, # API accepts list - "maxResults": max_results, - "startAt": start_at, - } - result = self.make_api_request( - "search/jql", json_payload=json_payload, method="POST" - ) - - if not isinstance(result, dict) or "issues" not in result: - raise Exception("Invalid response from Jira API") - - issues = result["issues"] - all_issues.extend(issues) - - print(f"Fetched {len(issues)} issues (startAt={start_at})") - - total = result.get("total", 0) - if start_at + len(issues) >= total: - break - - start_at += len(issues) - - return all_issues - - def get_issues_by_date_range( - self, - start_date: str, - end_date: str, - include_comments: bool = True, - project_key: str | None = None, - ) -> tuple[list[dict[str, Any]], str | None]: - """ - Fetch issues within a date range. - - Args: - start_date: Start date in YYYY-MM-DD format - end_date: End date in YYYY-MM-DD format (inclusive) - include_comments: Whether to include comments in the response - project_key: Optional project key to filter issues - - Returns: - Tuple containing (issues list, error message or None) - """ - try: - # Build JQL query for date range - # Query issues that were either created OR updated within the date range - # Use end_date + 1 day with < operator to include the full end date - from datetime import datetime, timedelta - - # Parse end_date and add 1 day for inclusive end date - end_date_obj = datetime.strptime(end_date, "%Y-%m-%d") - end_date_next = (end_date_obj + timedelta(days=1)).strftime("%Y-%m-%d") - - # Check both created and updated dates to catch all relevant issues - # Use 'created' and 'updated' (standard JQL field names) - date_filter = ( - f"(created >= '{start_date}' AND created < '{end_date_next}') " - f"OR (updated >= '{start_date}' AND updated < '{end_date_next}')" - ) - - jql = f"{date_filter} ORDER BY created DESC" - if project_key: - jql = f'project = "{project_key}" AND ({date_filter}) ORDER BY created DESC' - - # Define fields to retrieve - fields = [ - "summary", - "description", - "status", - "assignee", - "reporter", - "created", - "updated", - "priority", - "issuetype", - "project", - ] - - if include_comments: - fields.append("comment") - - params = { - "jql": jql, - "fields": ",".join(fields), - "maxResults": 100, - "startAt": 0, - } - - all_issues = [] - start_at = 0 - - while True: - params["startAt"] = start_at - - result = self.make_api_request("search/jql", params) - - if not isinstance(result, dict) or "issues" not in result: - return [], "Invalid response from Jira API" - - issues = result["issues"] - all_issues.extend(issues) - - # Check if there are more issues to fetch - total = result.get("total", 0) - if start_at + len(issues) >= total: - break - - start_at += len(issues) - - if not all_issues: - return [], "No issues found in the specified date range." - - return all_issues, None - - except Exception as e: - return [], f"Error fetching issues: {e!s}" - - def get_myself(self) -> dict[str, Any]: - """Fetch the current user's profile (health check).""" - return self.make_api_request("myself") - - def get_projects(self) -> list[dict[str, Any]]: - """Fetch all projects the user has access to.""" - result = self.make_api_request("project/search") - return result.get("values", []) - - def get_issue_types(self) -> list[dict[str, Any]]: - """Fetch all issue types.""" - return self.make_api_request("issuetype") - - def get_priorities(self) -> list[dict[str, Any]]: - """Fetch all priority levels.""" - return self.make_api_request("priority") - - def get_issue(self, issue_id_or_key: str) -> dict[str, Any]: - """Fetch a single issue by ID or key.""" - return self.make_api_request(f"issue/{issue_id_or_key}") - - def create_issue( - self, - project_key: str, - summary: str, - issue_type: str = "Task", - description: str | None = None, - priority: str | None = None, - assignee_id: str | None = None, - ) -> dict[str, Any]: - """Create a new Jira issue.""" - fields: dict[str, Any] = { - "project": {"key": project_key}, - "summary": summary, - "issuetype": {"name": issue_type}, - } - if description: - fields["description"] = { - "type": "doc", - "version": 1, - "content": [ - { - "type": "paragraph", - "content": [{"type": "text", "text": description}], - } - ], - } - if priority: - fields["priority"] = {"name": priority} - if assignee_id: - fields["assignee"] = {"accountId": assignee_id} - - return self.make_api_request( - "issue", method="POST", json_payload={"fields": fields} - ) - - def update_issue( - self, issue_id_or_key: str, fields: dict[str, Any] - ) -> dict[str, Any]: - """Update an existing Jira issue fields.""" - return self.make_api_request( - f"issue/{issue_id_or_key}", - method="PUT", - json_payload={"fields": fields}, - ) - - def delete_issue(self, issue_id_or_key: str) -> dict[str, Any]: - """Delete a Jira issue.""" - return self.make_api_request(f"issue/{issue_id_or_key}", method="DELETE") - - def get_transitions(self, issue_id_or_key: str) -> list[dict[str, Any]]: - """Get available transitions for an issue (for status changes).""" - result = self.make_api_request(f"issue/{issue_id_or_key}/transitions") - return result.get("transitions", []) - - def transition_issue( - self, issue_id_or_key: str, transition_id: str - ) -> dict[str, Any]: - """Transition an issue to a new status.""" - return self.make_api_request( - f"issue/{issue_id_or_key}/transitions", - method="POST", - json_payload={"transition": {"id": transition_id}}, - ) - - def format_issue(self, issue: dict[str, Any]) -> dict[str, Any]: - """ - Format an issue for easier consumption. - - Args: - issue: The issue object from Jira API - - Returns: - Formatted issue dictionary - """ - fields = issue.get("fields", {}) - - # Extract basic issue details - formatted = { - "id": issue.get("id", ""), - "key": issue.get("key", ""), - "title": fields.get("summary", ""), - "description": fields.get("description", ""), - "status": ( - fields.get("status", {}).get("name", "Unknown") - if fields.get("status") - else "Unknown" - ), - "status_category": ( - fields.get("status", {}) - .get("statusCategory", {}) - .get("name", "Unknown") - if fields.get("status") - else "Unknown" - ), - "priority": ( - fields.get("priority", {}).get("name", "Unknown") - if fields.get("priority") - else "Unknown" - ), - "issue_type": ( - fields.get("issuetype", {}).get("name", "Unknown") - if fields.get("issuetype") - else "Unknown" - ), - "project": ( - fields.get("project", {}).get("key", "Unknown") - if fields.get("project") - else "Unknown" - ), - "created_at": fields.get("created", ""), - "updated_at": fields.get("updated", ""), - "reporter": ( - { - "account_id": ( - fields.get("reporter", {}).get("accountId", "") - if fields.get("reporter") - else "" - ), - "display_name": ( - fields.get("reporter", {}).get("displayName", "Unknown") - if fields.get("reporter") - else "Unknown" - ), - "email": ( - fields.get("reporter", {}).get("emailAddress", "") - if fields.get("reporter") - else "" - ), - } - if fields.get("reporter") - else {"account_id": "", "display_name": "Unknown", "email": ""} - ), - "assignee": ( - { - "account_id": fields.get("assignee", {}).get("accountId", ""), - "display_name": fields.get("assignee", {}).get( - "displayName", "Unknown" - ), - "email": fields.get("assignee", {}).get("emailAddress", ""), - } - if fields.get("assignee") - else None - ), - "comments": [], - } - - # Extract comments if available - if "comment" in fields and "comments" in fields["comment"]: - for comment in fields["comment"]["comments"]: - formatted_comment = { - "id": comment.get("id", ""), - "body": comment.get("body", ""), - "created_at": comment.get("created", ""), - "updated_at": comment.get("updated", ""), - "author": ( - { - "account_id": ( - comment.get("author", {}).get("accountId", "") - if comment.get("author") - else "" - ), - "display_name": ( - comment.get("author", {}).get("displayName", "Unknown") - if comment.get("author") - else "Unknown" - ), - "email": ( - comment.get("author", {}).get("emailAddress", "") - if comment.get("author") - else "" - ), - } - if comment.get("author") - else {"account_id": "", "display_name": "Unknown", "email": ""} - ), - } - formatted["comments"].append(formatted_comment) - - return formatted - - def format_issue_to_markdown(self, issue: dict[str, Any]) -> str: - """ - Convert an issue to markdown format. - - Args: - issue: The issue object (either raw or formatted) - - Returns: - Markdown string representation of the issue - """ - # Format the issue if it's not already formatted - if "key" not in issue: - issue = self.format_issue(issue) - - # Build the markdown content - markdown = ( - f"# {issue.get('key', 'No Key')}: {issue.get('title', 'No Title')}\n\n" - ) - - if issue.get("status"): - markdown += f"**Status:** {issue['status']}\n" - - if issue.get("priority"): - markdown += f"**Priority:** {issue['priority']}\n" - - if issue.get("issue_type"): - markdown += f"**Type:** {issue['issue_type']}\n" - - if issue.get("project"): - markdown += f"**Project:** {issue['project']}\n\n" - - if issue.get("assignee") and issue["assignee"].get("display_name"): - markdown += f"**Assignee:** {issue['assignee']['display_name']}\n" - - if issue.get("reporter") and issue["reporter"].get("display_name"): - markdown += f"**Reporter:** {issue['reporter']['display_name']}\n" - - if issue.get("created_at"): - created_date = self.format_date(issue["created_at"]) - markdown += f"**Created:** {created_date}\n" - - if issue.get("updated_at"): - updated_date = self.format_date(issue["updated_at"]) - markdown += f"**Updated:** {updated_date}\n\n" - - if issue.get("description"): - markdown += f"## Description\n\n{issue['description']}\n\n" - - if issue.get("comments"): - markdown += f"## Comments ({len(issue['comments'])})\n\n" - - for comment in issue["comments"]: - author_name = "Unknown" - if comment.get("author") and comment["author"].get("display_name"): - author_name = comment["author"]["display_name"] - - comment_date = "Unknown date" - if comment.get("created_at"): - comment_date = self.format_date(comment["created_at"]) - - markdown += f"### {author_name} ({comment_date})\n\n{comment.get('body', '')}\n\n---\n\n" - - return markdown - - @staticmethod - def format_date(iso_date: str) -> str: - """ - Format an ISO date string to a more readable format. - - Args: - iso_date: ISO format date string - - Returns: - Formatted date string - """ - if not iso_date or not isinstance(iso_date, str): - return "Unknown date" - - try: - # Jira dates are typically in format: 2023-01-01T12:00:00.000+0000 - dt = datetime.fromisoformat(iso_date.replace("Z", "+00:00")) - return dt.strftime("%Y-%m-%d %H:%M:%S") - except ValueError: - return iso_date diff --git a/surfsense_backend/app/connectors/jira_history.py b/surfsense_backend/app/connectors/jira_history.py deleted file mode 100644 index 30162964e..000000000 --- a/surfsense_backend/app/connectors/jira_history.py +++ /dev/null @@ -1,350 +0,0 @@ -""" -Jira OAuth Connector. - -Handles OAuth-based authentication and token refresh for Jira API access. -Supports both OAuth 2.0 (preferred) and legacy API token authentication. -""" - -import logging -from typing import Any - -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select - -from app.config import config -from app.connectors.jira_connector import JiraConnector -from app.db import SearchSourceConnector -from app.schemas.atlassian_auth_credentials import AtlassianAuthCredentialsBase -from app.utils.oauth_security import TokenEncryption - -logger = logging.getLogger(__name__) - - -class JiraHistoryConnector: - """ - Jira connector with OAuth support and automatic token refresh. - - This connector uses OAuth 2.0 access tokens to authenticate with the - Jira API. It automatically refreshes expired tokens when needed. - Also supports legacy API token authentication for backward compatibility. - """ - - def __init__( - self, - session: AsyncSession, - connector_id: int, - credentials: AtlassianAuthCredentialsBase | None = None, - ): - """ - Initialize the JiraHistoryConnector with auto-refresh capability. - - Args: - session: Database session for updating connector - connector_id: Connector ID for direct updates - credentials: Jira OAuth credentials (optional, will be loaded from DB if not provided) - """ - self._session = session - self._connector_id = connector_id - self._credentials = credentials - self._cloud_id: str | None = None - self._base_url: str | None = None - self._jira_client: JiraConnector | None = None - self._use_oauth = True - self._legacy_email: str | None = None - self._legacy_api_token: str | None = None - - async def _get_valid_token(self) -> str: - """ - Get valid Jira access token, refreshing if needed. - - Returns: - Valid access token - - Raises: - ValueError: If credentials are missing or invalid - Exception: If token refresh fails - """ - # Load credentials from DB if not provided - if self._credentials is None: - result = await self._session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == self._connector_id - ) - ) - connector = result.scalars().first() - - if not connector: - raise ValueError(f"Connector {self._connector_id} not found") - - config_data = connector.config.copy() - - # Check if using OAuth or legacy API token - is_oauth = config_data.get("_token_encrypted", False) or config_data.get( - "access_token" - ) - - if is_oauth: - # OAuth 2.0 authentication - # Check if access_token exists before processing - raw_access_token = config_data.get("access_token") - if not raw_access_token: - raise ValueError( - "Jira access token not found. " - "Please reconnect your Jira account." - ) - - if not config.SECRET_KEY: - raise ValueError( - "SECRET_KEY not configured but tokens are marked as encrypted" - ) - - try: - token_encryption = TokenEncryption(config.SECRET_KEY) - - # Decrypt access_token - if config_data.get("access_token"): - config_data["access_token"] = token_encryption.decrypt_token( - config_data["access_token"] - ) - logger.info( - f"Decrypted Jira access token for connector {self._connector_id}" - ) - - # Decrypt refresh_token if present - if config_data.get("refresh_token"): - config_data["refresh_token"] = token_encryption.decrypt_token( - config_data["refresh_token"] - ) - logger.info( - f"Decrypted Jira refresh token for connector {self._connector_id}" - ) - except Exception as e: - logger.error( - f"Failed to decrypt Jira credentials for connector {self._connector_id}: {e!s}" - ) - raise ValueError( - f"Failed to decrypt Jira credentials: {e!s}" - ) from e - - # Final validation after decryption - final_token = config_data.get("access_token") - if not final_token or ( - isinstance(final_token, str) and not final_token.strip() - ): - raise ValueError( - "Jira access token is invalid or empty. " - "Please reconnect your Jira account." - ) - - try: - self._credentials = AtlassianAuthCredentialsBase.from_dict( - config_data - ) - self._cloud_id = config_data.get("cloud_id") - self._base_url = config_data.get("base_url") - self._use_oauth = True - except Exception as e: - raise ValueError(f"Invalid Jira OAuth credentials: {e!s}") from e - else: - # Legacy API token authentication - self._legacy_email = config_data.get("JIRA_EMAIL") - self._legacy_api_token = config_data.get("JIRA_API_TOKEN") - self._base_url = config_data.get("JIRA_BASE_URL") - self._use_oauth = False - - if ( - not self._legacy_email - or not self._legacy_api_token - or not self._base_url - ): - raise ValueError("Jira credentials not found in connector config") - - # Check if token is expired and refreshable (only for OAuth) - if ( - self._use_oauth - and self._credentials.is_expired - and self._credentials.is_refreshable - ): - try: - logger.info( - f"Jira token expired for connector {self._connector_id}, refreshing..." - ) - - # Get connector for refresh - result = await self._session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == self._connector_id - ) - ) - connector = result.scalars().first() - - if not connector: - raise RuntimeError( - f"Connector {self._connector_id} not found; cannot refresh token." - ) - - # Lazy import to avoid circular dependency - from app.routes.jira_add_connector_route import refresh_jira_token - - connector = await refresh_jira_token(self._session, connector) - - # Reload credentials after refresh - config_data = connector.config.copy() - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - token_encryption = TokenEncryption(config.SECRET_KEY) - if config_data.get("access_token"): - config_data["access_token"] = token_encryption.decrypt_token( - config_data["access_token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = token_encryption.decrypt_token( - config_data["refresh_token"] - ) - - self._credentials = AtlassianAuthCredentialsBase.from_dict(config_data) - self._cloud_id = config_data.get("cloud_id") - self._base_url = config_data.get("base_url") - - # Invalidate cached client so it's recreated with new token - self._jira_client = None - - logger.info( - f"Successfully refreshed Jira token for connector {self._connector_id}" - ) - except Exception as e: - logger.error( - f"Failed to refresh Jira token for connector {self._connector_id}: {e!s}" - ) - raise Exception( - f"Failed to refresh Jira OAuth credentials: {e!s}" - ) from e - - if self._use_oauth: - return self._credentials.access_token - else: - # For legacy auth, return empty string (not used for token-based auth) - return "" - - async def _get_jira_client(self) -> JiraConnector: - """ - Get or create JiraConnector with valid credentials. - - Returns: - JiraConnector instance - """ - if self._jira_client is None: - if self._use_oauth: - # Ensure we have valid token (will refresh if needed) - await self._get_valid_token() - - self._jira_client = JiraConnector( - base_url=self._base_url, - access_token=self._credentials.access_token, - cloud_id=self._cloud_id, - ) - else: - # Legacy API token authentication - self._jira_client = JiraConnector( - base_url=self._base_url, - email=self._legacy_email, - api_token=self._legacy_api_token, - ) - else: - # If OAuth, refresh token if expired before returning client - if self._use_oauth: - await self._get_valid_token() - # Update client with new token if it was refreshed - if self._credentials: - self._jira_client.set_oauth_credentials( - base_url=self._base_url or "", - access_token=self._credentials.access_token, - cloud_id=self._cloud_id, - ) - - return self._jira_client - - async def get_issues_by_date_range( - self, - start_date: str, - end_date: str, - include_comments: bool = True, - project_key: str | None = None, - ) -> tuple[list[dict[str, Any]], str | None]: - """ - Fetch issues within a date range. - This method wraps JiraConnector.get_issues_by_date_range() with automatic token refresh. - - Args: - start_date: Start date in YYYY-MM-DD format - end_date: End date in YYYY-MM-DD format (inclusive) - include_comments: Whether to include comments in the response - project_key: Optional project key to filter issues - - Returns: - Tuple containing (issues list, error message or None) - """ - # Ensure token is valid (will refresh if needed) - if self._use_oauth: - await self._get_valid_token() - - # Get client with valid credentials - client = await self._get_jira_client() - - # JiraConnector methods are synchronous, so we call them directly - # Token refresh has already been handled above - return client.get_issues_by_date_range( - start_date=start_date, - end_date=end_date, - include_comments=include_comments, - project_key=project_key, - ) - - def format_issue(self, issue: dict[str, Any]) -> dict[str, Any]: - """ - Format an issue for easier consumption. - Wraps JiraConnector.format_issue(). - - Args: - issue: The issue object from Jira API - - Returns: - Formatted issue dictionary - """ - # This is a synchronous method that doesn't need token refresh - # since it just formats data that's already been fetched - if self._jira_client is None: - # Create a minimal client just for formatting (doesn't need credentials) - self._jira_client = JiraConnector() - return self._jira_client.format_issue(issue) - - def format_issue_to_markdown(self, issue: dict[str, Any]) -> str: - """ - Convert an issue to markdown format. - Wraps JiraConnector.format_issue_to_markdown(). - - Args: - issue: The issue object (either raw or formatted) - - Returns: - Markdown string representation of the issue - """ - # This is a synchronous method that doesn't need token refresh - # since it just formats data that's already been fetched - if self._jira_client is None: - # Create a minimal client just for formatting (doesn't need credentials) - self._jira_client = JiraConnector() - return self._jira_client.format_issue_to_markdown(issue) - - async def close(self): - """Close any resources (currently no-op for JiraConnector).""" - # JiraConnector doesn't maintain persistent connections, so nothing to close - self._jira_client = None - - async def __aenter__(self): - """Async context manager entry.""" - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - """Async context manager exit.""" - await self.close() diff --git a/surfsense_backend/app/services/jira/__init__.py b/surfsense_backend/app/services/jira/__init__.py deleted file mode 100644 index fad49b68d..000000000 --- a/surfsense_backend/app/services/jira/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from app.services.jira.kb_sync_service import JiraKBSyncService -from app.services.jira.tool_metadata_service import ( - JiraIssue, - JiraToolMetadataService, - JiraWorkspace, -) - -__all__ = [ - "JiraIssue", - "JiraKBSyncService", - "JiraToolMetadataService", - "JiraWorkspace", -] diff --git a/surfsense_backend/app/services/jira/kb_sync_service.py b/surfsense_backend/app/services/jira/kb_sync_service.py deleted file mode 100644 index 37001a476..000000000 --- a/surfsense_backend/app/services/jira/kb_sync_service.py +++ /dev/null @@ -1,257 +0,0 @@ -import asyncio -import logging -from datetime import datetime - -from sqlalchemy.ext.asyncio import AsyncSession - -from app.connectors.jira_history import JiraHistoryConnector -from app.db import Document, DocumentType -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) - -logger = logging.getLogger(__name__) - - -class JiraKBSyncService: - """Syncs Jira issue documents to the knowledge base after HITL actions.""" - - def __init__(self, db_session: AsyncSession): - self.db_session = db_session - - async def sync_after_create( - self, - issue_id: str, - issue_identifier: str, - issue_title: str, - description: str | None, - state: str | None, - connector_id: int, - search_space_id: int, - user_id: str, - ) -> dict: - from app.tasks.connector_indexers.base import ( - check_document_by_unique_identifier, - check_duplicate_document_by_hash, - get_current_timestamp, - safe_set_chunks, - ) - - try: - unique_hash = generate_unique_identifier_hash( - DocumentType.JIRA_CONNECTOR, issue_id, search_space_id - ) - - existing = await check_document_by_unique_identifier( - self.db_session, unique_hash - ) - if existing: - logger.info( - "Document for Jira issue %s already exists (doc_id=%s), skipping", - issue_identifier, - existing.id, - ) - return {"status": "success"} - - indexable_content = (description or "").strip() - if not indexable_content: - indexable_content = f"Jira Issue {issue_identifier}: {issue_title}" - - issue_content = ( - f"# {issue_identifier}: {issue_title}\n\n{indexable_content}" - ) - - content_hash = generate_content_hash(issue_content, search_space_id) - - with self.db_session.no_autoflush: - dup = await check_duplicate_document_by_hash( - self.db_session, content_hash - ) - if dup: - content_hash = unique_hash - - from app.services.llm_service import get_user_long_context_llm - - user_llm = await get_user_long_context_llm( - self.db_session, - user_id, - search_space_id, - disable_streaming=True, - ) - - doc_metadata_for_summary = { - "issue_id": issue_identifier, - "issue_title": issue_title, - "document_type": "Jira Issue", - "connector_type": "Jira", - } - - if user_llm: - summary_content, summary_embedding = await generate_document_summary( - issue_content, user_llm, doc_metadata_for_summary - ) - else: - summary_content = ( - f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}" - ) - summary_embedding = await asyncio.to_thread(embed_text, summary_content) - - chunks = await create_document_chunks(issue_content) - now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - document = Document( - title=f"{issue_identifier}: {issue_title}", - document_type=DocumentType.JIRA_CONNECTOR, - document_metadata={ - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": state or "Unknown", - "indexed_at": now_str, - "connector_id": connector_id, - }, - content=summary_content, - content_hash=content_hash, - unique_identifier_hash=unique_hash, - embedding=summary_embedding, - search_space_id=search_space_id, - connector_id=connector_id, - updated_at=get_current_timestamp(), - created_by_id=user_id, - ) - - self.db_session.add(document) - await self.db_session.flush() - await safe_set_chunks(self.db_session, document, chunks) - await self.db_session.commit() - - logger.info( - "KB sync after create succeeded: doc_id=%s, issue=%s", - document.id, - issue_identifier, - ) - return {"status": "success"} - - except Exception as e: - error_str = str(e).lower() - if ( - "duplicate key value violates unique constraint" in error_str - or "uniqueviolationerror" in error_str - ): - await self.db_session.rollback() - return {"status": "error", "message": "Duplicate document detected"} - - logger.error( - "KB sync after create failed for issue %s: %s", - issue_identifier, - e, - exc_info=True, - ) - await self.db_session.rollback() - return {"status": "error", "message": str(e)} - - async def sync_after_update( - self, - document_id: int, - issue_id: str, - user_id: str, - search_space_id: int, - ) -> dict: - from app.tasks.connector_indexers.base import ( - get_current_timestamp, - safe_set_chunks, - ) - - try: - document = await self.db_session.get(Document, document_id) - if not document: - return {"status": "not_indexed"} - - connector_id = document.connector_id - if not connector_id: - return {"status": "error", "message": "Document has no connector_id"} - - jira_history = JiraHistoryConnector( - session=self.db_session, connector_id=connector_id - ) - jira_client = await jira_history._get_jira_client() - issue_raw = await asyncio.to_thread(jira_client.get_issue, issue_id) - formatted = jira_client.format_issue(issue_raw) - issue_content = jira_client.format_issue_to_markdown(formatted) - - if not issue_content: - return {"status": "error", "message": "Issue produced empty content"} - - issue_identifier = formatted.get("key", "") - issue_title = formatted.get("title", "") - state = formatted.get("status", "Unknown") - comment_count = len(formatted.get("comments", [])) - - from app.services.llm_service import get_user_long_context_llm - - user_llm = await get_user_long_context_llm( - self.db_session, user_id, search_space_id, disable_streaming=True - ) - - if user_llm: - doc_meta = { - "issue_key": issue_identifier, - "issue_title": issue_title, - "status": state, - "document_type": "Jira Issue", - "connector_type": "Jira", - } - summary_content, summary_embedding = await generate_document_summary( - issue_content, user_llm, doc_meta - ) - else: - summary_content = ( - f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}" - ) - summary_embedding = await asyncio.to_thread(embed_text, summary_content) - - chunks = await create_document_chunks(issue_content) - - document.title = f"{issue_identifier}: {issue_title}" - document.content = summary_content - document.content_hash = generate_content_hash( - issue_content, search_space_id - ) - document.embedding = summary_embedding - - from sqlalchemy.orm.attributes import flag_modified - - document.document_metadata = { - **(document.document_metadata or {}), - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": state, - "comment_count": comment_count, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "connector_id": connector_id, - } - flag_modified(document, "document_metadata") - await safe_set_chunks(self.db_session, document, chunks) - document.updated_at = get_current_timestamp() - - await self.db_session.commit() - - logger.info( - "KB sync successful for document %s (%s: %s)", - document_id, - issue_identifier, - issue_title, - ) - return {"status": "success"} - - except Exception as e: - logger.error( - "KB sync failed for document %s: %s", document_id, e, exc_info=True - ) - await self.db_session.rollback() - return {"status": "error", "message": str(e)} diff --git a/surfsense_backend/app/services/jira/tool_metadata_service.py b/surfsense_backend/app/services/jira/tool_metadata_service.py deleted file mode 100644 index cbc89e7be..000000000 --- a/surfsense_backend/app/services/jira/tool_metadata_service.py +++ /dev/null @@ -1,332 +0,0 @@ -import asyncio -import logging -from dataclasses import dataclass - -from sqlalchemy import and_, func, or_ -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select -from sqlalchemy.orm.attributes import flag_modified - -from app.connectors.jira_history import JiraHistoryConnector -from app.db import ( - Document, - DocumentType, - SearchSourceConnector, - SearchSourceConnectorType, -) - -logger = logging.getLogger(__name__) - - -@dataclass -class JiraWorkspace: - """Represents a Jira connector as a workspace for tool context.""" - - id: int - name: str - base_url: str - - @classmethod - def from_connector(cls, connector: SearchSourceConnector) -> "JiraWorkspace": - return cls( - id=connector.id, - name=connector.name, - base_url=connector.config.get("base_url", ""), - ) - - def to_dict(self) -> dict: - return { - "id": self.id, - "name": self.name, - "base_url": self.base_url, - } - - -@dataclass -class JiraIssue: - """Represents an indexed Jira issue resolved from the knowledge base.""" - - issue_id: str - issue_identifier: str - issue_title: str - state: str - connector_id: int - document_id: int - indexed_at: str | None - - @classmethod - def from_document(cls, document: Document) -> "JiraIssue": - meta = document.document_metadata or {} - return cls( - issue_id=meta.get("issue_id", ""), - issue_identifier=meta.get("issue_identifier", ""), - issue_title=meta.get("issue_title", document.title), - state=meta.get("state", ""), - connector_id=document.connector_id, - document_id=document.id, - indexed_at=meta.get("indexed_at"), - ) - - def to_dict(self) -> dict: - return { - "issue_id": self.issue_id, - "issue_identifier": self.issue_identifier, - "issue_title": self.issue_title, - "state": self.state, - "connector_id": self.connector_id, - "document_id": self.document_id, - "indexed_at": self.indexed_at, - } - - -class JiraToolMetadataService: - """Builds interrupt context for Jira HITL tools.""" - - def __init__(self, db_session: AsyncSession): - self._db_session = db_session - - async def _check_account_health(self, connector: SearchSourceConnector) -> bool: - """Check if the Jira connector auth is still valid. - - Returns True if auth is expired/invalid, False if healthy. - """ - try: - jira_history = JiraHistoryConnector( - session=self._db_session, connector_id=connector.id - ) - jira_client = await jira_history._get_jira_client() - await asyncio.to_thread(jira_client.get_myself) - return False - except Exception as e: - logger.warning("Jira connector %s health check failed: %s", connector.id, e) - try: - connector.config = {**connector.config, "auth_expired": True} - flag_modified(connector, "config") - await self._db_session.commit() - await self._db_session.refresh(connector) - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - connector.id, - exc_info=True, - ) - return True - - async def get_creation_context(self, search_space_id: int, user_id: str) -> dict: - """Return context needed to create a new Jira issue. - - Fetches all connected Jira accounts, and for the first healthy one - fetches projects, issue types, and priorities. - """ - connectors = await self._get_all_jira_connectors(search_space_id, user_id) - if not connectors: - return {"error": "No Jira account connected"} - - accounts = [] - projects = [] - issue_types = [] - priorities = [] - fetched_context = False - - for connector in connectors: - auth_expired = await self._check_account_health(connector) - workspace = JiraWorkspace.from_connector(connector) - account_info = { - **workspace.to_dict(), - "auth_expired": auth_expired, - } - accounts.append(account_info) - - if not auth_expired and not fetched_context: - try: - jira_history = JiraHistoryConnector( - session=self._db_session, connector_id=connector.id - ) - jira_client = await jira_history._get_jira_client() - raw_projects = await asyncio.to_thread(jira_client.get_projects) - projects = [ - {"id": p.get("id"), "key": p.get("key"), "name": p.get("name")} - for p in raw_projects - ] - raw_types = await asyncio.to_thread(jira_client.get_issue_types) - seen_type_names: set[str] = set() - issue_types = [] - for t in raw_types: - if t.get("subtask", False): - continue - name = t.get("name") - if name not in seen_type_names: - seen_type_names.add(name) - issue_types.append({"id": t.get("id"), "name": name}) - raw_priorities = await asyncio.to_thread(jira_client.get_priorities) - priorities = [ - {"id": p.get("id"), "name": p.get("name")} - for p in raw_priorities - ] - fetched_context = True - except Exception as e: - logger.warning( - "Failed to fetch Jira context for connector %s: %s", - connector.id, - e, - ) - - return { - "accounts": accounts, - "projects": projects, - "issue_types": issue_types, - "priorities": priorities, - } - - async def get_update_context( - self, search_space_id: int, user_id: str, issue_ref: str - ) -> dict: - """Return context needed to update an indexed Jira issue. - - Resolves the issue from the KB, then fetches current details from the Jira API. - """ - document = await self._resolve_issue(search_space_id, user_id, issue_ref) - if not document: - return { - "error": f"Issue '{issue_ref}' not found in your synced Jira issues. " - "Please make sure the issue is indexed in your knowledge base." - } - - connector = await self._get_connector_for_document(document, user_id) - if not connector: - return {"error": "Connector not found or access denied"} - - auth_expired = await self._check_account_health(connector) - if auth_expired: - return { - "error": "Jira authentication has expired. Please re-authenticate.", - "auth_expired": True, - "connector_id": connector.id, - } - - workspace = JiraWorkspace.from_connector(connector) - issue = JiraIssue.from_document(document) - - try: - jira_history = JiraHistoryConnector( - session=self._db_session, connector_id=connector.id - ) - jira_client = await jira_history._get_jira_client() - issue_data = await asyncio.to_thread(jira_client.get_issue, issue.issue_id) - formatted = jira_client.format_issue(issue_data) - except Exception as e: - error_str = str(e).lower() - if ( - "401" in error_str - or "403" in error_str - or "authentication" in error_str - ): - return { - "error": f"Failed to fetch Jira issue: {e!s}", - "auth_expired": True, - "connector_id": connector.id, - } - return {"error": f"Failed to fetch Jira issue: {e!s}"} - - return { - "account": {**workspace.to_dict(), "auth_expired": False}, - "issue": { - "issue_id": formatted.get("key", issue.issue_id), - "issue_identifier": formatted.get("key", issue.issue_identifier), - "issue_title": formatted.get("title", issue.issue_title), - "state": formatted.get("status", "Unknown"), - "priority": formatted.get("priority", "Unknown"), - "issue_type": formatted.get("issue_type", "Unknown"), - "assignee": formatted.get("assignee"), - "description": formatted.get("description"), - "project": formatted.get("project", ""), - "document_id": issue.document_id, - "indexed_at": issue.indexed_at, - }, - } - - async def get_deletion_context( - self, search_space_id: int, user_id: str, issue_ref: str - ) -> dict: - """Return context needed to delete a Jira issue (KB metadata only, no API call).""" - document = await self._resolve_issue(search_space_id, user_id, issue_ref) - if not document: - return { - "error": f"Issue '{issue_ref}' not found in your synced Jira issues. " - "Please make sure the issue is indexed in your knowledge base." - } - - connector = await self._get_connector_for_document(document, user_id) - if not connector: - return {"error": "Connector not found or access denied"} - - auth_expired = connector.config.get("auth_expired", False) - workspace = JiraWorkspace.from_connector(connector) - issue = JiraIssue.from_document(document) - - return { - "account": {**workspace.to_dict(), "auth_expired": auth_expired}, - "issue": issue.to_dict(), - } - - async def _resolve_issue( - self, search_space_id: int, user_id: str, issue_ref: str - ) -> Document | None: - """Resolve an issue from KB: issue_identifier -> issue_title -> document.title.""" - ref_lower = issue_ref.lower() - - result = await self._db_session.execute( - select(Document) - .join( - SearchSourceConnector, Document.connector_id == SearchSourceConnector.id - ) - .filter( - and_( - Document.search_space_id == search_space_id, - Document.document_type == DocumentType.JIRA_CONNECTOR, - SearchSourceConnector.user_id == user_id, - or_( - func.lower( - Document.document_metadata.op("->>")("issue_identifier") - ) - == ref_lower, - func.lower(Document.document_metadata.op("->>")("issue_title")) - == ref_lower, - func.lower(Document.title) == ref_lower, - ), - ) - ) - .order_by(Document.updated_at.desc().nullslast()) - .limit(1) - ) - return result.scalars().first() - - async def _get_all_jira_connectors( - self, search_space_id: int, user_id: str - ) -> list[SearchSourceConnector]: - result = await self._db_session.execute( - select(SearchSourceConnector).filter( - and_( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.JIRA_CONNECTOR, - ) - ) - ) - return result.scalars().all() - - async def _get_connector_for_document( - self, document: Document, user_id: str - ) -> SearchSourceConnector | None: - if not document.connector_id: - return None - result = await self._db_session.execute( - select(SearchSourceConnector).filter( - and_( - SearchSourceConnector.id == document.connector_id, - SearchSourceConnector.user_id == user_id, - ) - ) - ) - return result.scalars().first() diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py deleted file mode 100644 index 4d5cbb9f1..000000000 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ /dev/null @@ -1,364 +0,0 @@ -"""Jira connector indexer using the unified parallel indexing pipeline.""" - -import contextlib -from collections.abc import Awaitable, Callable - -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.ext.asyncio import AsyncSession - -from app.connectors.jira_history import JiraHistoryConnector -from app.db import DocumentType, SearchSourceConnectorType -from app.indexing_pipeline.connector_document import ConnectorDocument -from app.indexing_pipeline.document_hashing import compute_content_hash -from app.indexing_pipeline.indexing_pipeline_service import ( - IndexingPipelineService, - PlaceholderInfo, -) -from app.services.llm_service import get_user_long_context_llm -from app.services.task_logging_service import TaskLoggingService - -from .base import ( - calculate_date_range, - check_duplicate_document_by_hash, - get_connector_by_id, - logger, - update_connector_last_indexed, -) - -HeartbeatCallbackType = Callable[[int], Awaitable[None]] -HEARTBEAT_INTERVAL_SECONDS = 30 - - -def _build_connector_doc( - issue: dict, - formatted_issue: dict, - issue_content: str, - *, - connector_id: int, - search_space_id: int, - user_id: str, - enable_summary: bool, -) -> ConnectorDocument: - """Map a raw Jira issue dict to a ConnectorDocument.""" - issue_id = issue.get("key", "") - issue_identifier = issue.get("key", "") - issue_title = issue.get("id", "") - state = formatted_issue.get("status", "Unknown") - priority = formatted_issue.get("priority", "Unknown") - comment_count = len(formatted_issue.get("comments", [])) - - metadata = { - "issue_id": issue_id, - "issue_identifier": issue_identifier, - "issue_title": issue_title, - "state": state, - "priority": priority, - "comment_count": comment_count, - "connector_id": connector_id, - "document_type": "Jira Issue", - "connector_type": "Jira", - } - - fallback_summary = ( - f"Jira Issue {issue_identifier}: {issue_title}\n\n" - f"Status: {state}\n\n{issue_content}" - ) - - return ConnectorDocument( - title=f"{issue_identifier}: {issue_title}", - source_markdown=issue_content, - unique_id=issue_id, - document_type=DocumentType.JIRA_CONNECTOR, - search_space_id=search_space_id, - connector_id=connector_id, - created_by_id=user_id, - should_summarize=enable_summary, - fallback_summary=fallback_summary, - metadata=metadata, - ) - - -async def index_jira_issues( - session: AsyncSession, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str | None = None, - end_date: str | None = None, - update_last_indexed: bool = True, - on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, int, str | None]: - """Index Jira issues and comments.""" - task_logger = TaskLoggingService(session, search_space_id) - log_entry = await task_logger.log_task_start( - task_name="jira_issues_indexing", - source="connector_indexing_task", - message=f"Starting Jira issues indexing for connector {connector_id}", - metadata={ - "connector_id": connector_id, - "user_id": str(user_id), - "start_date": start_date, - "end_date": end_date, - }, - ) - - try: - connector = await get_connector_by_id( - session, connector_id, SearchSourceConnectorType.JIRA_CONNECTOR - ) - - if not connector: - await task_logger.log_task_failure( - log_entry, - f"Connector with ID {connector_id} not found", - "Connector not found", - {"error_type": "ConnectorNotFound"}, - ) - return 0, 0, f"Connector with ID {connector_id} not found" - - await task_logger.log_task_progress( - log_entry, - f"Initializing Jira client for connector {connector_id}", - {"stage": "client_initialization"}, - ) - jira_client = JiraHistoryConnector(session=session, connector_id=connector_id) - - if start_date == "undefined" or start_date == "": - start_date = None - if end_date == "undefined" or end_date == "": - end_date = None - - start_date_str, end_date_str = calculate_date_range( - connector, start_date, end_date, default_days_back=365 - ) - - await task_logger.log_task_progress( - log_entry, - f"Fetching Jira issues from {start_date_str} to {end_date_str}", - { - "stage": "fetching_issues", - "start_date": start_date_str, - "end_date": end_date_str, - }, - ) - - try: - issues, error = await jira_client.get_issues_by_date_range( - start_date=start_date_str, end_date=end_date_str, include_comments=True - ) - - if error: - if "No issues found" in error: - logger.info(f"No Jira issues found: {error}") - if update_last_indexed: - await update_connector_last_indexed( - session, connector, update_last_indexed - ) - await session.commit() - logger.info( - f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found" - ) - - await task_logger.log_task_success( - log_entry, - f"No Jira issues found in date range {start_date_str} to {end_date_str}", - {"issues_found": 0}, - ) - await jira_client.close() - return 0, 0, None - else: - logger.error(f"Failed to get Jira issues: {error}") - await task_logger.log_task_failure( - log_entry, - f"Failed to get Jira issues: {error}", - "API Error", - {"error_type": "APIError"}, - ) - await jira_client.close() - return 0, 0, f"Failed to get Jira issues: {error}" - - logger.info(f"Retrieved {len(issues)} issues from Jira API") - - except Exception as e: - logger.error(f"Error fetching Jira issues: {e!s}", exc_info=True) - await jira_client.close() - return 0, 0, f"Error fetching Jira issues: {e!s}" - - if not issues: - logger.info("No Jira issues found for the specified date range") - if update_last_indexed: - await update_connector_last_indexed( - session, connector, update_last_indexed - ) - await session.commit() - await jira_client.close() - return 0, 0, None - - # ── Create placeholders for instant UI feedback ─────────────── - pipeline = IndexingPipelineService(session) - placeholders = [ - PlaceholderInfo( - title=f"{issue.get('key', '')}: {issue.get('id', '')}", - document_type=DocumentType.JIRA_CONNECTOR, - unique_id=issue.get("key", ""), - search_space_id=search_space_id, - connector_id=connector_id, - created_by_id=user_id, - metadata={ - "issue_id": issue.get("key", ""), - "connector_id": connector_id, - "connector_type": "Jira", - }, - ) - for issue in issues - if issue.get("key") and issue.get("id") - ] - await pipeline.create_placeholder_documents(placeholders) - - connector_docs: list[ConnectorDocument] = [] - documents_skipped = 0 - duplicate_content_count = 0 - - for issue in issues: - try: - issue_id = issue.get("key") - issue_identifier = issue.get("key", "") - issue_title = issue.get("id", "") - - if not issue_id or not issue_title: - logger.warning( - f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}" - ) - documents_skipped += 1 - continue - - formatted_issue = jira_client.format_issue(issue) - issue_content = jira_client.format_issue_to_markdown(formatted_issue) - - if not issue_content: - logger.warning( - f"Skipping issue with no content: {issue_identifier} - {issue_title}" - ) - documents_skipped += 1 - continue - - doc = _build_connector_doc( - issue, - formatted_issue, - issue_content, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - enable_summary=connector.enable_summary, - ) - - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, compute_content_hash(doc) - ) - - if duplicate_by_content: - logger.info( - f"Jira issue {issue_identifier} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." - ) - duplicate_content_count += 1 - documents_skipped += 1 - continue - - connector_docs.append(doc) - - except Exception as e: - logger.error( - f"Error building ConnectorDocument for issue {issue_identifier}: {e!s}", - exc_info=True, - ) - documents_skipped += 1 - continue - - await pipeline.migrate_legacy_docs(connector_docs) - - async def _get_llm(s: AsyncSession): - return await get_user_long_context_llm(s, user_id, search_space_id) - - _, documents_indexed, documents_failed = await pipeline.index_batch_parallel( - connector_docs, - _get_llm, - max_concurrency=3, - on_heartbeat=on_heartbeat_callback, - heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, - ) - - await update_connector_last_indexed(session, connector, update_last_indexed) - - logger.info(f"Final commit: Total {documents_indexed} Jira issues processed") - try: - await session.commit() - logger.info("Successfully committed all JIRA document changes to database") - except Exception as e: - if ( - "duplicate key value violates unique constraint" in str(e).lower() - or "uniqueviolationerror" in str(e).lower() - ): - logger.warning( - f"Duplicate content_hash detected during final commit. " - f"This may occur if the same issue was indexed by multiple connectors. " - f"Rolling back and continuing. Error: {e!s}" - ) - await session.rollback() - # Don't fail the entire task - some documents may have been successfully indexed - else: - raise - - warning_parts = [] - if duplicate_content_count > 0: - warning_parts.append(f"{duplicate_content_count} duplicate") - if documents_failed > 0: - warning_parts.append(f"{documents_failed} failed") - warning_message = ", ".join(warning_parts) if warning_parts else None - - await task_logger.log_task_success( - log_entry, - f"Successfully completed JIRA indexing for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "documents_failed": documents_failed, - "duplicate_content_count": duplicate_content_count, - }, - ) - logger.info( - f"JIRA indexing completed: {documents_indexed} ready, " - f"{documents_skipped} skipped, {documents_failed} failed " - f"({duplicate_content_count} duplicate content)" - ) - await jira_client.close() - return documents_indexed, documents_skipped, warning_message - - except SQLAlchemyError as db_error: - await session.rollback() - await task_logger.log_task_failure( - log_entry, - f"Database error during JIRA indexing for connector {connector_id}", - str(db_error), - {"error_type": "SQLAlchemyError"}, - ) - logger.error(f"Database error: {db_error!s}", exc_info=True) - if "jira_client" in locals(): - with contextlib.suppress(Exception): - await jira_client.close() - return 0, 0, f"Database error: {db_error!s}" - except Exception as e: - await session.rollback() - await task_logger.log_task_failure( - log_entry, - f"Failed to index JIRA issues for connector {connector_id}", - str(e), - {"error_type": type(e).__name__}, - ) - logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True) - if "jira_client" in locals(): - with contextlib.suppress(Exception): - await jira_client.close() - return 0, 0, f"Failed to index JIRA issues: {e!s}" diff --git a/surfsense_backend/tests/unit/connector_indexers/test_jira_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_jira_parallel.py deleted file mode 100644 index 494991074..000000000 --- a/surfsense_backend/tests/unit/connector_indexers/test_jira_parallel.py +++ /dev/null @@ -1,387 +0,0 @@ -"""Tests for Jira indexer migrated to the unified parallel pipeline.""" - -from unittest.mock import AsyncMock, MagicMock - -import pytest - -import app.tasks.connector_indexers.jira_indexer as _mod -from app.db import DocumentType -from app.tasks.connector_indexers.jira_indexer import ( - _build_connector_doc, - index_jira_issues, -) - -pytestmark = pytest.mark.unit - -_USER_ID = "00000000-0000-0000-0000-000000000001" -_CONNECTOR_ID = 42 -_SEARCH_SPACE_ID = 1 - - -def _make_issue( - issue_key: str = "ENG-1", - issue_id: str = "10001", - title: str = "Fix login", -): - return {"key": issue_key, "id": issue_id, "title": title} - - -def _make_formatted_issue( - issue_key: str = "ENG-1", - issue_id: str = "10001", - title: str = "Fix login", - status: str = "In Progress", - priority: str = "High", - comments=None, -): - return { - "key": issue_key, - "id": issue_id, - "title": title, - "status": status, - "priority": priority, - "comments": comments or [], - } - - -# --------------------------------------------------------------------------- -# Slice 1: _build_connector_doc tracer bullet -# --------------------------------------------------------------------------- - - -async def test_build_connector_doc_produces_correct_fields(): - issue = _make_issue(issue_key="ENG-42", issue_id="4242", title="Fix auth bug") - formatted = _make_formatted_issue( - issue_key="ENG-42", - issue_id="4242", - title="Fix auth bug", - status="Done", - priority="Urgent", - comments=[{"id": "c1"}], - ) - markdown = "# ENG-42: Fix auth bug\n\nBody" - - doc = _build_connector_doc( - issue, - formatted, - markdown, - connector_id=_CONNECTOR_ID, - search_space_id=_SEARCH_SPACE_ID, - user_id=_USER_ID, - enable_summary=True, - ) - - assert doc.title == "ENG-42: 4242" - assert doc.unique_id == "ENG-42" - assert doc.document_type == DocumentType.JIRA_CONNECTOR - assert doc.source_markdown == markdown - assert doc.search_space_id == _SEARCH_SPACE_ID - assert doc.connector_id == _CONNECTOR_ID - assert doc.created_by_id == _USER_ID - assert doc.should_summarize is True - assert doc.metadata["issue_id"] == "ENG-42" - assert doc.metadata["issue_identifier"] == "ENG-42" - assert doc.metadata["issue_title"] == "4242" - assert doc.metadata["state"] == "Done" - assert doc.metadata["priority"] == "Urgent" - assert doc.metadata["comment_count"] == 1 - assert doc.metadata["connector_id"] == _CONNECTOR_ID - assert doc.metadata["document_type"] == "Jira Issue" - assert doc.metadata["connector_type"] == "Jira" - assert doc.fallback_summary is not None - assert "ENG-42" in doc.fallback_summary - assert markdown in doc.fallback_summary - - -async def test_build_connector_doc_summary_disabled(): - doc = _build_connector_doc( - _make_issue(), - _make_formatted_issue(), - "# content", - connector_id=_CONNECTOR_ID, - search_space_id=_SEARCH_SPACE_ID, - user_id=_USER_ID, - enable_summary=False, - ) - assert doc.should_summarize is False - - -# --------------------------------------------------------------------------- -# Shared fixtures for Slices 2-7 -# --------------------------------------------------------------------------- - - -def _mock_connector(enable_summary: bool = True): - c = MagicMock() - c.config = {"access_token": "tok"} - c.enable_summary = enable_summary - c.last_indexed_at = None - return c - - -def _mock_jira_client(issues=None, error=None): - client = MagicMock() - client.get_issues_by_date_range = AsyncMock( - return_value=(issues if issues is not None else [], error), - ) - client.format_issue = MagicMock( - side_effect=lambda i: _make_formatted_issue( - issue_key=i.get("key", ""), - issue_id=i.get("id", ""), - title=i.get("title", ""), - ) - ) - client.format_issue_to_markdown = MagicMock( - side_effect=lambda fi: f"# {fi.get('key', '')}: {fi.get('id', '')}\n\nContent" - ) - client.close = AsyncMock() - return client - - -@pytest.fixture -def jira_mocks(monkeypatch): - mock_session = AsyncMock() - mock_session.no_autoflush = MagicMock() - - mock_connector = _mock_connector() - monkeypatch.setattr( - _mod, - "get_connector_by_id", - AsyncMock(return_value=mock_connector), - ) - - jira_client = _mock_jira_client(issues=[_make_issue()]) - monkeypatch.setattr( - _mod, - "JiraHistoryConnector", - MagicMock(return_value=jira_client), - ) - - monkeypatch.setattr( - _mod, - "check_duplicate_document_by_hash", - AsyncMock(return_value=None), - ) - monkeypatch.setattr( - _mod, - "update_connector_last_indexed", - AsyncMock(), - ) - monkeypatch.setattr( - _mod, - "calculate_date_range", - MagicMock(return_value=("2025-01-01", "2025-12-31")), - ) - - mock_task_logger = MagicMock() - mock_task_logger.log_task_start = AsyncMock(return_value=MagicMock()) - mock_task_logger.log_task_progress = AsyncMock() - mock_task_logger.log_task_success = AsyncMock() - mock_task_logger.log_task_failure = AsyncMock() - monkeypatch.setattr( - _mod, - "TaskLoggingService", - MagicMock(return_value=mock_task_logger), - ) - - batch_mock = AsyncMock(return_value=([], 1, 0)) - pipeline_mock = MagicMock() - pipeline_mock.index_batch_parallel = batch_mock - pipeline_mock.migrate_legacy_docs = AsyncMock() - pipeline_mock.create_placeholder_documents = AsyncMock(return_value=0) - monkeypatch.setattr( - _mod, - "IndexingPipelineService", - MagicMock(return_value=pipeline_mock), - ) - - return { - "session": mock_session, - "connector": mock_connector, - "jira_client": jira_client, - "task_logger": mock_task_logger, - "pipeline_mock": pipeline_mock, - "batch_mock": batch_mock, - } - - -async def _run_index(mocks, **overrides): - return await index_jira_issues( - session=mocks["session"], - connector_id=overrides.get("connector_id", _CONNECTOR_ID), - search_space_id=overrides.get("search_space_id", _SEARCH_SPACE_ID), - user_id=overrides.get("user_id", _USER_ID), - start_date=overrides.get("start_date", "2025-01-01"), - end_date=overrides.get("end_date", "2025-12-31"), - update_last_indexed=overrides.get("update_last_indexed", True), - on_heartbeat_callback=overrides.get("on_heartbeat_callback"), - ) - - -# --------------------------------------------------------------------------- -# Slice 2: Full pipeline wiring -# --------------------------------------------------------------------------- - - -async def test_one_issue_calls_pipeline_and_returns_indexed_count(jira_mocks): - indexed, skipped, warning = await _run_index(jira_mocks) - assert indexed == 1 - assert skipped == 0 - assert warning is None - - jira_mocks["batch_mock"].assert_called_once() - connector_docs = jira_mocks["batch_mock"].call_args[0][0] - assert len(connector_docs) == 1 - assert connector_docs[0].document_type == DocumentType.JIRA_CONNECTOR - - -async def test_pipeline_called_with_max_concurrency_3(jira_mocks): - await _run_index(jira_mocks) - call_kwargs = jira_mocks["batch_mock"].call_args[1] - assert call_kwargs.get("max_concurrency") == 3 - - -async def test_migrate_legacy_docs_called_before_indexing(jira_mocks): - await _run_index(jira_mocks) - jira_mocks["pipeline_mock"].migrate_legacy_docs.assert_called_once() - - -# --------------------------------------------------------------------------- -# Slice 3: Issue skipping (missing key/title/content) -# --------------------------------------------------------------------------- - - -async def test_issues_with_missing_key_are_skipped(jira_mocks): - issues = [ - _make_issue(issue_key="ENG-1", issue_id="10001"), - {"key": "", "id": "10002", "title": "No key"}, - ] - jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None) - - _, skipped, _ = await _run_index(jira_mocks) - connector_docs = jira_mocks["batch_mock"].call_args[0][0] - assert len(connector_docs) == 1 - assert skipped == 1 - - -async def test_issues_with_missing_title_are_skipped(jira_mocks): - issues = [ - _make_issue(issue_key="ENG-1", issue_id="10001"), - {"key": "ENG-2", "id": "", "title": "Missing id used as title"}, - ] - jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None) - - _, skipped, _ = await _run_index(jira_mocks) - connector_docs = jira_mocks["batch_mock"].call_args[0][0] - assert len(connector_docs) == 1 - assert skipped == 1 - - -async def test_issues_with_no_content_are_skipped(jira_mocks): - issues = [ - _make_issue(issue_key="ENG-1", issue_id="10001"), - _make_issue(issue_key="ENG-2", issue_id="10002"), - ] - jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None) - - jira_mocks["jira_client"].format_issue_to_markdown.side_effect = [ - "# ENG-1: 10001\n\nContent", - "", - ] - _, skipped, _ = await _run_index(jira_mocks) - connector_docs = jira_mocks["batch_mock"].call_args[0][0] - assert len(connector_docs) == 1 - assert skipped == 1 - - -# --------------------------------------------------------------------------- -# Slice 4: Duplicate content skipping -# --------------------------------------------------------------------------- - - -async def test_duplicate_content_issues_are_skipped(jira_mocks, monkeypatch): - issues = [ - _make_issue(issue_key="ENG-1", issue_id="10001"), - _make_issue(issue_key="ENG-2", issue_id="10002"), - ] - jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None) - - call_count = 0 - - async def _check_dup(session, content_hash): - nonlocal call_count - call_count += 1 - if call_count == 2: - dup = MagicMock() - dup.id = 99 - dup.document_type = "OTHER" - return dup - return None - - monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup) - - _, skipped, _ = await _run_index(jira_mocks) - connector_docs = jira_mocks["batch_mock"].call_args[0][0] - assert len(connector_docs) == 1 - assert skipped == 1 - - -# --------------------------------------------------------------------------- -# Slice 5: Heartbeat callback forwarding -# --------------------------------------------------------------------------- - - -async def test_heartbeat_callback_forwarded_to_pipeline(jira_mocks): - heartbeat_cb = AsyncMock() - await _run_index(jira_mocks, on_heartbeat_callback=heartbeat_cb) - call_kwargs = jira_mocks["batch_mock"].call_args[1] - assert call_kwargs.get("on_heartbeat") is heartbeat_cb - - -# --------------------------------------------------------------------------- -# Slice 6: Empty issues and no-data success tuple -# --------------------------------------------------------------------------- - - -async def test_empty_issues_returns_zero_tuple(jira_mocks): - jira_mocks["jira_client"].get_issues_by_date_range.return_value = ([], None) - indexed, skipped, warning = await _run_index(jira_mocks) - assert indexed == 0 - assert skipped == 0 - assert warning is None - jira_mocks["batch_mock"].assert_not_called() - - -async def test_no_issues_error_message_returns_success_tuple(jira_mocks): - jira_mocks["jira_client"].get_issues_by_date_range.return_value = ( - [], - "No issues found in date range", - ) - indexed, skipped, warning = await _run_index(jira_mocks) - assert indexed == 0 - assert skipped == 0 - assert warning is None - - -async def test_api_error_still_returns_3_tuple(jira_mocks): - jira_mocks["jira_client"].get_issues_by_date_range.return_value = ( - [], - "API exploded", - ) - result = await _run_index(jira_mocks) - assert len(result) == 3 - assert result[0] == 0 - assert result[1] == 0 - assert "Failed to get Jira issues" in result[2] - - -# --------------------------------------------------------------------------- -# Slice 7: Failed docs warning -# --------------------------------------------------------------------------- - - -async def test_failed_docs_warning_in_result(jira_mocks): - jira_mocks["batch_mock"].return_value = ([], 0, 2) - _, _, warning = await _run_index(jira_mocks) - assert warning is not None - assert "2 failed" in warning