refactor(jira): remove dead legacy indexing and write tools (mcp-only now)

This commit is contained in:
CREDO23 2026-06-02 16:38:00 +02:00
parent ef60af90cf
commit b6710ae9af
11 changed files with 0 additions and 3075 deletions

View file

@ -1,11 +0,0 @@
"""Jira tools for creating, updating, and deleting issues."""
from .create_issue import create_create_jira_issue_tool
from .delete_issue import create_delete_jira_issue_tool
from .update_issue import create_update_jira_issue_tool
__all__ = [
"create_create_jira_issue_tool",
"create_delete_jira_issue_tool",
"create_update_jira_issue_tool",
]

View file

@ -1,248 +0,0 @@
import asyncio
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.agents.new_chat.tools.hitl import request_approval
from app.connectors.jira_history import JiraHistoryConnector
from app.db import async_session_maker
from app.services.jira import JiraToolMetadataService
logger = logging.getLogger(__name__)
def create_create_jira_issue_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
"""Factory function to create the create_jira_issue tool.
The tool acquires its own short-lived ``AsyncSession`` per call via
:data:`async_session_maker`. This is critical for the compiled-agent
cache: the compiled graph (and therefore this closure) is reused
across HTTP requests, so capturing a per-request session here would
surface stale/closed sessions on cache hits. Per-call sessions also
keep the request's outer transaction free of long-running Jira API
blocking.
Args:
db_session: Reserved for registry compatibility. Per-call sessions
are opened via :data:`async_session_maker` inside the tool body.
search_space_id: Search space ID to find the Jira connector
user_id: User ID for fetching user-specific context
connector_id: Optional specific connector ID (if known)
Returns:
Configured create_jira_issue tool
"""
del db_session # per-call session — see docstring
@tool
async def create_jira_issue(
project_key: str,
summary: str,
issue_type: str = "Task",
description: str | None = None,
priority: str | None = None,
) -> dict[str, Any]:
"""Create a new issue in Jira.
Use this tool when the user explicitly asks to create a new Jira issue/ticket.
Args:
project_key: The Jira project key (e.g. "PROJ", "ENG").
summary: Short, descriptive issue title.
issue_type: Issue type (default "Task"). Others: "Bug", "Story", "Epic".
description: Optional description body for the issue.
priority: Optional priority name (e.g. "High", "Medium", "Low").
Returns:
Dictionary with status, issue_key, and message.
IMPORTANT:
- If status is "rejected", the user declined. Do NOT retry.
- If status is "insufficient_permissions", inform user to re-authenticate.
"""
logger.info(
f"create_jira_issue called: project_key='{project_key}', summary='{summary}'"
)
if search_space_id is None or user_id is None:
return {"status": "error", "message": "Jira tool not properly configured."}
try:
async with async_session_maker() as db_session:
metadata_service = JiraToolMetadataService(db_session)
context = await metadata_service.get_creation_context(
search_space_id, user_id
)
if "error" in context:
return {"status": "error", "message": context["error"]}
accounts = context.get("accounts", [])
if accounts and all(a.get("auth_expired") for a in accounts):
return {
"status": "auth_error",
"message": "All connected Jira accounts need re-authentication.",
"connector_type": "jira",
}
result = request_approval(
action_type="jira_issue_creation",
tool_name="create_jira_issue",
params={
"project_key": project_key,
"summary": summary,
"issue_type": issue_type,
"description": description,
"priority": priority,
"connector_id": connector_id,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_project_key = result.params.get("project_key", project_key)
final_summary = result.params.get("summary", summary)
final_issue_type = result.params.get("issue_type", issue_type)
final_description = result.params.get("description", description)
final_priority = result.params.get("priority", priority)
final_connector_id = result.params.get("connector_id", connector_id)
if not final_summary or not final_summary.strip():
return {
"status": "error",
"message": "Issue summary cannot be empty.",
}
if not final_project_key:
return {"status": "error", "message": "A project must be selected."}
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
actual_connector_id = final_connector_id
if actual_connector_id is None:
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "No Jira connector found.",
}
actual_connector_id = connector.id
else:
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == actual_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Jira connector is invalid.",
}
try:
jira_history = JiraHistoryConnector(
session=db_session, connector_id=actual_connector_id
)
jira_client = await jira_history._get_jira_client()
api_result = await asyncio.to_thread(
jira_client.create_issue,
project_key=final_project_key,
summary=final_summary,
issue_type=final_issue_type,
description=final_description,
priority=final_priority,
)
except Exception as api_err:
if "status code 403" in str(api_err).lower():
try:
_conn = connector
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
pass
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
issue_key = api_result.get("key", "")
issue_url = (
f"{jira_history._base_url}/browse/{issue_key}"
if jira_history._base_url and issue_key
else ""
)
kb_message_suffix = ""
try:
from app.services.jira import JiraKBSyncService
kb_service = JiraKBSyncService(db_session)
kb_result = await kb_service.sync_after_create(
issue_id=issue_key,
issue_identifier=issue_key,
issue_title=final_summary,
description=final_description,
state="To Do",
connector_id=actual_connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if kb_result["status"] == "success":
kb_message_suffix = (
" Your knowledge base has also been updated."
)
else:
kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync."
except Exception as kb_err:
logger.warning(f"KB sync after create failed: {kb_err}")
kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync."
return {
"status": "success",
"issue_key": issue_key,
"issue_url": issue_url,
"message": f"Jira issue {issue_key} created successfully.{kb_message_suffix}",
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error creating Jira issue: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while creating the issue.",
}
return create_jira_issue

View file

@ -1,210 +0,0 @@
import asyncio
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.agents.new_chat.tools.hitl import request_approval
from app.connectors.jira_history import JiraHistoryConnector
from app.db import async_session_maker
from app.services.jira import JiraToolMetadataService
logger = logging.getLogger(__name__)
def create_delete_jira_issue_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
"""Factory function to create the delete_jira_issue tool.
The tool acquires its own short-lived ``AsyncSession`` per call via
:data:`async_session_maker`. This is critical for the compiled-agent
cache: the compiled graph (and therefore this closure) is reused
across HTTP requests, so capturing a per-request session here would
surface stale/closed sessions on cache hits.
Args:
db_session: Reserved for registry compatibility. Per-call sessions
are opened via :data:`async_session_maker` inside the tool body.
search_space_id: Search space ID to find the Jira connector
user_id: User ID for fetching user-specific context
connector_id: Optional specific connector ID (if known)
Returns:
Configured delete_jira_issue tool
"""
del db_session # per-call session — see docstring
@tool
async def delete_jira_issue(
issue_title_or_key: str,
delete_from_kb: bool = False,
) -> dict[str, Any]:
"""Delete a Jira issue.
Use this tool when the user asks to delete or remove a Jira issue.
Args:
issue_title_or_key: The issue key (e.g. "PROJ-42") or title.
delete_from_kb: Whether to also remove from the knowledge base.
Returns:
Dictionary with status, message, and deleted_from_kb.
IMPORTANT:
- If status is "rejected", do NOT retry.
- If status is "not_found", relay the message to the user.
- If status is "insufficient_permissions", inform user to re-authenticate.
"""
logger.info(
f"delete_jira_issue called: issue_title_or_key='{issue_title_or_key}'"
)
if search_space_id is None or user_id is None:
return {"status": "error", "message": "Jira tool not properly configured."}
try:
async with async_session_maker() as db_session:
metadata_service = JiraToolMetadataService(db_session)
context = await metadata_service.get_deletion_context(
search_space_id, user_id, issue_title_or_key
)
if "error" in context:
error_msg = context["error"]
if context.get("auth_expired"):
return {
"status": "auth_error",
"message": error_msg,
"connector_id": context.get("connector_id"),
"connector_type": "jira",
}
if "not found" in error_msg.lower():
return {"status": "not_found", "message": error_msg}
return {"status": "error", "message": error_msg}
issue_data = context["issue"]
issue_key = issue_data["issue_id"]
document_id = issue_data["document_id"]
connector_id_from_context = context.get("account", {}).get("id")
result = request_approval(
action_type="jira_issue_deletion",
tool_name="delete_jira_issue",
params={
"issue_key": issue_key,
"connector_id": connector_id_from_context,
"delete_from_kb": delete_from_kb,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_issue_key = result.params.get("issue_key", issue_key)
final_connector_id = result.params.get(
"connector_id", connector_id_from_context
)
final_delete_from_kb = result.params.get(
"delete_from_kb", delete_from_kb
)
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
if not final_connector_id:
return {
"status": "error",
"message": "No connector found for this issue.",
}
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Jira connector is invalid.",
}
try:
jira_history = JiraHistoryConnector(
session=db_session, connector_id=final_connector_id
)
jira_client = await jira_history._get_jira_client()
await asyncio.to_thread(jira_client.delete_issue, final_issue_key)
except Exception as api_err:
if "status code 403" in str(api_err).lower():
try:
connector.config = {
**connector.config,
"auth_expired": True,
}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
pass
return {
"status": "insufficient_permissions",
"connector_id": final_connector_id,
"message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
deleted_from_kb = False
if final_delete_from_kb and document_id:
try:
from app.db import Document
doc_result = await db_session.execute(
select(Document).filter(Document.id == document_id)
)
document = doc_result.scalars().first()
if document:
await db_session.delete(document)
await db_session.commit()
deleted_from_kb = True
except Exception as e:
logger.error(f"Failed to delete document from KB: {e}")
await db_session.rollback()
message = f"Jira issue {final_issue_key} deleted successfully."
if deleted_from_kb:
message += " Also removed from the knowledge base."
return {
"status": "success",
"issue_key": final_issue_key,
"deleted_from_kb": deleted_from_kb,
"message": message,
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error deleting Jira issue: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while deleting the issue.",
}
return delete_jira_issue

View file

@ -1,255 +0,0 @@
import asyncio
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.agents.new_chat.tools.hitl import request_approval
from app.connectors.jira_history import JiraHistoryConnector
from app.db import async_session_maker
from app.services.jira import JiraToolMetadataService
logger = logging.getLogger(__name__)
def create_update_jira_issue_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
"""Factory function to create the update_jira_issue tool.
The tool acquires its own short-lived ``AsyncSession`` per call via
:data:`async_session_maker`. This is critical for the compiled-agent
cache: the compiled graph (and therefore this closure) is reused
across HTTP requests, so capturing a per-request session here would
surface stale/closed sessions on cache hits.
Args:
db_session: Reserved for registry compatibility. Per-call sessions
are opened via :data:`async_session_maker` inside the tool body.
search_space_id: Search space ID to find the Jira connector
user_id: User ID for fetching user-specific context
connector_id: Optional specific connector ID (if known)
Returns:
Configured update_jira_issue tool
"""
del db_session # per-call session — see docstring
@tool
async def update_jira_issue(
issue_title_or_key: str,
new_summary: str | None = None,
new_description: str | None = None,
new_priority: str | None = None,
) -> dict[str, Any]:
"""Update an existing Jira issue.
Use this tool when the user asks to modify, edit, or update a Jira issue.
Args:
issue_title_or_key: The issue key (e.g. "PROJ-42") or title to identify the issue.
new_summary: Optional new title/summary for the issue.
new_description: Optional new description.
new_priority: Optional new priority name.
Returns:
Dictionary with status and message.
IMPORTANT:
- If status is "rejected", do NOT retry.
- If status is "not_found", relay the message and ask user to verify.
- If status is "insufficient_permissions", inform user to re-authenticate.
"""
logger.info(
f"update_jira_issue called: issue_title_or_key='{issue_title_or_key}'"
)
if search_space_id is None or user_id is None:
return {"status": "error", "message": "Jira tool not properly configured."}
try:
async with async_session_maker() as db_session:
metadata_service = JiraToolMetadataService(db_session)
context = await metadata_service.get_update_context(
search_space_id, user_id, issue_title_or_key
)
if "error" in context:
error_msg = context["error"]
if context.get("auth_expired"):
return {
"status": "auth_error",
"message": error_msg,
"connector_id": context.get("connector_id"),
"connector_type": "jira",
}
if "not found" in error_msg.lower():
return {"status": "not_found", "message": error_msg}
return {"status": "error", "message": error_msg}
issue_data = context["issue"]
issue_key = issue_data["issue_id"]
document_id = issue_data.get("document_id")
connector_id_from_context = context.get("account", {}).get("id")
result = request_approval(
action_type="jira_issue_update",
tool_name="update_jira_issue",
params={
"issue_key": issue_key,
"document_id": document_id,
"new_summary": new_summary,
"new_description": new_description,
"new_priority": new_priority,
"connector_id": connector_id_from_context,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_issue_key = result.params.get("issue_key", issue_key)
final_summary = result.params.get("new_summary", new_summary)
final_description = result.params.get(
"new_description", new_description
)
final_priority = result.params.get("new_priority", new_priority)
final_connector_id = result.params.get(
"connector_id", connector_id_from_context
)
final_document_id = result.params.get("document_id", document_id)
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
if not final_connector_id:
return {
"status": "error",
"message": "No connector found for this issue.",
}
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Jira connector is invalid.",
}
fields: dict[str, Any] = {}
if final_summary:
fields["summary"] = final_summary
if final_description is not None:
fields["description"] = {
"type": "doc",
"version": 1,
"content": [
{
"type": "paragraph",
"content": [
{"type": "text", "text": final_description}
],
}
],
}
if final_priority:
fields["priority"] = {"name": final_priority}
if not fields:
return {"status": "error", "message": "No changes specified."}
try:
jira_history = JiraHistoryConnector(
session=db_session, connector_id=final_connector_id
)
jira_client = await jira_history._get_jira_client()
await asyncio.to_thread(
jira_client.update_issue, final_issue_key, fields
)
except Exception as api_err:
if "status code 403" in str(api_err).lower():
try:
connector.config = {
**connector.config,
"auth_expired": True,
}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
pass
return {
"status": "insufficient_permissions",
"connector_id": final_connector_id,
"message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
issue_url = (
f"{jira_history._base_url}/browse/{final_issue_key}"
if jira_history._base_url and final_issue_key
else ""
)
kb_message_suffix = ""
if final_document_id:
try:
from app.services.jira import JiraKBSyncService
kb_service = JiraKBSyncService(db_session)
kb_result = await kb_service.sync_after_update(
document_id=final_document_id,
issue_id=final_issue_key,
user_id=user_id,
search_space_id=search_space_id,
)
if kb_result["status"] == "success":
kb_message_suffix = (
" Your knowledge base has also been updated."
)
else:
kb_message_suffix = (
" The knowledge base will be updated in the next sync."
)
except Exception as kb_err:
logger.warning(f"KB sync after update failed: {kb_err}")
kb_message_suffix = (
" The knowledge base will be updated in the next sync."
)
return {
"status": "success",
"issue_key": final_issue_key,
"issue_url": issue_url,
"message": f"Jira issue {final_issue_key} updated successfully.{kb_message_suffix}",
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error updating Jira issue: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while updating the issue.",
}
return update_jira_issue

View file

@ -1,648 +0,0 @@
"""
Jira Connector Module
A module for retrieving data from Jira.
Allows fetching issue lists and their comments, projects and more.
Supports both OAuth 2.0 (preferred) and legacy API token authentication.
"""
import base64
from datetime import datetime
from typing import Any
import requests
class JiraConnector:
"""Class for retrieving data from Jira."""
def __init__(
self,
base_url: str | None = None,
access_token: str | None = None,
cloud_id: str | None = None,
email: str | None = None,
api_token: str | None = None,
):
"""
Initialize the JiraConnector class.
Args:
base_url: Jira instance base URL (e.g., 'https://yourcompany.atlassian.net')
access_token: OAuth 2.0 access token (preferred method)
cloud_id: Atlassian cloud ID (used with OAuth for API URL construction)
email: Jira account email address (legacy method, used with api_token)
api_token: Jira API token (legacy method, used with email)
"""
self.base_url = base_url.rstrip("/") if base_url else None
self.access_token = access_token
self.cloud_id = cloud_id
self.email = email
self.api_token = api_token
self.api_version = "3" # Jira Cloud API version
self._use_oauth = access_token is not None
def set_oauth_credentials(
self, base_url: str, access_token: str, cloud_id: str | None = None
) -> None:
"""
Set OAuth 2.0 credentials (preferred method).
Args:
base_url: Jira instance base URL
access_token: OAuth 2.0 access token
cloud_id: Atlassian cloud ID (optional, used for API URL construction)
"""
self.base_url = base_url.rstrip("/")
self.access_token = access_token
self.cloud_id = cloud_id
self._use_oauth = True
def set_credentials(self, base_url: str, email: str, api_token: str) -> None:
"""
Set the Jira credentials (legacy method using API token).
Args:
base_url: Jira instance base URL
email: Jira account email address
api_token: Jira API token
"""
self.base_url = base_url.rstrip("/")
self.email = email
self.api_token = api_token
self._use_oauth = False
def set_email(self, email: str) -> None:
"""
Set the Jira account email (legacy method).
Args:
email: Jira account email address
"""
self.email = email
self._use_oauth = False
def set_api_token(self, api_token: str) -> None:
"""
Set the Jira API token (legacy method).
Args:
api_token: Jira API token
"""
self.api_token = api_token
self._use_oauth = False
def get_headers(self) -> dict[str, str]:
"""
Get headers for Jira API requests.
Uses OAuth Bearer token if available, otherwise falls back to Basic Auth.
Returns:
Dictionary of headers
Raises:
ValueError: If credentials have not been set
"""
if self._use_oauth:
# OAuth 2.0 authentication
if not self.base_url or not self.access_token:
raise ValueError(
"Jira OAuth credentials not initialized. Call set_oauth_credentials() first."
)
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json",
}
else:
# Legacy Basic Auth
if not all([self.base_url, self.email, self.api_token]):
raise ValueError(
"Jira credentials not initialized. Call set_credentials() first."
)
# Create Basic Auth header using email:api_token
auth_str = f"{self.email}:{self.api_token}"
auth_bytes = auth_str.encode("utf-8")
auth_header = "Basic " + base64.b64encode(auth_bytes).decode("ascii")
return {
"Content-Type": "application/json",
"Authorization": auth_header,
"Accept": "application/json",
}
def make_api_request(
self,
endpoint: str,
params: dict[str, Any] | None = None,
method: str = "GET",
json_payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Make a request to the Jira API.
Args:
endpoint: API endpoint (without base URL)
params: Query parameters for the request (optional)
method: HTTP method (GET or POST)
json_payload: JSON payload for POST requests (optional)
Returns:
Response data from the API
Raises:
ValueError: If credentials have not been set
Exception: If the API request fails
"""
headers = self.get_headers()
# Construct API URL based on authentication method
if self._use_oauth and self.cloud_id:
# Use Atlassian API gateway with cloud_id for OAuth
url = f"https://api.atlassian.com/ex/jira/{self.cloud_id}/rest/api/{self.api_version}/{endpoint}"
else:
# Use direct base URL (works for both OAuth and legacy)
url = f"{self.base_url}/rest/api/{self.api_version}/{endpoint}"
method_upper = method.upper()
if method_upper == "POST":
response = requests.post(
url, headers=headers, json=json_payload, timeout=500
)
elif method_upper == "PUT":
response = requests.put(
url, headers=headers, json=json_payload, timeout=500
)
elif method_upper == "DELETE":
response = requests.delete(url, headers=headers, params=params, timeout=500)
else:
response = requests.get(url, headers=headers, params=params, timeout=500)
if response.status_code in (200, 201, 204):
if response.status_code == 204 or not response.text:
return {"status": "success"}
return response.json()
else:
raise Exception(
f"API request failed with status code {response.status_code}: {response.text}"
)
def get_all_projects(self) -> dict[str, Any]:
"""
Fetch all projects from Jira.
Returns:
List of project objects
Raises:
ValueError: If credentials have not been set
Exception: If the API request fails
"""
return self.make_api_request("project/search")
def get_all_issues(self, project_key: str | None = None) -> list[dict[str, Any]]:
"""
Fetch all issues from Jira.
Args:
project_key: Optional project key to filter issues (e.g., 'PROJ')
Returns:
List of issue objects
Raises:
ValueError: If credentials have not been set
Exception: If the API request fails
"""
jql = "ORDER BY created DESC"
if project_key:
jql = f'project = "{project_key}" ' + jql
fields = [
"summary",
"description",
"status",
"assignee",
"reporter",
"created",
"updated",
"priority",
"issuetype",
"project",
]
all_issues = []
start_at = 0
max_results = 100
all_issues = []
start_at = 0
while True:
json_payload = {
"jql": jql,
"fields": fields, # API accepts list
"maxResults": max_results,
"startAt": start_at,
}
result = self.make_api_request(
"search/jql", json_payload=json_payload, method="POST"
)
if not isinstance(result, dict) or "issues" not in result:
raise Exception("Invalid response from Jira API")
issues = result["issues"]
all_issues.extend(issues)
print(f"Fetched {len(issues)} issues (startAt={start_at})")
total = result.get("total", 0)
if start_at + len(issues) >= total:
break
start_at += len(issues)
return all_issues
def get_issues_by_date_range(
self,
start_date: str,
end_date: str,
include_comments: bool = True,
project_key: str | None = None,
) -> tuple[list[dict[str, Any]], str | None]:
"""
Fetch issues within a date range.
Args:
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format (inclusive)
include_comments: Whether to include comments in the response
project_key: Optional project key to filter issues
Returns:
Tuple containing (issues list, error message or None)
"""
try:
# Build JQL query for date range
# Query issues that were either created OR updated within the date range
# Use end_date + 1 day with < operator to include the full end date
from datetime import datetime, timedelta
# Parse end_date and add 1 day for inclusive end date
end_date_obj = datetime.strptime(end_date, "%Y-%m-%d")
end_date_next = (end_date_obj + timedelta(days=1)).strftime("%Y-%m-%d")
# Check both created and updated dates to catch all relevant issues
# Use 'created' and 'updated' (standard JQL field names)
date_filter = (
f"(created >= '{start_date}' AND created < '{end_date_next}') "
f"OR (updated >= '{start_date}' AND updated < '{end_date_next}')"
)
jql = f"{date_filter} ORDER BY created DESC"
if project_key:
jql = f'project = "{project_key}" AND ({date_filter}) ORDER BY created DESC'
# Define fields to retrieve
fields = [
"summary",
"description",
"status",
"assignee",
"reporter",
"created",
"updated",
"priority",
"issuetype",
"project",
]
if include_comments:
fields.append("comment")
params = {
"jql": jql,
"fields": ",".join(fields),
"maxResults": 100,
"startAt": 0,
}
all_issues = []
start_at = 0
while True:
params["startAt"] = start_at
result = self.make_api_request("search/jql", params)
if not isinstance(result, dict) or "issues" not in result:
return [], "Invalid response from Jira API"
issues = result["issues"]
all_issues.extend(issues)
# Check if there are more issues to fetch
total = result.get("total", 0)
if start_at + len(issues) >= total:
break
start_at += len(issues)
if not all_issues:
return [], "No issues found in the specified date range."
return all_issues, None
except Exception as e:
return [], f"Error fetching issues: {e!s}"
def get_myself(self) -> dict[str, Any]:
"""Fetch the current user's profile (health check)."""
return self.make_api_request("myself")
def get_projects(self) -> list[dict[str, Any]]:
"""Fetch all projects the user has access to."""
result = self.make_api_request("project/search")
return result.get("values", [])
def get_issue_types(self) -> list[dict[str, Any]]:
"""Fetch all issue types."""
return self.make_api_request("issuetype")
def get_priorities(self) -> list[dict[str, Any]]:
"""Fetch all priority levels."""
return self.make_api_request("priority")
def get_issue(self, issue_id_or_key: str) -> dict[str, Any]:
"""Fetch a single issue by ID or key."""
return self.make_api_request(f"issue/{issue_id_or_key}")
def create_issue(
self,
project_key: str,
summary: str,
issue_type: str = "Task",
description: str | None = None,
priority: str | None = None,
assignee_id: str | None = None,
) -> dict[str, Any]:
"""Create a new Jira issue."""
fields: dict[str, Any] = {
"project": {"key": project_key},
"summary": summary,
"issuetype": {"name": issue_type},
}
if description:
fields["description"] = {
"type": "doc",
"version": 1,
"content": [
{
"type": "paragraph",
"content": [{"type": "text", "text": description}],
}
],
}
if priority:
fields["priority"] = {"name": priority}
if assignee_id:
fields["assignee"] = {"accountId": assignee_id}
return self.make_api_request(
"issue", method="POST", json_payload={"fields": fields}
)
def update_issue(
self, issue_id_or_key: str, fields: dict[str, Any]
) -> dict[str, Any]:
"""Update an existing Jira issue fields."""
return self.make_api_request(
f"issue/{issue_id_or_key}",
method="PUT",
json_payload={"fields": fields},
)
def delete_issue(self, issue_id_or_key: str) -> dict[str, Any]:
"""Delete a Jira issue."""
return self.make_api_request(f"issue/{issue_id_or_key}", method="DELETE")
def get_transitions(self, issue_id_or_key: str) -> list[dict[str, Any]]:
"""Get available transitions for an issue (for status changes)."""
result = self.make_api_request(f"issue/{issue_id_or_key}/transitions")
return result.get("transitions", [])
def transition_issue(
self, issue_id_or_key: str, transition_id: str
) -> dict[str, Any]:
"""Transition an issue to a new status."""
return self.make_api_request(
f"issue/{issue_id_or_key}/transitions",
method="POST",
json_payload={"transition": {"id": transition_id}},
)
def format_issue(self, issue: dict[str, Any]) -> dict[str, Any]:
"""
Format an issue for easier consumption.
Args:
issue: The issue object from Jira API
Returns:
Formatted issue dictionary
"""
fields = issue.get("fields", {})
# Extract basic issue details
formatted = {
"id": issue.get("id", ""),
"key": issue.get("key", ""),
"title": fields.get("summary", ""),
"description": fields.get("description", ""),
"status": (
fields.get("status", {}).get("name", "Unknown")
if fields.get("status")
else "Unknown"
),
"status_category": (
fields.get("status", {})
.get("statusCategory", {})
.get("name", "Unknown")
if fields.get("status")
else "Unknown"
),
"priority": (
fields.get("priority", {}).get("name", "Unknown")
if fields.get("priority")
else "Unknown"
),
"issue_type": (
fields.get("issuetype", {}).get("name", "Unknown")
if fields.get("issuetype")
else "Unknown"
),
"project": (
fields.get("project", {}).get("key", "Unknown")
if fields.get("project")
else "Unknown"
),
"created_at": fields.get("created", ""),
"updated_at": fields.get("updated", ""),
"reporter": (
{
"account_id": (
fields.get("reporter", {}).get("accountId", "")
if fields.get("reporter")
else ""
),
"display_name": (
fields.get("reporter", {}).get("displayName", "Unknown")
if fields.get("reporter")
else "Unknown"
),
"email": (
fields.get("reporter", {}).get("emailAddress", "")
if fields.get("reporter")
else ""
),
}
if fields.get("reporter")
else {"account_id": "", "display_name": "Unknown", "email": ""}
),
"assignee": (
{
"account_id": fields.get("assignee", {}).get("accountId", ""),
"display_name": fields.get("assignee", {}).get(
"displayName", "Unknown"
),
"email": fields.get("assignee", {}).get("emailAddress", ""),
}
if fields.get("assignee")
else None
),
"comments": [],
}
# Extract comments if available
if "comment" in fields and "comments" in fields["comment"]:
for comment in fields["comment"]["comments"]:
formatted_comment = {
"id": comment.get("id", ""),
"body": comment.get("body", ""),
"created_at": comment.get("created", ""),
"updated_at": comment.get("updated", ""),
"author": (
{
"account_id": (
comment.get("author", {}).get("accountId", "")
if comment.get("author")
else ""
),
"display_name": (
comment.get("author", {}).get("displayName", "Unknown")
if comment.get("author")
else "Unknown"
),
"email": (
comment.get("author", {}).get("emailAddress", "")
if comment.get("author")
else ""
),
}
if comment.get("author")
else {"account_id": "", "display_name": "Unknown", "email": ""}
),
}
formatted["comments"].append(formatted_comment)
return formatted
def format_issue_to_markdown(self, issue: dict[str, Any]) -> str:
"""
Convert an issue to markdown format.
Args:
issue: The issue object (either raw or formatted)
Returns:
Markdown string representation of the issue
"""
# Format the issue if it's not already formatted
if "key" not in issue:
issue = self.format_issue(issue)
# Build the markdown content
markdown = (
f"# {issue.get('key', 'No Key')}: {issue.get('title', 'No Title')}\n\n"
)
if issue.get("status"):
markdown += f"**Status:** {issue['status']}\n"
if issue.get("priority"):
markdown += f"**Priority:** {issue['priority']}\n"
if issue.get("issue_type"):
markdown += f"**Type:** {issue['issue_type']}\n"
if issue.get("project"):
markdown += f"**Project:** {issue['project']}\n\n"
if issue.get("assignee") and issue["assignee"].get("display_name"):
markdown += f"**Assignee:** {issue['assignee']['display_name']}\n"
if issue.get("reporter") and issue["reporter"].get("display_name"):
markdown += f"**Reporter:** {issue['reporter']['display_name']}\n"
if issue.get("created_at"):
created_date = self.format_date(issue["created_at"])
markdown += f"**Created:** {created_date}\n"
if issue.get("updated_at"):
updated_date = self.format_date(issue["updated_at"])
markdown += f"**Updated:** {updated_date}\n\n"
if issue.get("description"):
markdown += f"## Description\n\n{issue['description']}\n\n"
if issue.get("comments"):
markdown += f"## Comments ({len(issue['comments'])})\n\n"
for comment in issue["comments"]:
author_name = "Unknown"
if comment.get("author") and comment["author"].get("display_name"):
author_name = comment["author"]["display_name"]
comment_date = "Unknown date"
if comment.get("created_at"):
comment_date = self.format_date(comment["created_at"])
markdown += f"### {author_name} ({comment_date})\n\n{comment.get('body', '')}\n\n---\n\n"
return markdown
@staticmethod
def format_date(iso_date: str) -> str:
"""
Format an ISO date string to a more readable format.
Args:
iso_date: ISO format date string
Returns:
Formatted date string
"""
if not iso_date or not isinstance(iso_date, str):
return "Unknown date"
try:
# Jira dates are typically in format: 2023-01-01T12:00:00.000+0000
dt = datetime.fromisoformat(iso_date.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
return iso_date

View file

@ -1,350 +0,0 @@
"""
Jira OAuth Connector.
Handles OAuth-based authentication and token refresh for Jira API access.
Supports both OAuth 2.0 (preferred) and legacy API token authentication.
"""
import logging
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.config import config
from app.connectors.jira_connector import JiraConnector
from app.db import SearchSourceConnector
from app.schemas.atlassian_auth_credentials import AtlassianAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
class JiraHistoryConnector:
"""
Jira connector with OAuth support and automatic token refresh.
This connector uses OAuth 2.0 access tokens to authenticate with the
Jira API. It automatically refreshes expired tokens when needed.
Also supports legacy API token authentication for backward compatibility.
"""
def __init__(
self,
session: AsyncSession,
connector_id: int,
credentials: AtlassianAuthCredentialsBase | None = None,
):
"""
Initialize the JiraHistoryConnector with auto-refresh capability.
Args:
session: Database session for updating connector
connector_id: Connector ID for direct updates
credentials: Jira OAuth credentials (optional, will be loaded from DB if not provided)
"""
self._session = session
self._connector_id = connector_id
self._credentials = credentials
self._cloud_id: str | None = None
self._base_url: str | None = None
self._jira_client: JiraConnector | None = None
self._use_oauth = True
self._legacy_email: str | None = None
self._legacy_api_token: str | None = None
async def _get_valid_token(self) -> str:
"""
Get valid Jira access token, refreshing if needed.
Returns:
Valid access token
Raises:
ValueError: If credentials are missing or invalid
Exception: If token refresh fails
"""
# Load credentials from DB if not provided
if self._credentials is None:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if not connector:
raise ValueError(f"Connector {self._connector_id} not found")
config_data = connector.config.copy()
# Check if using OAuth or legacy API token
is_oauth = config_data.get("_token_encrypted", False) or config_data.get(
"access_token"
)
if is_oauth:
# OAuth 2.0 authentication
# Check if access_token exists before processing
raw_access_token = config_data.get("access_token")
if not raw_access_token:
raise ValueError(
"Jira access token not found. "
"Please reconnect your Jira account."
)
if not config.SECRET_KEY:
raise ValueError(
"SECRET_KEY not configured but tokens are marked as encrypted"
)
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt access_token
if config_data.get("access_token"):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
logger.info(
f"Decrypted Jira access token for connector {self._connector_id}"
)
# Decrypt refresh_token if present
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
logger.info(
f"Decrypted Jira refresh token for connector {self._connector_id}"
)
except Exception as e:
logger.error(
f"Failed to decrypt Jira credentials for connector {self._connector_id}: {e!s}"
)
raise ValueError(
f"Failed to decrypt Jira credentials: {e!s}"
) from e
# Final validation after decryption
final_token = config_data.get("access_token")
if not final_token or (
isinstance(final_token, str) and not final_token.strip()
):
raise ValueError(
"Jira access token is invalid or empty. "
"Please reconnect your Jira account."
)
try:
self._credentials = AtlassianAuthCredentialsBase.from_dict(
config_data
)
self._cloud_id = config_data.get("cloud_id")
self._base_url = config_data.get("base_url")
self._use_oauth = True
except Exception as e:
raise ValueError(f"Invalid Jira OAuth credentials: {e!s}") from e
else:
# Legacy API token authentication
self._legacy_email = config_data.get("JIRA_EMAIL")
self._legacy_api_token = config_data.get("JIRA_API_TOKEN")
self._base_url = config_data.get("JIRA_BASE_URL")
self._use_oauth = False
if (
not self._legacy_email
or not self._legacy_api_token
or not self._base_url
):
raise ValueError("Jira credentials not found in connector config")
# Check if token is expired and refreshable (only for OAuth)
if (
self._use_oauth
and self._credentials.is_expired
and self._credentials.is_refreshable
):
try:
logger.info(
f"Jira token expired for connector {self._connector_id}, refreshing..."
)
# Get connector for refresh
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if not connector:
raise RuntimeError(
f"Connector {self._connector_id} not found; cannot refresh token."
)
# Lazy import to avoid circular dependency
from app.routes.jira_add_connector_route import refresh_jira_token
connector = await refresh_jira_token(self._session, connector)
# Reload credentials after refresh
config_data = connector.config.copy()
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
token_encryption = TokenEncryption(config.SECRET_KEY)
if config_data.get("access_token"):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
self._credentials = AtlassianAuthCredentialsBase.from_dict(config_data)
self._cloud_id = config_data.get("cloud_id")
self._base_url = config_data.get("base_url")
# Invalidate cached client so it's recreated with new token
self._jira_client = None
logger.info(
f"Successfully refreshed Jira token for connector {self._connector_id}"
)
except Exception as e:
logger.error(
f"Failed to refresh Jira token for connector {self._connector_id}: {e!s}"
)
raise Exception(
f"Failed to refresh Jira OAuth credentials: {e!s}"
) from e
if self._use_oauth:
return self._credentials.access_token
else:
# For legacy auth, return empty string (not used for token-based auth)
return ""
async def _get_jira_client(self) -> JiraConnector:
"""
Get or create JiraConnector with valid credentials.
Returns:
JiraConnector instance
"""
if self._jira_client is None:
if self._use_oauth:
# Ensure we have valid token (will refresh if needed)
await self._get_valid_token()
self._jira_client = JiraConnector(
base_url=self._base_url,
access_token=self._credentials.access_token,
cloud_id=self._cloud_id,
)
else:
# Legacy API token authentication
self._jira_client = JiraConnector(
base_url=self._base_url,
email=self._legacy_email,
api_token=self._legacy_api_token,
)
else:
# If OAuth, refresh token if expired before returning client
if self._use_oauth:
await self._get_valid_token()
# Update client with new token if it was refreshed
if self._credentials:
self._jira_client.set_oauth_credentials(
base_url=self._base_url or "",
access_token=self._credentials.access_token,
cloud_id=self._cloud_id,
)
return self._jira_client
async def get_issues_by_date_range(
self,
start_date: str,
end_date: str,
include_comments: bool = True,
project_key: str | None = None,
) -> tuple[list[dict[str, Any]], str | None]:
"""
Fetch issues within a date range.
This method wraps JiraConnector.get_issues_by_date_range() with automatic token refresh.
Args:
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format (inclusive)
include_comments: Whether to include comments in the response
project_key: Optional project key to filter issues
Returns:
Tuple containing (issues list, error message or None)
"""
# Ensure token is valid (will refresh if needed)
if self._use_oauth:
await self._get_valid_token()
# Get client with valid credentials
client = await self._get_jira_client()
# JiraConnector methods are synchronous, so we call them directly
# Token refresh has already been handled above
return client.get_issues_by_date_range(
start_date=start_date,
end_date=end_date,
include_comments=include_comments,
project_key=project_key,
)
def format_issue(self, issue: dict[str, Any]) -> dict[str, Any]:
"""
Format an issue for easier consumption.
Wraps JiraConnector.format_issue().
Args:
issue: The issue object from Jira API
Returns:
Formatted issue dictionary
"""
# This is a synchronous method that doesn't need token refresh
# since it just formats data that's already been fetched
if self._jira_client is None:
# Create a minimal client just for formatting (doesn't need credentials)
self._jira_client = JiraConnector()
return self._jira_client.format_issue(issue)
def format_issue_to_markdown(self, issue: dict[str, Any]) -> str:
"""
Convert an issue to markdown format.
Wraps JiraConnector.format_issue_to_markdown().
Args:
issue: The issue object (either raw or formatted)
Returns:
Markdown string representation of the issue
"""
# This is a synchronous method that doesn't need token refresh
# since it just formats data that's already been fetched
if self._jira_client is None:
# Create a minimal client just for formatting (doesn't need credentials)
self._jira_client = JiraConnector()
return self._jira_client.format_issue_to_markdown(issue)
async def close(self):
"""Close any resources (currently no-op for JiraConnector)."""
# JiraConnector doesn't maintain persistent connections, so nothing to close
self._jira_client = None
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()

View file

@ -1,13 +0,0 @@
from app.services.jira.kb_sync_service import JiraKBSyncService
from app.services.jira.tool_metadata_service import (
JiraIssue,
JiraToolMetadataService,
JiraWorkspace,
)
__all__ = [
"JiraIssue",
"JiraKBSyncService",
"JiraToolMetadataService",
"JiraWorkspace",
]

View file

@ -1,257 +0,0 @@
import asyncio
import logging
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.jira_history import JiraHistoryConnector
from app.db import Document, DocumentType
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
logger = logging.getLogger(__name__)
class JiraKBSyncService:
"""Syncs Jira issue documents to the knowledge base after HITL actions."""
def __init__(self, db_session: AsyncSession):
self.db_session = db_session
async def sync_after_create(
self,
issue_id: str,
issue_identifier: str,
issue_title: str,
description: str | None,
state: str | None,
connector_id: int,
search_space_id: int,
user_id: str,
) -> dict:
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_current_timestamp,
safe_set_chunks,
)
try:
unique_hash = generate_unique_identifier_hash(
DocumentType.JIRA_CONNECTOR, issue_id, search_space_id
)
existing = await check_document_by_unique_identifier(
self.db_session, unique_hash
)
if existing:
logger.info(
"Document for Jira issue %s already exists (doc_id=%s), skipping",
issue_identifier,
existing.id,
)
return {"status": "success"}
indexable_content = (description or "").strip()
if not indexable_content:
indexable_content = f"Jira Issue {issue_identifier}: {issue_title}"
issue_content = (
f"# {issue_identifier}: {issue_title}\n\n{indexable_content}"
)
content_hash = generate_content_hash(issue_content, search_space_id)
with self.db_session.no_autoflush:
dup = await check_duplicate_document_by_hash(
self.db_session, content_hash
)
if dup:
content_hash = unique_hash
from app.services.llm_service import get_user_long_context_llm
user_llm = await get_user_long_context_llm(
self.db_session,
user_id,
search_space_id,
disable_streaming=True,
)
doc_metadata_for_summary = {
"issue_id": issue_identifier,
"issue_title": issue_title,
"document_type": "Jira Issue",
"connector_type": "Jira",
}
if user_llm:
summary_content, summary_embedding = await generate_document_summary(
issue_content, user_llm, doc_metadata_for_summary
)
else:
summary_content = (
f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
)
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
chunks = await create_document_chunks(issue_content)
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
document = Document(
title=f"{issue_identifier}: {issue_title}",
document_type=DocumentType.JIRA_CONNECTOR,
document_metadata={
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state or "Unknown",
"indexed_at": now_str,
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_hash,
embedding=summary_embedding,
search_space_id=search_space_id,
connector_id=connector_id,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
self.db_session.add(document)
await self.db_session.flush()
await safe_set_chunks(self.db_session, document, chunks)
await self.db_session.commit()
logger.info(
"KB sync after create succeeded: doc_id=%s, issue=%s",
document.id,
issue_identifier,
)
return {"status": "success"}
except Exception as e:
error_str = str(e).lower()
if (
"duplicate key value violates unique constraint" in error_str
or "uniqueviolationerror" in error_str
):
await self.db_session.rollback()
return {"status": "error", "message": "Duplicate document detected"}
logger.error(
"KB sync after create failed for issue %s: %s",
issue_identifier,
e,
exc_info=True,
)
await self.db_session.rollback()
return {"status": "error", "message": str(e)}
async def sync_after_update(
self,
document_id: int,
issue_id: str,
user_id: str,
search_space_id: int,
) -> dict:
from app.tasks.connector_indexers.base import (
get_current_timestamp,
safe_set_chunks,
)
try:
document = await self.db_session.get(Document, document_id)
if not document:
return {"status": "not_indexed"}
connector_id = document.connector_id
if not connector_id:
return {"status": "error", "message": "Document has no connector_id"}
jira_history = JiraHistoryConnector(
session=self.db_session, connector_id=connector_id
)
jira_client = await jira_history._get_jira_client()
issue_raw = await asyncio.to_thread(jira_client.get_issue, issue_id)
formatted = jira_client.format_issue(issue_raw)
issue_content = jira_client.format_issue_to_markdown(formatted)
if not issue_content:
return {"status": "error", "message": "Issue produced empty content"}
issue_identifier = formatted.get("key", "")
issue_title = formatted.get("title", "")
state = formatted.get("status", "Unknown")
comment_count = len(formatted.get("comments", []))
from app.services.llm_service import get_user_long_context_llm
user_llm = await get_user_long_context_llm(
self.db_session, user_id, search_space_id, disable_streaming=True
)
if user_llm:
doc_meta = {
"issue_key": issue_identifier,
"issue_title": issue_title,
"status": state,
"document_type": "Jira Issue",
"connector_type": "Jira",
}
summary_content, summary_embedding = await generate_document_summary(
issue_content, user_llm, doc_meta
)
else:
summary_content = (
f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
)
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
chunks = await create_document_chunks(issue_content)
document.title = f"{issue_identifier}: {issue_title}"
document.content = summary_content
document.content_hash = generate_content_hash(
issue_content, search_space_id
)
document.embedding = summary_embedding
from sqlalchemy.orm.attributes import flag_modified
document.document_metadata = {
**(document.document_metadata or {}),
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"comment_count": comment_count,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
flag_modified(document, "document_metadata")
await safe_set_chunks(self.db_session, document, chunks)
document.updated_at = get_current_timestamp()
await self.db_session.commit()
logger.info(
"KB sync successful for document %s (%s: %s)",
document_id,
issue_identifier,
issue_title,
)
return {"status": "success"}
except Exception as e:
logger.error(
"KB sync failed for document %s: %s", document_id, e, exc_info=True
)
await self.db_session.rollback()
return {"status": "error", "message": str(e)}

View file

@ -1,332 +0,0 @@
import asyncio
import logging
from dataclasses import dataclass
from sqlalchemy import and_, func, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm.attributes import flag_modified
from app.connectors.jira_history import JiraHistoryConnector
from app.db import (
Document,
DocumentType,
SearchSourceConnector,
SearchSourceConnectorType,
)
logger = logging.getLogger(__name__)
@dataclass
class JiraWorkspace:
"""Represents a Jira connector as a workspace for tool context."""
id: int
name: str
base_url: str
@classmethod
def from_connector(cls, connector: SearchSourceConnector) -> "JiraWorkspace":
return cls(
id=connector.id,
name=connector.name,
base_url=connector.config.get("base_url", ""),
)
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"base_url": self.base_url,
}
@dataclass
class JiraIssue:
"""Represents an indexed Jira issue resolved from the knowledge base."""
issue_id: str
issue_identifier: str
issue_title: str
state: str
connector_id: int
document_id: int
indexed_at: str | None
@classmethod
def from_document(cls, document: Document) -> "JiraIssue":
meta = document.document_metadata or {}
return cls(
issue_id=meta.get("issue_id", ""),
issue_identifier=meta.get("issue_identifier", ""),
issue_title=meta.get("issue_title", document.title),
state=meta.get("state", ""),
connector_id=document.connector_id,
document_id=document.id,
indexed_at=meta.get("indexed_at"),
)
def to_dict(self) -> dict:
return {
"issue_id": self.issue_id,
"issue_identifier": self.issue_identifier,
"issue_title": self.issue_title,
"state": self.state,
"connector_id": self.connector_id,
"document_id": self.document_id,
"indexed_at": self.indexed_at,
}
class JiraToolMetadataService:
"""Builds interrupt context for Jira HITL tools."""
def __init__(self, db_session: AsyncSession):
self._db_session = db_session
async def _check_account_health(self, connector: SearchSourceConnector) -> bool:
"""Check if the Jira connector auth is still valid.
Returns True if auth is expired/invalid, False if healthy.
"""
try:
jira_history = JiraHistoryConnector(
session=self._db_session, connector_id=connector.id
)
jira_client = await jira_history._get_jira_client()
await asyncio.to_thread(jira_client.get_myself)
return False
except Exception as e:
logger.warning("Jira connector %s health check failed: %s", connector.id, e)
try:
connector.config = {**connector.config, "auth_expired": True}
flag_modified(connector, "config")
await self._db_session.commit()
await self._db_session.refresh(connector)
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
connector.id,
exc_info=True,
)
return True
async def get_creation_context(self, search_space_id: int, user_id: str) -> dict:
"""Return context needed to create a new Jira issue.
Fetches all connected Jira accounts, and for the first healthy one
fetches projects, issue types, and priorities.
"""
connectors = await self._get_all_jira_connectors(search_space_id, user_id)
if not connectors:
return {"error": "No Jira account connected"}
accounts = []
projects = []
issue_types = []
priorities = []
fetched_context = False
for connector in connectors:
auth_expired = await self._check_account_health(connector)
workspace = JiraWorkspace.from_connector(connector)
account_info = {
**workspace.to_dict(),
"auth_expired": auth_expired,
}
accounts.append(account_info)
if not auth_expired and not fetched_context:
try:
jira_history = JiraHistoryConnector(
session=self._db_session, connector_id=connector.id
)
jira_client = await jira_history._get_jira_client()
raw_projects = await asyncio.to_thread(jira_client.get_projects)
projects = [
{"id": p.get("id"), "key": p.get("key"), "name": p.get("name")}
for p in raw_projects
]
raw_types = await asyncio.to_thread(jira_client.get_issue_types)
seen_type_names: set[str] = set()
issue_types = []
for t in raw_types:
if t.get("subtask", False):
continue
name = t.get("name")
if name not in seen_type_names:
seen_type_names.add(name)
issue_types.append({"id": t.get("id"), "name": name})
raw_priorities = await asyncio.to_thread(jira_client.get_priorities)
priorities = [
{"id": p.get("id"), "name": p.get("name")}
for p in raw_priorities
]
fetched_context = True
except Exception as e:
logger.warning(
"Failed to fetch Jira context for connector %s: %s",
connector.id,
e,
)
return {
"accounts": accounts,
"projects": projects,
"issue_types": issue_types,
"priorities": priorities,
}
async def get_update_context(
self, search_space_id: int, user_id: str, issue_ref: str
) -> dict:
"""Return context needed to update an indexed Jira issue.
Resolves the issue from the KB, then fetches current details from the Jira API.
"""
document = await self._resolve_issue(search_space_id, user_id, issue_ref)
if not document:
return {
"error": f"Issue '{issue_ref}' not found in your synced Jira issues. "
"Please make sure the issue is indexed in your knowledge base."
}
connector = await self._get_connector_for_document(document, user_id)
if not connector:
return {"error": "Connector not found or access denied"}
auth_expired = await self._check_account_health(connector)
if auth_expired:
return {
"error": "Jira authentication has expired. Please re-authenticate.",
"auth_expired": True,
"connector_id": connector.id,
}
workspace = JiraWorkspace.from_connector(connector)
issue = JiraIssue.from_document(document)
try:
jira_history = JiraHistoryConnector(
session=self._db_session, connector_id=connector.id
)
jira_client = await jira_history._get_jira_client()
issue_data = await asyncio.to_thread(jira_client.get_issue, issue.issue_id)
formatted = jira_client.format_issue(issue_data)
except Exception as e:
error_str = str(e).lower()
if (
"401" in error_str
or "403" in error_str
or "authentication" in error_str
):
return {
"error": f"Failed to fetch Jira issue: {e!s}",
"auth_expired": True,
"connector_id": connector.id,
}
return {"error": f"Failed to fetch Jira issue: {e!s}"}
return {
"account": {**workspace.to_dict(), "auth_expired": False},
"issue": {
"issue_id": formatted.get("key", issue.issue_id),
"issue_identifier": formatted.get("key", issue.issue_identifier),
"issue_title": formatted.get("title", issue.issue_title),
"state": formatted.get("status", "Unknown"),
"priority": formatted.get("priority", "Unknown"),
"issue_type": formatted.get("issue_type", "Unknown"),
"assignee": formatted.get("assignee"),
"description": formatted.get("description"),
"project": formatted.get("project", ""),
"document_id": issue.document_id,
"indexed_at": issue.indexed_at,
},
}
async def get_deletion_context(
self, search_space_id: int, user_id: str, issue_ref: str
) -> dict:
"""Return context needed to delete a Jira issue (KB metadata only, no API call)."""
document = await self._resolve_issue(search_space_id, user_id, issue_ref)
if not document:
return {
"error": f"Issue '{issue_ref}' not found in your synced Jira issues. "
"Please make sure the issue is indexed in your knowledge base."
}
connector = await self._get_connector_for_document(document, user_id)
if not connector:
return {"error": "Connector not found or access denied"}
auth_expired = connector.config.get("auth_expired", False)
workspace = JiraWorkspace.from_connector(connector)
issue = JiraIssue.from_document(document)
return {
"account": {**workspace.to_dict(), "auth_expired": auth_expired},
"issue": issue.to_dict(),
}
async def _resolve_issue(
self, search_space_id: int, user_id: str, issue_ref: str
) -> Document | None:
"""Resolve an issue from KB: issue_identifier -> issue_title -> document.title."""
ref_lower = issue_ref.lower()
result = await self._db_session.execute(
select(Document)
.join(
SearchSourceConnector, Document.connector_id == SearchSourceConnector.id
)
.filter(
and_(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.JIRA_CONNECTOR,
SearchSourceConnector.user_id == user_id,
or_(
func.lower(
Document.document_metadata.op("->>")("issue_identifier")
)
== ref_lower,
func.lower(Document.document_metadata.op("->>")("issue_title"))
== ref_lower,
func.lower(Document.title) == ref_lower,
),
)
)
.order_by(Document.updated_at.desc().nullslast())
.limit(1)
)
return result.scalars().first()
async def _get_all_jira_connectors(
self, search_space_id: int, user_id: str
) -> list[SearchSourceConnector]:
result = await self._db_session.execute(
select(SearchSourceConnector).filter(
and_(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
)
return result.scalars().all()
async def _get_connector_for_document(
self, document: Document, user_id: str
) -> SearchSourceConnector | None:
if not document.connector_id:
return None
result = await self._db_session.execute(
select(SearchSourceConnector).filter(
and_(
SearchSourceConnector.id == document.connector_id,
SearchSourceConnector.user_id == user_id,
)
)
)
return result.scalars().first()

View file

@ -1,364 +0,0 @@
"""Jira connector indexer using the unified parallel indexing pipeline."""
import contextlib
from collections.abc import Awaitable, Callable
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.jira_history import JiraHistoryConnector
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from .base import (
calculate_date_range,
check_duplicate_document_by_hash,
get_connector_by_id,
logger,
update_connector_last_indexed,
)
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
HEARTBEAT_INTERVAL_SECONDS = 30
def _build_connector_doc(
issue: dict,
formatted_issue: dict,
issue_content: str,
*,
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Map a raw Jira issue dict to a ConnectorDocument."""
issue_id = issue.get("key", "")
issue_identifier = issue.get("key", "")
issue_title = issue.get("id", "")
state = formatted_issue.get("status", "Unknown")
priority = formatted_issue.get("priority", "Unknown")
comment_count = len(formatted_issue.get("comments", []))
metadata = {
"issue_id": issue_id,
"issue_identifier": issue_identifier,
"issue_title": issue_title,
"state": state,
"priority": priority,
"comment_count": comment_count,
"connector_id": connector_id,
"document_type": "Jira Issue",
"connector_type": "Jira",
}
fallback_summary = (
f"Jira Issue {issue_identifier}: {issue_title}\n\n"
f"Status: {state}\n\n{issue_content}"
)
return ConnectorDocument(
title=f"{issue_identifier}: {issue_title}",
source_markdown=issue_content,
unique_id=issue_id,
document_type=DocumentType.JIRA_CONNECTOR,
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
async def index_jira_issues(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, str | None]:
"""Index Jira issues and comments."""
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="jira_issues_indexing",
source="connector_indexing_task",
message=f"Starting Jira issues indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"start_date": start_date,
"end_date": end_date,
},
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.JIRA_CONNECTOR
)
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found",
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return 0, 0, f"Connector with ID {connector_id} not found"
await task_logger.log_task_progress(
log_entry,
f"Initializing Jira client for connector {connector_id}",
{"stage": "client_initialization"},
)
jira_client = JiraHistoryConnector(session=session, connector_id=connector_id)
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
await task_logger.log_task_progress(
log_entry,
f"Fetching Jira issues from {start_date_str} to {end_date_str}",
{
"stage": "fetching_issues",
"start_date": start_date_str,
"end_date": end_date_str,
},
)
try:
issues, error = await jira_client.get_issues_by_date_range(
start_date=start_date_str, end_date=end_date_str, include_comments=True
)
if error:
if "No issues found" in error:
logger.info(f"No Jira issues found: {error}")
if update_last_indexed:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await session.commit()
logger.info(
f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found"
)
await task_logger.log_task_success(
log_entry,
f"No Jira issues found in date range {start_date_str} to {end_date_str}",
{"issues_found": 0},
)
await jira_client.close()
return 0, 0, None
else:
logger.error(f"Failed to get Jira issues: {error}")
await task_logger.log_task_failure(
log_entry,
f"Failed to get Jira issues: {error}",
"API Error",
{"error_type": "APIError"},
)
await jira_client.close()
return 0, 0, f"Failed to get Jira issues: {error}"
logger.info(f"Retrieved {len(issues)} issues from Jira API")
except Exception as e:
logger.error(f"Error fetching Jira issues: {e!s}", exc_info=True)
await jira_client.close()
return 0, 0, f"Error fetching Jira issues: {e!s}"
if not issues:
logger.info("No Jira issues found for the specified date range")
if update_last_indexed:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await session.commit()
await jira_client.close()
return 0, 0, None
# ── Create placeholders for instant UI feedback ───────────────
pipeline = IndexingPipelineService(session)
placeholders = [
PlaceholderInfo(
title=f"{issue.get('key', '')}: {issue.get('id', '')}",
document_type=DocumentType.JIRA_CONNECTOR,
unique_id=issue.get("key", ""),
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
metadata={
"issue_id": issue.get("key", ""),
"connector_id": connector_id,
"connector_type": "Jira",
},
)
for issue in issues
if issue.get("key") and issue.get("id")
]
await pipeline.create_placeholder_documents(placeholders)
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
duplicate_content_count = 0
for issue in issues:
try:
issue_id = issue.get("key")
issue_identifier = issue.get("key", "")
issue_title = issue.get("id", "")
if not issue_id or not issue_title:
logger.warning(
f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}"
)
documents_skipped += 1
continue
formatted_issue = jira_client.format_issue(issue)
issue_content = jira_client.format_issue_to_markdown(formatted_issue)
if not issue_content:
logger.warning(
f"Skipping issue with no content: {issue_identifier} - {issue_title}"
)
documents_skipped += 1
continue
doc = _build_connector_doc(
issue,
formatted_issue,
issue_content,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, compute_content_hash(doc)
)
if duplicate_by_content:
logger.info(
f"Jira issue {issue_identifier} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
connector_docs.append(doc)
except Exception as e:
logger.error(
f"Error building ConnectorDocument for issue {issue_identifier}: {e!s}",
exc_info=True,
)
documents_skipped += 1
continue
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s: AsyncSession):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
await update_connector_last_indexed(session, connector, update_last_indexed)
logger.info(f"Final commit: Total {documents_indexed} Jira issues processed")
try:
await session.commit()
logger.info("Successfully committed all JIRA document changes to database")
except Exception as e:
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same issue was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
await task_logger.log_task_success(
log_entry,
f"Successfully completed JIRA indexing for connector {connector_id}",
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"JIRA indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
await jira_client.close()
return documents_indexed, documents_skipped, warning_message
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during JIRA indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
if "jira_client" in locals():
with contextlib.suppress(Exception):
await jira_client.close()
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index JIRA issues for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True)
if "jira_client" in locals():
with contextlib.suppress(Exception):
await jira_client.close()
return 0, 0, f"Failed to index JIRA issues: {e!s}"

View file

@ -1,387 +0,0 @@
"""Tests for Jira indexer migrated to the unified parallel pipeline."""
from unittest.mock import AsyncMock, MagicMock
import pytest
import app.tasks.connector_indexers.jira_indexer as _mod
from app.db import DocumentType
from app.tasks.connector_indexers.jira_indexer import (
_build_connector_doc,
index_jira_issues,
)
pytestmark = pytest.mark.unit
_USER_ID = "00000000-0000-0000-0000-000000000001"
_CONNECTOR_ID = 42
_SEARCH_SPACE_ID = 1
def _make_issue(
issue_key: str = "ENG-1",
issue_id: str = "10001",
title: str = "Fix login",
):
return {"key": issue_key, "id": issue_id, "title": title}
def _make_formatted_issue(
issue_key: str = "ENG-1",
issue_id: str = "10001",
title: str = "Fix login",
status: str = "In Progress",
priority: str = "High",
comments=None,
):
return {
"key": issue_key,
"id": issue_id,
"title": title,
"status": status,
"priority": priority,
"comments": comments or [],
}
# ---------------------------------------------------------------------------
# Slice 1: _build_connector_doc tracer bullet
# ---------------------------------------------------------------------------
async def test_build_connector_doc_produces_correct_fields():
issue = _make_issue(issue_key="ENG-42", issue_id="4242", title="Fix auth bug")
formatted = _make_formatted_issue(
issue_key="ENG-42",
issue_id="4242",
title="Fix auth bug",
status="Done",
priority="Urgent",
comments=[{"id": "c1"}],
)
markdown = "# ENG-42: Fix auth bug\n\nBody"
doc = _build_connector_doc(
issue,
formatted,
markdown,
connector_id=_CONNECTOR_ID,
search_space_id=_SEARCH_SPACE_ID,
user_id=_USER_ID,
enable_summary=True,
)
assert doc.title == "ENG-42: 4242"
assert doc.unique_id == "ENG-42"
assert doc.document_type == DocumentType.JIRA_CONNECTOR
assert doc.source_markdown == markdown
assert doc.search_space_id == _SEARCH_SPACE_ID
assert doc.connector_id == _CONNECTOR_ID
assert doc.created_by_id == _USER_ID
assert doc.should_summarize is True
assert doc.metadata["issue_id"] == "ENG-42"
assert doc.metadata["issue_identifier"] == "ENG-42"
assert doc.metadata["issue_title"] == "4242"
assert doc.metadata["state"] == "Done"
assert doc.metadata["priority"] == "Urgent"
assert doc.metadata["comment_count"] == 1
assert doc.metadata["connector_id"] == _CONNECTOR_ID
assert doc.metadata["document_type"] == "Jira Issue"
assert doc.metadata["connector_type"] == "Jira"
assert doc.fallback_summary is not None
assert "ENG-42" in doc.fallback_summary
assert markdown in doc.fallback_summary
async def test_build_connector_doc_summary_disabled():
doc = _build_connector_doc(
_make_issue(),
_make_formatted_issue(),
"# content",
connector_id=_CONNECTOR_ID,
search_space_id=_SEARCH_SPACE_ID,
user_id=_USER_ID,
enable_summary=False,
)
assert doc.should_summarize is False
# ---------------------------------------------------------------------------
# Shared fixtures for Slices 2-7
# ---------------------------------------------------------------------------
def _mock_connector(enable_summary: bool = True):
c = MagicMock()
c.config = {"access_token": "tok"}
c.enable_summary = enable_summary
c.last_indexed_at = None
return c
def _mock_jira_client(issues=None, error=None):
client = MagicMock()
client.get_issues_by_date_range = AsyncMock(
return_value=(issues if issues is not None else [], error),
)
client.format_issue = MagicMock(
side_effect=lambda i: _make_formatted_issue(
issue_key=i.get("key", ""),
issue_id=i.get("id", ""),
title=i.get("title", ""),
)
)
client.format_issue_to_markdown = MagicMock(
side_effect=lambda fi: f"# {fi.get('key', '')}: {fi.get('id', '')}\n\nContent"
)
client.close = AsyncMock()
return client
@pytest.fixture
def jira_mocks(monkeypatch):
mock_session = AsyncMock()
mock_session.no_autoflush = MagicMock()
mock_connector = _mock_connector()
monkeypatch.setattr(
_mod,
"get_connector_by_id",
AsyncMock(return_value=mock_connector),
)
jira_client = _mock_jira_client(issues=[_make_issue()])
monkeypatch.setattr(
_mod,
"JiraHistoryConnector",
MagicMock(return_value=jira_client),
)
monkeypatch.setattr(
_mod,
"check_duplicate_document_by_hash",
AsyncMock(return_value=None),
)
monkeypatch.setattr(
_mod,
"update_connector_last_indexed",
AsyncMock(),
)
monkeypatch.setattr(
_mod,
"calculate_date_range",
MagicMock(return_value=("2025-01-01", "2025-12-31")),
)
mock_task_logger = MagicMock()
mock_task_logger.log_task_start = AsyncMock(return_value=MagicMock())
mock_task_logger.log_task_progress = AsyncMock()
mock_task_logger.log_task_success = AsyncMock()
mock_task_logger.log_task_failure = AsyncMock()
monkeypatch.setattr(
_mod,
"TaskLoggingService",
MagicMock(return_value=mock_task_logger),
)
batch_mock = AsyncMock(return_value=([], 1, 0))
pipeline_mock = MagicMock()
pipeline_mock.index_batch_parallel = batch_mock
pipeline_mock.migrate_legacy_docs = AsyncMock()
pipeline_mock.create_placeholder_documents = AsyncMock(return_value=0)
monkeypatch.setattr(
_mod,
"IndexingPipelineService",
MagicMock(return_value=pipeline_mock),
)
return {
"session": mock_session,
"connector": mock_connector,
"jira_client": jira_client,
"task_logger": mock_task_logger,
"pipeline_mock": pipeline_mock,
"batch_mock": batch_mock,
}
async def _run_index(mocks, **overrides):
return await index_jira_issues(
session=mocks["session"],
connector_id=overrides.get("connector_id", _CONNECTOR_ID),
search_space_id=overrides.get("search_space_id", _SEARCH_SPACE_ID),
user_id=overrides.get("user_id", _USER_ID),
start_date=overrides.get("start_date", "2025-01-01"),
end_date=overrides.get("end_date", "2025-12-31"),
update_last_indexed=overrides.get("update_last_indexed", True),
on_heartbeat_callback=overrides.get("on_heartbeat_callback"),
)
# ---------------------------------------------------------------------------
# Slice 2: Full pipeline wiring
# ---------------------------------------------------------------------------
async def test_one_issue_calls_pipeline_and_returns_indexed_count(jira_mocks):
indexed, skipped, warning = await _run_index(jira_mocks)
assert indexed == 1
assert skipped == 0
assert warning is None
jira_mocks["batch_mock"].assert_called_once()
connector_docs = jira_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert connector_docs[0].document_type == DocumentType.JIRA_CONNECTOR
async def test_pipeline_called_with_max_concurrency_3(jira_mocks):
await _run_index(jira_mocks)
call_kwargs = jira_mocks["batch_mock"].call_args[1]
assert call_kwargs.get("max_concurrency") == 3
async def test_migrate_legacy_docs_called_before_indexing(jira_mocks):
await _run_index(jira_mocks)
jira_mocks["pipeline_mock"].migrate_legacy_docs.assert_called_once()
# ---------------------------------------------------------------------------
# Slice 3: Issue skipping (missing key/title/content)
# ---------------------------------------------------------------------------
async def test_issues_with_missing_key_are_skipped(jira_mocks):
issues = [
_make_issue(issue_key="ENG-1", issue_id="10001"),
{"key": "", "id": "10002", "title": "No key"},
]
jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None)
_, skipped, _ = await _run_index(jira_mocks)
connector_docs = jira_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert skipped == 1
async def test_issues_with_missing_title_are_skipped(jira_mocks):
issues = [
_make_issue(issue_key="ENG-1", issue_id="10001"),
{"key": "ENG-2", "id": "", "title": "Missing id used as title"},
]
jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None)
_, skipped, _ = await _run_index(jira_mocks)
connector_docs = jira_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert skipped == 1
async def test_issues_with_no_content_are_skipped(jira_mocks):
issues = [
_make_issue(issue_key="ENG-1", issue_id="10001"),
_make_issue(issue_key="ENG-2", issue_id="10002"),
]
jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None)
jira_mocks["jira_client"].format_issue_to_markdown.side_effect = [
"# ENG-1: 10001\n\nContent",
"",
]
_, skipped, _ = await _run_index(jira_mocks)
connector_docs = jira_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 4: Duplicate content skipping
# ---------------------------------------------------------------------------
async def test_duplicate_content_issues_are_skipped(jira_mocks, monkeypatch):
issues = [
_make_issue(issue_key="ENG-1", issue_id="10001"),
_make_issue(issue_key="ENG-2", issue_id="10002"),
]
jira_mocks["jira_client"].get_issues_by_date_range.return_value = (issues, None)
call_count = 0
async def _check_dup(session, content_hash):
nonlocal call_count
call_count += 1
if call_count == 2:
dup = MagicMock()
dup.id = 99
dup.document_type = "OTHER"
return dup
return None
monkeypatch.setattr(_mod, "check_duplicate_document_by_hash", _check_dup)
_, skipped, _ = await _run_index(jira_mocks)
connector_docs = jira_mocks["batch_mock"].call_args[0][0]
assert len(connector_docs) == 1
assert skipped == 1
# ---------------------------------------------------------------------------
# Slice 5: Heartbeat callback forwarding
# ---------------------------------------------------------------------------
async def test_heartbeat_callback_forwarded_to_pipeline(jira_mocks):
heartbeat_cb = AsyncMock()
await _run_index(jira_mocks, on_heartbeat_callback=heartbeat_cb)
call_kwargs = jira_mocks["batch_mock"].call_args[1]
assert call_kwargs.get("on_heartbeat") is heartbeat_cb
# ---------------------------------------------------------------------------
# Slice 6: Empty issues and no-data success tuple
# ---------------------------------------------------------------------------
async def test_empty_issues_returns_zero_tuple(jira_mocks):
jira_mocks["jira_client"].get_issues_by_date_range.return_value = ([], None)
indexed, skipped, warning = await _run_index(jira_mocks)
assert indexed == 0
assert skipped == 0
assert warning is None
jira_mocks["batch_mock"].assert_not_called()
async def test_no_issues_error_message_returns_success_tuple(jira_mocks):
jira_mocks["jira_client"].get_issues_by_date_range.return_value = (
[],
"No issues found in date range",
)
indexed, skipped, warning = await _run_index(jira_mocks)
assert indexed == 0
assert skipped == 0
assert warning is None
async def test_api_error_still_returns_3_tuple(jira_mocks):
jira_mocks["jira_client"].get_issues_by_date_range.return_value = (
[],
"API exploded",
)
result = await _run_index(jira_mocks)
assert len(result) == 3
assert result[0] == 0
assert result[1] == 0
assert "Failed to get Jira issues" in result[2]
# ---------------------------------------------------------------------------
# Slice 7: Failed docs warning
# ---------------------------------------------------------------------------
async def test_failed_docs_warning_in_result(jira_mocks):
jira_mocks["batch_mock"].return_value = ([], 0, 2)
_, _, warning = await _run_index(jira_mocks)
assert warning is not None
assert "2 failed" in warning