Fixed formatting and linting post Jira connector PR

Utkarsh-Patel-13 2025-07-25 10:52:34 -07:00
commit 2827522ebc
30 changed files with 5428 additions and 3279 deletions

View file

@@ -84,9 +84,9 @@ async def fetch_documents_by_ids(
"document": {
"id": doc.id,
"title": doc.title,
"document_type": doc.document_type.value
if doc.document_type
else "UNKNOWN",
"document_type": (
doc.document_type.value if doc.document_type else "UNKNOWN"
),
"metadata": doc.document_metadata or {},
},
"source": doc.document_type.value if doc.document_type else "UNKNOWN",
@@ -186,9 +186,11 @@ async def fetch_documents_by_ids(
title = f"GitHub: {doc.title}"
description = metadata.get(
"description",
doc.content[:100] + "..."
if len(doc.content) > 100
else doc.content,
(
doc.content[:100] + "..."
if len(doc.content) > 100
else doc.content
),
)
url = metadata.get("url", "")
@@ -204,9 +206,11 @@ async def fetch_documents_by_ids(
description = metadata.get(
"description",
doc.content[:100] + "..."
if len(doc.content) > 100
else doc.content,
(
doc.content[:100] + "..."
if len(doc.content) > 100
else doc.content
),
)
url = (
f"https://www.youtube.com/watch?v={video_id}"
@@ -238,6 +242,35 @@ async def fetch_documents_by_ids(
else:
url = ""
elif doc_type == "JIRA_CONNECTOR":
# Extract Jira-specific metadata
issue_key = metadata.get("issue_key", "Unknown Issue")
issue_title = metadata.get("issue_title", "Untitled Issue")
status = metadata.get("status", "")
priority = metadata.get("priority", "")
issue_type = metadata.get("issue_type", "")
title = f"Jira: {issue_key} - {issue_title}"
if status:
title += f" ({status})"
description = (
doc.content[:100] + "..."
if len(doc.content) > 100
else doc.content
)
if priority:
description += f" | Priority: {priority}"
if issue_type:
description += f" | Type: {issue_type}"
# Construct Jira URL if we have the base URL
base_url = metadata.get("base_url", "")
if base_url and issue_key:
url = f"{base_url}/browse/{issue_key}"
else:
url = ""
elif doc_type == "EXTENSION":
# Extract Extension-specific metadata
webpage_title = metadata.get("VisitedWebPageTitle", doc.title)
@@ -268,9 +301,11 @@ async def fetch_documents_by_ids(
"og:description",
metadata.get(
"ogDescription",
doc.content[:100] + "..."
if len(doc.content) > 100
else doc.content,
(
doc.content[:100] + "..."
if len(doc.content) > 100
else doc.content
),
),
)
url = metadata.get("url", "")
@@ -301,6 +336,7 @@ async def fetch_documents_by_ids(
"GITHUB_CONNECTOR": "GitHub (Selected)",
"YOUTUBE_VIDEO": "YouTube Videos (Selected)",
"DISCORD_CONNECTOR": "Discord (Selected)",
"JIRA_CONNECTOR": "Jira Issues (Selected)",
"EXTENSION": "Browser Extension (Selected)",
"CRAWLED_URL": "Web Pages (Selected)",
"FILE": "Files (Selected)",
@@ -376,10 +412,10 @@ async def write_answer_outline(
# Create the human message content
human_message_content = f"""
Now please create an answer outline for the following query:
User Query: {reformulated_query}
Number of Sections: {num_sections}
Remember to format your response as valid JSON exactly matching this structure:
{{
"answer_outline": [
@@ -393,7 +429,7 @@ async def write_answer_outline(
}}
]
}}
Your output MUST be valid JSON in exactly this format. Do not include any other text or explanation.
"""
@@ -802,7 +838,9 @@ async def fetch_relevant_documents(
source_object,
linkup_chunks,
) = await connector_service.search_linkup(
user_query=reformulated_query, user_id=user_id, mode=linkup_mode
user_query=reformulated_query,
user_id=user_id,
mode=linkup_mode,
)
# Add to sources and raw documents
@@ -845,6 +883,30 @@ async def fetch_relevant_documents(
}
)
elif connector == "JIRA_CONNECTOR":
source_object, jira_chunks = await connector_service.search_jira(
user_query=reformulated_query,
user_id=user_id,
search_space_id=search_space_id,
top_k=top_k,
search_mode=search_mode,
)
# Add to sources and raw documents
if source_object:
all_sources.append(source_object)
all_raw_documents.extend(jira_chunks)
# Stream found document count
if streaming_service and writer:
writer(
{
"yield_value": streaming_service.format_terminal_info_delta(
f"🎫 Found {len(jira_chunks)} Jira issues related to your query"
)
}
)
except Exception as e:
error_message = f"Error searching connector {connector}: {e!s}"
print(error_message)
@@ -1214,7 +1276,7 @@ async def process_sections(
# Combine the results into a final report with section titles
final_report = []
for _, (section, content) in enumerate(
for _i, (section, content) in enumerate(
zip(answer_outline.answer_outline, processed_results, strict=False)
):
# Skip adding the section header since the content already contains the title
@@ -1725,11 +1787,11 @@ async def generate_further_questions(
# Create the human message content
human_message_content = f"""
{chat_history_xml}
{documents_xml}
Based on the chat history and available documents above, generate 3-5 contextually relevant follow-up questions that would naturally extend the conversation and provide additional value to the user. Make sure the questions can be reasonably answered using the available documents or knowledge base.
Your response MUST be valid JSON in exactly this format:
{{
"further_questions": [
@@ -1743,7 +1805,7 @@ async def generate_further_questions(
}}
]
}}
Do not include any other text or explanation. Only return the JSON.
"""

View file

@@ -15,7 +15,8 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel
- YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos)
- GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions)
- LINEAR_CONNECTOR: "Linear project issues and discussions" (personal project management)
- DISCORD_CONNECTOR: "Discord server messages and channels" (personal community interactions)
- JIRA_CONNECTOR: "Jira project issues, tickets, and comments" (personal project tracking)
- DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications)
- TAVILY_API: "Tavily search API results" (personalized search results)
- LINKUP_API: "Linkup search API results" (personalized search results)
</knowledge_sources>
@@ -71,7 +72,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel
Python's asyncio library provides tools for writing concurrent code using the async/await syntax. It's particularly useful for I/O-bound and high-level structured network code.
</content>
</document>
<document>
<metadata>
<source_id>12</source_id>

View file

@@ -43,6 +43,8 @@ def get_connector_emoji(connector_name: str) -> str:
"NOTION_CONNECTOR": "📘",
"GITHUB_CONNECTOR": "🐙",
"LINEAR_CONNECTOR": "📊",
"JIRA_CONNECTOR": "🎫",
"DISCORD_CONNECTOR": "🗨️",
"TAVILY_API": "🔍",
"LINKUP_API": "🔗",
}
@@ -60,6 +62,8 @@ def get_connector_friendly_name(connector_name: str) -> str:
"NOTION_CONNECTOR": "Notion",
"GITHUB_CONNECTOR": "GitHub",
"LINEAR_CONNECTOR": "Linear",
"JIRA_CONNECTOR": "Jira",
"DISCORD_CONNECTOR": "Discord",
"TAVILY_API": "Tavily Search",
"LINKUP_API": "Linkup Search",
}
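
A quick sketch of how these two lookups might be combined when building progress messages; the loop and connector list are illustrative:

for connector_name in ["JIRA_CONNECTOR", "DISCORD_CONNECTOR"]:
    emoji = get_connector_emoji(connector_name)
    friendly = get_connector_friendly_name(connector_name)
    # e.g. "🎫 Searching Jira..." and "🗨️ Searching Discord..."
    print(f"{emoji} Searching {friendly}...")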

View file

@@ -0,0 +1,487 @@
"""
Jira Connector Module
A module for retrieving data from Jira.
Supports fetching issues with their comments, projects, and more.
"""
import base64
from datetime import datetime
from typing import Any
import requests
class JiraConnector:
"""Class for retrieving data from Jira."""
def __init__(
self,
base_url: str | None = None,
email: str | None = None,
api_token: str | None = None,
):
"""
Initialize the JiraConnector class.
Args:
base_url: Jira instance base URL (e.g., 'https://yourcompany.atlassian.net') (optional)
email: Jira account email address (optional)
api_token: Jira API token (optional)
"""
self.base_url = base_url.rstrip("/") if base_url else None
self.email = email
self.api_token = api_token
self.api_version = "3" # Jira Cloud API version
def set_credentials(self, base_url: str, email: str, api_token: str) -> None:
"""
Set the Jira credentials.
Args:
base_url: Jira instance base URL
email: Jira account email address
api_token: Jira API token
"""
self.base_url = base_url.rstrip("/")
self.email = email
self.api_token = api_token
def set_email(self, email: str) -> None:
"""
Set the Jira account email.
Args:
email: Jira account email address
"""
self.email = email
def set_api_token(self, api_token: str) -> None:
"""
Set the Jira API token.
Args:
api_token: Jira API token
"""
self.api_token = api_token
def get_headers(self) -> dict[str, str]:
"""
Get headers for Jira API requests using Basic Authentication.
Returns:
Dictionary of headers
Raises:
ValueError: If email, api_token, or base_url have not been set
"""
if not all([self.base_url, self.email, self.api_token]):
raise ValueError(
"Jira credentials not initialized. Call set_credentials() first."
)
# Create Basic Auth header using email:api_token
auth_str = f"{self.email}:{self.api_token}"
auth_bytes = auth_str.encode("utf-8")
auth_header = "Basic " + base64.b64encode(auth_bytes).decode("ascii")
return {
"Content-Type": "application/json",
"Authorization": auth_header,
"Accept": "application/json",
}
def make_api_request(
self, endpoint: str, params: dict[str, Any] | None = None
) -> dict[str, Any]:
"""
Make a request to the Jira API.
Args:
endpoint: API endpoint (without base URL)
params: Query parameters for the request (optional)
Returns:
Response data from the API
Raises:
ValueError: If email, api_token, or base_url have not been set
Exception: If the API request fails
"""
if not all([self.base_url, self.email, self.api_token]):
raise ValueError(
"Jira credentials not initialized. Call set_credentials() first."
)
url = f"{self.base_url}/rest/api/{self.api_version}/{endpoint}"
headers = self.get_headers()
response = requests.get(url, headers=headers, params=params, timeout=500)
if response.status_code == 200:
return response.json()
else:
raise Exception(
f"API request failed with status code {response.status_code}: {response.text}"
)
def get_all_projects(self) -> dict[str, Any]:
"""
Fetch all projects from Jira.
Returns:
List of project objects
Raises:
ValueError: If credentials have not been set
Exception: If the API request fails
"""
return self.make_api_request("project/search")
def get_all_issues(self, project_key: str | None = None) -> list[dict[str, Any]]:
"""
Fetch all issues from Jira.
Args:
project_key: Optional project key to filter issues (e.g., 'PROJ')
Returns:
List of issue objects
Raises:
ValueError: If credentials have not been set
Exception: If the API request fails
"""
jql = "ORDER BY created DESC"
if project_key:
jql = f'project = "{project_key}" ' + jql
fields = [
"summary",
"description",
"status",
"assignee",
"reporter",
"created",
"updated",
"priority",
"issuetype",
"project",
]
params = {
"jql": jql,
"fields": ",".join(fields),
"maxResults": 100,
"startAt": 0,
}
all_issues = []
start_at = 0
while True:
params["startAt"] = start_at
result = self.make_api_request("search", params)
if not isinstance(result, dict) or "issues" not in result:
raise Exception("Invalid response from Jira API")
issues = result["issues"]
all_issues.extend(issues)
print(f"Fetched {len(issues)} issues (startAt={start_at})")
total = result.get("total", 0)
if start_at + len(issues) >= total:
break
start_at += len(issues)
return all_issues
def get_issues_by_date_range(
self,
start_date: str,
end_date: str,
include_comments: bool = True,
project_key: str | None = None,
) -> tuple[list[dict[str, Any]], str | None]:
"""
Fetch issues within a date range.
Args:
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format (inclusive)
include_comments: Whether to include comments in the response
project_key: Optional project key to filter issues
Returns:
Tuple containing (issues list, error message or None)
"""
try:
# Build JQL query for date range
# Query issues that were either created OR updated within the date range
date_filter = (
f"(createdDate >= '{start_date}' AND createdDate <= '{end_date}')"
)
# TODO: This JQL needs improvement to work as expected
_jql = f"{date_filter}"
if project_key:
_jql = (
f'project = "{project_key}" AND {date_filter} ORDER BY created DESC'
)
# Define fields to retrieve
fields = [
"summary",
"description",
"status",
"assignee",
"reporter",
"created",
"updated",
"priority",
"issuetype",
"project",
]
if include_comments:
fields.append("comment")
params = {
# "jql": "", TODO : Add a JQL query to filter from a date range
"fields": ",".join(fields),
"maxResults": 100,
"startAt": 0,
}
all_issues = []
start_at = 0
while True:
params["startAt"] = start_at
result = self.make_api_request("search", params)
if not isinstance(result, dict) or "issues" not in result:
return [], "Invalid response from Jira API"
issues = result["issues"]
all_issues.extend(issues)
# Check if there are more issues to fetch
total = result.get("total", 0)
if start_at + len(issues) >= total:
break
start_at += len(issues)
if not all_issues:
return [], "No issues found in the specified date range."
return all_issues, None
except Exception as e:
return [], f"Error fetching issues: {e!s}"
def format_issue(self, issue: dict[str, Any]) -> dict[str, Any]:
"""
Format an issue for easier consumption.
Args:
issue: The issue object from Jira API
Returns:
Formatted issue dictionary
"""
fields = issue.get("fields", {})
# Extract basic issue details
formatted = {
"id": issue.get("id", ""),
"key": issue.get("key", ""),
"title": fields.get("summary", ""),
"description": fields.get("description", ""),
"status": (
fields.get("status", {}).get("name", "Unknown")
if fields.get("status")
else "Unknown"
),
"status_category": (
fields.get("status", {})
.get("statusCategory", {})
.get("name", "Unknown")
if fields.get("status")
else "Unknown"
),
"priority": (
fields.get("priority", {}).get("name", "Unknown")
if fields.get("priority")
else "Unknown"
),
"issue_type": (
fields.get("issuetype", {}).get("name", "Unknown")
if fields.get("issuetype")
else "Unknown"
),
"project": (
fields.get("project", {}).get("key", "Unknown")
if fields.get("project")
else "Unknown"
),
"created_at": fields.get("created", ""),
"updated_at": fields.get("updated", ""),
"reporter": (
{
"account_id": (
fields.get("reporter", {}).get("accountId", "")
if fields.get("reporter")
else ""
),
"display_name": (
fields.get("reporter", {}).get("displayName", "Unknown")
if fields.get("reporter")
else "Unknown"
),
"email": (
fields.get("reporter", {}).get("emailAddress", "")
if fields.get("reporter")
else ""
),
}
if fields.get("reporter")
else {"account_id": "", "display_name": "Unknown", "email": ""}
),
"assignee": (
{
"account_id": fields.get("assignee", {}).get("accountId", ""),
"display_name": fields.get("assignee", {}).get(
"displayName", "Unknown"
),
"email": fields.get("assignee", {}).get("emailAddress", ""),
}
if fields.get("assignee")
else None
),
"comments": [],
}
# Extract comments if available
if "comment" in fields and "comments" in fields["comment"]:
for comment in fields["comment"]["comments"]:
formatted_comment = {
"id": comment.get("id", ""),
"body": comment.get("body", ""),
"created_at": comment.get("created", ""),
"updated_at": comment.get("updated", ""),
"author": (
{
"account_id": (
comment.get("author", {}).get("accountId", "")
if comment.get("author")
else ""
),
"display_name": (
comment.get("author", {}).get("displayName", "Unknown")
if comment.get("author")
else "Unknown"
),
"email": (
comment.get("author", {}).get("emailAddress", "")
if comment.get("author")
else ""
),
}
if comment.get("author")
else {"account_id": "", "display_name": "Unknown", "email": ""}
),
}
formatted["comments"].append(formatted_comment)
return formatted
def format_issue_to_markdown(self, issue: dict[str, Any]) -> str:
"""
Convert an issue to markdown format.
Args:
issue: The issue object (either raw or formatted)
Returns:
Markdown string representation of the issue
"""
# Format the issue if it's not already formatted (raw API issues still carry "fields")
if "fields" in issue:
issue = self.format_issue(issue)
# Build the markdown content
markdown = (
f"# {issue.get('key', 'No Key')}: {issue.get('title', 'No Title')}\n\n"
)
if issue.get("status"):
markdown += f"**Status:** {issue['status']}\n"
if issue.get("priority"):
markdown += f"**Priority:** {issue['priority']}\n"
if issue.get("issue_type"):
markdown += f"**Type:** {issue['issue_type']}\n"
if issue.get("project"):
markdown += f"**Project:** {issue['project']}\n\n"
if issue.get("assignee") and issue["assignee"].get("display_name"):
markdown += f"**Assignee:** {issue['assignee']['display_name']}\n"
if issue.get("reporter") and issue["reporter"].get("display_name"):
markdown += f"**Reporter:** {issue['reporter']['display_name']}\n"
if issue.get("created_at"):
created_date = self.format_date(issue["created_at"])
markdown += f"**Created:** {created_date}\n"
if issue.get("updated_at"):
updated_date = self.format_date(issue["updated_at"])
markdown += f"**Updated:** {updated_date}\n\n"
if issue.get("description"):
markdown += f"## Description\n\n{issue['description']}\n\n"
if issue.get("comments"):
markdown += f"## Comments ({len(issue['comments'])})\n\n"
for comment in issue["comments"]:
author_name = "Unknown"
if comment.get("author") and comment["author"].get("display_name"):
author_name = comment["author"]["display_name"]
comment_date = "Unknown date"
if comment.get("created_at"):
comment_date = self.format_date(comment["created_at"])
markdown += f"### {author_name} ({comment_date})\n\n{comment.get('body', '')}\n\n---\n\n"
return markdown
@staticmethod
def format_date(iso_date: str) -> str:
"""
Format an ISO date string to a more readable format.
Args:
iso_date: ISO format date string
Returns:
Formatted date string
"""
if not iso_date or not isinstance(iso_date, str):
return "Unknown date"
try:
# Jira dates are typically in format: 2023-01-01T12:00:00.000+0000
dt = datetime.fromisoformat(iso_date.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
return iso_date
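
A minimal usage sketch for the connector above, assuming a Jira Cloud instance and valid credentials (all values are placeholders):

connector = JiraConnector(
    base_url="https://yourcompany.atlassian.net",  # placeholder instance
    email="you@example.com",                       # placeholder account
    api_token="your-api-token",                    # placeholder token
)
issues, error = connector.get_issues_by_date_range(
    start_date="2025-07-01",
    end_date="2025-07-25",
    include_comments=True,
)
if error:
    print(f"Fetch failed: {error}")
else:
    for issue in issues[:3]:
        print(connector.format_issue_to_markdown(issue))

Note that get_issues_by_date_range currently ignores the computed date filter (see the TODO above), so this returns recent issues rather than a strict date window.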

View file

@@ -3,6 +3,7 @@ from datetime import UTC, datetime
from enum import Enum
from fastapi import Depends
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyUserDatabase
from pgvector.sqlalchemy import Vector
from sqlalchemy import (
ARRAY,
@@ -26,13 +27,7 @@ from app.retriver.chunks_hybrid_search import ChucksHybridSearchRetriever
from app.retriver.documents_hybrid_search import DocumentHybridSearchRetriever
if config.AUTH_TYPE == "GOOGLE":
from fastapi_users.db import (
SQLAlchemyBaseOAuthAccountTableUUID,
SQLAlchemyBaseUserTableUUID,
SQLAlchemyUserDatabase,
)
else:
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyUserDatabase
from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID
DATABASE_URL = config.DATABASE_URL
@@ -47,6 +42,7 @@ class DocumentType(str, Enum):
GITHUB_CONNECTOR = "GITHUB_CONNECTOR"
LINEAR_CONNECTOR = "LINEAR_CONNECTOR"
DISCORD_CONNECTOR = "DISCORD_CONNECTOR"
JIRA_CONNECTOR = "JIRA_CONNECTOR"
class SearchSourceConnectorType(str, Enum):
@@ -58,6 +54,7 @@ class SearchSourceConnectorType(str, Enum):
GITHUB_CONNECTOR = "GITHUB_CONNECTOR"
LINEAR_CONNECTOR = "LINEAR_CONNECTOR"
DISCORD_CONNECTOR = "DISCORD_CONNECTOR"
JIRA_CONNECTOR = "JIRA_CONNECTOR"
class ChatType(str, Enum):
@@ -320,6 +317,7 @@ if config.AUTH_TYPE == "GOOGLE":
strategic_llm = relationship(
"LLMConfig", foreign_keys=[strategic_llm_id], post_update=True
)
else:
class User(SQLAlchemyBaseUserTableUUID, Base):
@@ -402,6 +400,7 @@ if config.AUTH_TYPE == "GOOGLE":
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User, OAuthAccount)
else:
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
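
With the enum members added, Jira documents participate in the same queries as every other document type. A small sketch, assuming the Document model defined later in this module and an active AsyncSession:

from sqlalchemy import select

async def count_jira_documents(session, search_space_id: int) -> int:
    # Count indexed Jira documents inside one search space.
    result = await session.execute(
        select(Document).where(
            Document.document_type == DocumentType.JIRA_CONNECTOR,
            Document.search_space_id == search_space_id,
        )
    )
    return len(result.scalars().all())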

View file

@@ -38,6 +38,7 @@ from app.schemas import (
from app.tasks.connectors_indexing_tasks import (
index_discord_messages,
index_github_repos,
index_jira_issues,
index_linear_issues,
index_notion_pages,
index_slack_messages,
@@ -336,6 +337,7 @@ async def index_connector_content(
- NOTION_CONNECTOR: Indexes pages from all accessible Notion pages
- GITHUB_CONNECTOR: Indexes code and documentation from GitHub repositories
- LINEAR_CONNECTOR: Indexes issues and comments from Linear
- JIRA_CONNECTOR: Indexes issues and comments from Jira
- DISCORD_CONNECTOR: Indexes messages from all accessible Discord channels
Args:
@@ -353,7 +355,9 @@ async def index_connector_content(
)
# Check if the search space belongs to the user
await check_ownership(session, SearchSpace, search_space_id, user)
_search_space = await check_ownership(
session, SearchSpace, search_space_id, user
)
# Handle different connector types
response_message = ""
@@ -438,6 +442,21 @@ async def index_connector_content(
)
response_message = "Linear indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.JIRA_CONNECTOR:
# Run indexing in background
logger.info(
f"Triggering Jira indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
background_tasks.add_task(
run_jira_indexing_with_new_session,
connector_id,
search_space_id,
str(user.id),
indexing_from,
indexing_to,
)
response_message = "Jira indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
# Run indexing in background
logger.info(
@@ -807,3 +826,61 @@ async def run_discord_indexing(
)
except Exception as e:
logger.error(f"Error in background Discord indexing task: {e!s}")
# Add new helper functions for Jira indexing
async def run_jira_indexing_with_new_session(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Wrapper to run Jira indexing with its own database session."""
logger.info(
f"Background task started: Indexing Jira connector {connector_id} into space {search_space_id} from {start_date} to {end_date}"
)
async with async_session_maker() as session:
await run_jira_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
logger.info(f"Background task finished: Indexing Jira connector {connector_id}")
async def run_jira_indexing(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Runs the Jira indexing task and updates the timestamp."""
try:
indexed_count, error_message = await index_jira_issues(
session,
connector_id,
search_space_id,
user_id,
start_date,
end_date,
update_last_indexed=False,
)
if error_message:
logger.error(
f"Jira indexing failed for connector {connector_id}: {error_message}"
)
# Optionally update status in DB to indicate failure
else:
logger.info(
f"Jira indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
)
# Update the last indexed timestamp only on success
await update_connector_last_indexed(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
logger.error(
f"Critical error in run_jira_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
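
The wrapper above follows the session-per-task pattern used by the other connectors: FastAPI's BackgroundTasks runs after the response is sent, when the request-scoped session is already closed, so each task opens its own. A generalized sketch of the idea:

async def run_in_fresh_session(run_fn, *args):
    # Each background task owns its session; nothing leaks across requests.
    async with async_session_maker() as session:
        await run_fn(session, *args)

run_jira_indexing_with_new_session is this pattern specialized to index_jira_issues.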

View file

@@ -123,6 +123,25 @@ class SearchSourceConnectorBase(BaseModel):
# Ensure the bot token is not empty
if not config.get("DISCORD_BOT_TOKEN"):
raise ValueError("DISCORD_BOT_TOKEN cannot be empty")
elif connector_type == SearchSourceConnectorType.JIRA_CONNECTOR:
# For JIRA_CONNECTOR, require JIRA_EMAIL, JIRA_API_TOKEN and JIRA_BASE_URL
allowed_keys = ["JIRA_EMAIL", "JIRA_API_TOKEN", "JIRA_BASE_URL"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For JIRA_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the email is not empty
if not config.get("JIRA_EMAIL"):
raise ValueError("JIRA_EMAIL cannot be empty")
# Ensure the API token is not empty
if not config.get("JIRA_API_TOKEN"):
raise ValueError("JIRA_API_TOKEN cannot be empty")
# Ensure the base URL is not empty
if not config.get("JIRA_BASE_URL"):
raise ValueError("JIRA_BASE_URL cannot be empty")
return config
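
For reference, a config payload that satisfies the validator above; exactly these three keys, all non-empty (values are placeholders):

jira_config = {
    "JIRA_EMAIL": "you@example.com",
    "JIRA_API_TOKEN": "your-api-token",
    "JIRA_BASE_URL": "https://yourcompany.atlassian.net",
}
# Any missing, extra, or empty key raises ValueError.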

View file

@@ -1,4 +1,5 @@
import asyncio
from typing import Any
from linkup import LinkupClient
from sqlalchemy import func
@@ -204,7 +205,9 @@ class ConnectorService:
return result_object, files_chunks
def _transform_document_results(self, document_results: list[dict]) -> list[dict]:
def _transform_document_results(
self, document_results: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""
Transform results from document_retriever.hybrid_search() to match the format
expected by the processing code.
@@ -608,6 +611,7 @@ class ConnectorService:
visit_duration = metadata.get(
"VisitedWebPageVisitDurationInMilliseconds", ""
)
_browsing_session_id = metadata.get("BrowsingSessionId", "")
# Create a more descriptive title for extension data
title = webpage_title
@@ -948,6 +952,127 @@ class ConnectorService:
return result_object, linear_chunks
async def search_jira(
self,
user_query: str,
user_id: str,
search_space_id: int,
top_k: int = 20,
search_mode: SearchMode = SearchMode.CHUNKS,
) -> tuple:
"""
Search for Jira issues and comments and return both the source information and langchain documents
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
search_mode: Search mode (CHUNKS or DOCUMENTS)
Returns:
tuple: (sources_info, langchain_documents)
"""
if search_mode == SearchMode.CHUNKS:
jira_chunks = await self.chunk_retriever.hybrid_search(
query_text=user_query,
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
document_type="JIRA_CONNECTOR",
)
elif search_mode == SearchMode.DOCUMENTS:
jira_chunks = await self.document_retriever.hybrid_search(
query_text=user_query,
top_k=top_k,
user_id=user_id,
search_space_id=search_space_id,
document_type="JIRA_CONNECTOR",
)
# Transform document retriever results to match expected format
jira_chunks = self._transform_document_results(jira_chunks)
# Early return if no results
if not jira_chunks:
return {
"id": 30,
"name": "Jira Issues",
"type": "JIRA_CONNECTOR",
"sources": [],
}, []
# Process each chunk and create sources directly without deduplication
sources_list = []
async with self.counter_lock:
for _i, chunk in enumerate(jira_chunks):
# Extract document metadata
document = chunk.get("document", {})
metadata = document.get("metadata", {})
# Extract Jira-specific metadata
issue_key = metadata.get("issue_key", "")
issue_title = metadata.get("issue_title", "Untitled Issue")
status = metadata.get("status", "")
priority = metadata.get("priority", "")
issue_type = metadata.get("issue_type", "")
comment_count = metadata.get("comment_count", 0)
# Create a more descriptive title for Jira issues
title = f"Jira: {issue_key} - {issue_title}"
if status:
title += f" ({status})"
# Create a more descriptive description for Jira issues
description = chunk.get("content", "")[:100]
if len(description) == 100:
description += "..."
# Add priority and type info to description
info_parts = []
if priority:
info_parts.append(f"Priority: {priority}")
if issue_type:
info_parts.append(f"Type: {issue_type}")
if comment_count:
info_parts.append(f"Comments: {comment_count}")
if info_parts:
if description:
description += f" | {' | '.join(info_parts)}"
else:
description = " | ".join(info_parts)
# Construct a link to the Jira issue when the base URL is available
# in metadata; otherwise leave the URL empty
url = ""
if issue_key and metadata.get("base_url"):
url = f"{metadata.get('base_url')}/browse/{issue_key}"
source = {
"id": document.get("id", self.source_id_counter),
"title": title,
"description": description,
"url": url,
"issue_key": issue_key,
"status": status,
"priority": priority,
"issue_type": issue_type,
"comment_count": comment_count,
}
self.source_id_counter += 1
sources_list.append(source)
# Create result object
result_object = {
"id": 10, # Assign a unique ID for the Jira connector
"name": "Jira Issues",
"type": "JIRA_CONNECTOR",
"sources": sources_list,
}
return result_object, jira_chunks
async def search_linkup(
self, user_query: str, user_id: str, mode: str = "standard"
) -> tuple:
@@ -1013,12 +1138,12 @@ class ConnectorService:
# Create a source entry
source = {
"id": self.source_id_counter,
"title": result.name
if hasattr(result, "name")
else "Linkup Result",
"description": result.content[:100]
if hasattr(result, "content")
else "",
"title": (
result.name if hasattr(result, "name") else "Linkup Result"
),
"description": (
result.content[:100] if hasattr(result, "content") else ""
),
"url": result.url if hasattr(result, "url") else "",
}
sources_list.append(source)
@@ -1030,9 +1155,11 @@ class ConnectorService:
"score": 1.0, # Default score since not provided by Linkup
"document": {
"id": self.source_id_counter,
"title": result.name
if hasattr(result, "name")
else "Linkup Result",
"title": (
result.name
if hasattr(result, "name")
else "Linkup Result"
),
"document_type": "LINKUP_API",
"metadata": {
"url": result.url if hasattr(result, "url") else "",

View file

@@ -10,6 +10,7 @@ from sqlalchemy.future import select
from app.config import config
from app.connectors.discord_connector import DiscordConnector
from app.connectors.github_connector import GitHubConnector
from app.connectors.jira_connector import JiraConnector
from app.connectors.linear_connector import LinearConnector
from app.connectors.notion_history import NotionHistoryConnector
from app.connectors.slack_history import SlackHistory
@@ -1374,9 +1375,9 @@ async def index_linear_issues(
# Process each issue
for issue in issues:
try:
issue_id = issue.get("id")
issue_identifier = issue.get("identifier", "")
issue_title = issue.get("title", "")
issue_id = issue.get("key")
issue_identifier = issue.get("id", "")
issue_title = issue.get("key", "")
if not issue_id or not issue_title:
logger.warning(
@@ -1978,3 +1979,353 @@ async def index_discord_messages(
)
logger.error(f"Failed to index Discord messages: {e!s}", exc_info=True)
return 0, f"Failed to index Discord messages: {e!s}"
async def index_jira_issues(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
) -> tuple[int, str | None]:
"""
Index Jira issues and comments.
Args:
session: Database session
connector_id: ID of the Jira connector
search_space_id: ID of the search space to store documents in
user_id: User ID
start_date: Start date for indexing (YYYY-MM-DD format)
end_date: End date for indexing (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="jira_issues_indexing",
source="connector_indexing_task",
message=f"Starting Jira issues indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"start_date": start_date,
"end_date": end_date,
},
)
try:
# Get the connector from the database
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found",
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return 0, f"Connector with ID {connector_id} not found"
# Get the Jira credentials from the connector config
jira_email = connector.config.get("JIRA_EMAIL")
jira_api_token = connector.config.get("JIRA_API_TOKEN")
jira_base_url = connector.config.get("JIRA_BASE_URL")
if not jira_email or not jira_api_token or not jira_base_url:
await task_logger.log_task_failure(
log_entry,
f"Jira credentials not found in connector config for connector {connector_id}",
"Missing Jira credentials",
{"error_type": "MissingCredentials"},
)
return 0, "Jira credentials not found in connector config"
# Initialize Jira client
await task_logger.log_task_progress(
log_entry,
f"Initializing Jira client for connector {connector_id}",
{"stage": "client_initialization"},
)
jira_client = JiraConnector(
base_url=jira_base_url, email=jira_email, api_token=jira_api_token
)
# Calculate date range
if start_date is None or end_date is None:
# Fall back to calculating dates based on last_indexed_at
calculated_end_date = datetime.now()
# Use last_indexed_at as start date if available, otherwise use 365 days ago
if connector.last_indexed_at:
# Convert dates to be comparable (both timezone-naive)
last_indexed_naive = (
connector.last_indexed_at.replace(tzinfo=None)
if connector.last_indexed_at.tzinfo
else connector.last_indexed_at
)
# Check if last_indexed_at is in the future or after end_date
if last_indexed_naive > calculated_end_date:
logger.warning(
f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 365 days ago instead."
)
calculated_start_date = calculated_end_date - timedelta(days=365)
else:
calculated_start_date = last_indexed_naive
logger.info(
f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
)
else:
calculated_start_date = calculated_end_date - timedelta(
days=365
) # Use 365 days as default
logger.info(
f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date"
)
# Use calculated dates if not provided
start_date_str = (
start_date if start_date else calculated_start_date.strftime("%Y-%m-%d")
)
end_date_str = (
end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")
)
else:
# Use provided dates
start_date_str = start_date
end_date_str = end_date
await task_logger.log_task_progress(
log_entry,
f"Fetching Jira issues from {start_date_str} to {end_date_str}",
{
"stage": "fetching_issues",
"start_date": start_date_str,
"end_date": end_date_str,
},
)
# Get issues within date range
try:
issues, error = jira_client.get_issues_by_date_range(
start_date=start_date_str, end_date=end_date_str, include_comments=True
)
if error:
logger.error(f"Failed to get Jira issues: {error}")
# Don't treat "No issues found" as an error that should stop indexing
if "No issues found" in error:
logger.info(
"No issues found is not a critical error, continuing with update"
)
if update_last_indexed:
connector.last_indexed_at = datetime.now()
await session.commit()
logger.info(
f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found"
)
await task_logger.log_task_success(
log_entry,
f"No Jira issues found in date range {start_date_str} to {end_date_str}",
{"issues_found": 0},
)
return 0, None
else:
await task_logger.log_task_failure(
log_entry,
f"Failed to get Jira issues: {error}",
"API Error",
{"error_type": "APIError"},
)
return 0, f"Failed to get Jira issues: {error}"
logger.info(f"Retrieved {len(issues)} issues from Jira API")
except Exception as e:
logger.error(f"Error fetching Jira issues: {e!s}", exc_info=True)
return 0, f"Error fetching Jira issues: {e!s}"
# Process and index each issue
documents_indexed = 0
skipped_issues = []
documents_skipped = 0
for issue in issues:
try:
issue_id = issue.get("key")
issue_identifier = issue.get("key", "")
issue_title = issue.get("id", "")
if not issue_id or not issue_title:
logger.warning(
f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}"
)
skipped_issues.append(
f"{issue_identifier or 'Unknown'} (missing data)"
)
documents_skipped += 1
continue
# Format the issue for better readability
formatted_issue = jira_client.format_issue(issue)
# Convert to markdown
issue_content = jira_client.format_issue_to_markdown(formatted_issue)
if not issue_content:
logger.warning(
f"Skipping issue with no content: {issue_identifier} - {issue_title}"
)
skipped_issues.append(f"{issue_identifier} (no content)")
documents_skipped += 1
continue
# Create a simple summary
summary_content = f"Jira Issue {issue_identifier}: {issue_title}\n\nStatus: {formatted_issue.get('status', 'Unknown')}\n\n"
if formatted_issue.get("description"):
summary_content += (
f"Description: {formatted_issue.get('description')}\n\n"
)
# Add comment count
comment_count = len(formatted_issue.get("comments", []))
summary_content += f"Comments: {comment_count}"
# Generate content hash
content_hash = generate_content_hash(issue_content, search_space_id)
# Check if document already exists
existing_doc_by_hash_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
existing_document_by_hash = (
existing_doc_by_hash_result.scalars().first()
)
if existing_document_by_hash:
logger.info(
f"Document with content hash {content_hash} already exists for issue {issue_identifier}. Skipping processing."
)
documents_skipped += 1
continue
# Generate embedding for the summary
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks - using the full issue content with comments
chunks = [
Chunk(
content=chunk.text,
embedding=config.embedding_model_instance.embed(chunk.text),
)
for chunk in config.chunker_instance.chunk(issue_content)
]
# Create and store new document
logger.info(
f"Creating new document for issue {issue_identifier} - {issue_title}"
)
document = Document(
search_space_id=search_space_id,
title=f"Jira - {issue_identifier}: {issue_title}",
document_type=DocumentType.JIRA_CONNECTOR,
document_metadata={
"issue_id": issue_id,
"issue_key": issue_identifier,
"issue_title": issue_title,
"status": formatted_issue.get("status", "Unknown"),
"priority": formatted_issue.get("priority", ""),
"issue_type": formatted_issue.get("issue_type", ""),
"comment_count": comment_count,
"base_url": jira_base_url,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
content=summary_content,
content_hash=content_hash,
embedding=summary_embedding,
chunks=chunks,
)
session.add(document)
documents_indexed += 1
logger.info(
f"Successfully indexed new issue {issue_identifier} - {issue_title}"
)
except Exception as e:
logger.error(
f"Error processing issue {issue.get('identifier', 'Unknown')}: {e!s}",
exc_info=True,
)
skipped_issues.append(
f"{issue.get('identifier', 'Unknown')} (processing error)"
)
documents_skipped += 1
continue # Skip this issue and continue with others
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if update_last_indexed:
connector.last_indexed_at = datetime.now()
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
# Commit all changes
await session.commit()
logger.info("Successfully committed all JIRA document changes to database")
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed JIRA indexing for connector {connector_id}",
{
"issues_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_issues_count": len(skipped_issues),
},
)
logger.info(
f"JIRA indexing completed: {documents_indexed} new issues, {documents_skipped} skipped"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during JIRA indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index JIRA issues for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True)
return 0, f"Failed to index JIRA issues: {e!s}"