From ca986930058250d9f98e1c1e9738b233bb53b894 Mon Sep 17 00:00:00 2001
From: CREDO23
Date: Thu, 24 Jul 2025 11:52:06 +0200
Subject: [PATCH] update connector indexing / update connector service

---
 .../app/services/connector_service.py         | 114 ++++++++
 .../app/tasks/connectors_indexing_tasks.py    | 256 ++++++++++++++++++
 2 files changed, 370 insertions(+)

diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py
index f53fd4dfc..8c6f99c5f 100644
--- a/surfsense_backend/app/services/connector_service.py
+++ b/surfsense_backend/app/services/connector_service.py
@@ -857,6 +857,120 @@ class ConnectorService:
 
         return result_object, linear_chunks
 
+    async def search_jira(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20, search_mode: SearchMode = SearchMode.CHUNKS) -> tuple:
+        """
+        Search for Jira issues and comments and return both the source information and langchain documents
+
+        Args:
+            user_query: The user's query
+            user_id: The user's ID
+            search_space_id: The search space ID to search in
+            top_k: Maximum number of results to return
+            search_mode: Search mode (CHUNKS or DOCUMENTS)
+
+        Returns:
+            tuple: (sources_info, langchain_documents)
+        """
+        if search_mode == SearchMode.CHUNKS:
+            jira_chunks = await self.chunk_retriever.hybrid_search(
+                query_text=user_query,
+                top_k=top_k,
+                user_id=user_id,
+                search_space_id=search_space_id,
+                document_type="JIRA_CONNECTOR"
+            )
+        elif search_mode == SearchMode.DOCUMENTS:
+            jira_chunks = await self.document_retriever.hybrid_search(
+                query_text=user_query,
+                top_k=top_k,
+                user_id=user_id,
+                search_space_id=search_space_id,
+                document_type="JIRA_CONNECTOR"
+            )
+            # Transform document retriever results to match expected format
+            jira_chunks = self._transform_document_results(jira_chunks)
+
+        # Early return if no results
+        if not jira_chunks:
+            return {
+                "id": 10,
+                "name": "Jira Issues",
+                "type": "JIRA_CONNECTOR",
+                "sources": [],
+            }, []
+
+        # Process each chunk and create sources directly without deduplication
+        sources_list = []
+        async with self.counter_lock:
+            for chunk in jira_chunks:
+                # Extract document metadata
+                document = chunk.get('document', {})
+                metadata = document.get('metadata', {})
+
+                # Extract Jira-specific metadata
+                issue_key = metadata.get('issue_key', '')
+                issue_title = metadata.get('issue_title', 'Untitled Issue')
+                status = metadata.get('status', '')
+                priority = metadata.get('priority', '')
+                issue_type = metadata.get('issue_type', '')
+                comment_count = metadata.get('comment_count', 0)
+
+                # Create a more descriptive title for Jira issues
+                title = f"Jira: {issue_key} - {issue_title}"
+                if status:
+                    title += f" ({status})"
+
+                # Create a more descriptive description for Jira issues; the ellipsis
+                # is only appended when the content was actually truncated
+                content = chunk.get('content', '')
+                description = f"{content[:100]}..." if len(content) > 100 else content
+
+                # Add priority and type info to description
+                info_parts = []
+                if priority:
+                    info_parts.append(f"Priority: {priority}")
+                if issue_type:
+                    info_parts.append(f"Type: {issue_type}")
+                if comment_count:
+                    info_parts.append(f"Comments: {comment_count}")
+
+                if info_parts:
+                    if description:
+                        description += f" | {' | '.join(info_parts)}"
+                    else:
+                        description = ' | '.join(info_parts)
+
+                # Link straight to the issue when the indexed metadata carries the
+                # Jira base URL; otherwise the URL stays empty
+                url = ""
+                if issue_key and metadata.get('base_url'):
+                    url = f"{metadata['base_url'].rstrip('/')}/browse/{issue_key}"
+
+                source = {
+                    "id": document.get('id', self.source_id_counter),
+                    "title": title,
+                    "description": description,
+                    "url": url,
+                    "issue_key": issue_key,
+                    "status": status,
+                    "priority": priority,
+                    "issue_type": issue_type,
+                    "comment_count": comment_count
+                }
+
+                self.source_id_counter += 1
+                sources_list.append(source)
+
+        # Create result object
+        result_object = {
+            "id": 10,  # Assign a unique ID for the Jira connector
+            "name": "Jira Issues",
+            "type": "JIRA_CONNECTOR",
+            "sources": sources_list,
+        }
+
+        return result_object, jira_chunks
+
     async def search_linkup(self, user_query: str, user_id: str, mode: str = "standard") -> tuple:
         """
         Search using Linkup API and return both the source information and documents
diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index e0b3cd1e0..ab3bc858c 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -13,6 +13,7 @@ from app.connectors.notion_history import NotionHistoryConnector
 from app.connectors.github_connector import GitHubConnector
 from app.connectors.linear_connector import LinearConnector
 from app.connectors.discord_connector import DiscordConnector
+from app.connectors.jira_connector import JiraConnector
 from slack_sdk.errors import SlackApiError
 import logging
 import asyncio
@@ -1651,3 +1652,258 @@ async def index_discord_messages(
         )
         logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True)
         return 0, f"Failed to index Discord messages: {str(e)}"
+
+
+async def index_jira_issues(
+    session: AsyncSession,
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    start_date: Optional[str] = None,
+    end_date: Optional[str] = None,
+    update_last_indexed: bool = True
+) -> Tuple[int, Optional[str]]:
+    """
+    Index Jira issues and comments.
+
+    Args:
+        session: Database session
+        connector_id: ID of the Jira connector
+        search_space_id: ID of the search space to store documents in
+        user_id: User ID
+        start_date: Start date for indexing (YYYY-MM-DD format)
+        end_date: End date for indexing (YYYY-MM-DD format)
+        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
+
+    Returns:
+        Tuple containing (number of documents indexed, error message or None)
+    """
+    task_logger = TaskLoggingService(session, search_space_id)
+
+    # Log task start
+    log_entry = await task_logger.log_task_start(
+        task_name="jira_issues_indexing",
+        source="connector_indexing_task",
+        message=f"Starting Jira issues indexing for connector {connector_id}",
+        metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+    )
+
+    try:
+        # Get the connector from the database
+        result = await session.execute(
+            select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id)
+        )
+        connector = result.scalar_one_or_none()
+
+        if not connector:
+            await task_logger.log_task_failure(
+                log_entry,
+                f"Connector with ID {connector_id} not found",
+                "Connector not found",
+                {"error_type": "ConnectorNotFound"}
+            )
+            return 0, f"Connector with ID {connector_id} not found"
+
+        # Get the Jira credentials from the connector config
+        jira_token = connector.config.get("JIRA_PERSONAL_ACCESS_TOKEN")
+        jira_base_url = connector.config.get("JIRA_BASE_URL")
+
+        if not jira_token or not jira_base_url:
+            await task_logger.log_task_failure(
+                log_entry,
+                f"Jira credentials not found in connector config for connector {connector_id}",
+                "Missing Jira credentials",
+                {"error_type": "MissingCredentials"}
+            )
+            return 0, "Jira credentials not found in connector config"
+
+        # Initialize Jira client
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Initializing Jira client for connector {connector_id}",
+            {"stage": "client_initialization"}
+        )
+
+        jira_client = JiraConnector(base_url=jira_base_url, personal_access_token=jira_token)
+
+        # Calculate date range
+        if start_date is None or end_date is None:
+            # Fall back to calculating dates based on last_indexed_at
+            calculated_end_date = datetime.now()
+
+            if connector.last_indexed_at:
+                calculated_start_date = connector.last_indexed_at
+            else:
+                # If never indexed, go back 30 days
+                calculated_start_date = calculated_end_date - timedelta(days=30)
+
+            start_date_str = calculated_start_date.strftime('%Y-%m-%d')
+            end_date_str = calculated_end_date.strftime('%Y-%m-%d')
+        else:
+            start_date_str = start_date
+            end_date_str = end_date
+
+        await task_logger.log_task_progress(
+            log_entry,
+            f"Fetching Jira issues from {start_date_str} to {end_date_str}",
+            {"stage": "fetching_issues", "start_date": start_date_str, "end_date": end_date_str}
+        )
+
+        # Get issues within date range
+        try:
+            issues, error = jira_client.get_issues_by_date_range(
+                start_date=start_date_str,
+                end_date=end_date_str,
+                include_comments=True
+            )
+
+            if error:
+                logger.error(f"Failed to get Jira issues: {error}")
+
+                # Don't treat "No issues found" as an error that should stop indexing
+                if "No issues found" in error:
+                    logger.info("No issues found is not a critical error, continuing with update")
+                    if update_last_indexed:
+                        connector.last_indexed_at = datetime.now()
+                        await session.commit()
+                        logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")
+
+                    await task_logger.log_task_completion(
+                        log_entry,
+                        f"No Jira issues found in date range {start_date_str} to {end_date_str}",
+                        {"indexed_count": 0}
+                    )
+                    return 0, None
+                else:
+                    await task_logger.log_task_failure(
+                        log_entry,
+                        f"Failed to get Jira issues: {error}",
+                        "API Error",
+                        {"error_type": "APIError"}
+                    )
+                    return 0, f"Failed to get Jira issues: {error}"
+
+            logger.info(f"Retrieved {len(issues)} issues from Jira API")
+
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Retrieved {len(issues)} issues from Jira API",
+                {"stage": "processing_issues", "issue_count": len(issues)}
+            )
+
+        except Exception as e:
+            await task_logger.log_task_failure(
+                log_entry,
+                f"Error fetching Jira issues: {str(e)}",
+                "Fetch Error",
+                {"error_type": type(e).__name__}
+            )
+            logger.error(f"Error fetching Jira issues: {str(e)}", exc_info=True)
+            return 0, f"Error fetching Jira issues: {str(e)}"
+
+        # Process and index each issue
+        indexed_count = 0
+
+        for issue in issues:
+            try:
+                # Format the issue for better readability
+                formatted_issue = jira_client.format_issue(issue)
+
+                # Convert to markdown
+                issue_markdown = jira_client.format_issue_to_markdown(formatted_issue)
+
+                # Create document metadata
+                metadata = {
+                    "issue_key": formatted_issue.get("key", ""),
+                    "issue_title": formatted_issue.get("title", ""),
+                    "status": formatted_issue.get("status", ""),
+                    "priority": formatted_issue.get("priority", ""),
+                    "issue_type": formatted_issue.get("issue_type", ""),
+                    "project": formatted_issue.get("project", ""),
+                    "assignee": (formatted_issue.get("assignee") or {}).get("display_name", ""),  # None when unassigned
+                    "reporter": (formatted_issue.get("reporter") or {}).get("display_name", ""),  # guard a null reporter too
+                    "created_at": formatted_issue.get("created_at", ""),
+                    "updated_at": formatted_issue.get("updated_at", ""),
+                    "comment_count": len(formatted_issue.get("comments", [])),
+                    "connector_id": connector_id,
+                    "source": "jira",
+                    "base_url": jira_base_url
+                }
+
+                # Generate content hash
+                content_hash = generate_content_hash(issue_markdown)
+
+                # Check if document already exists
+                existing_doc_result = await session.execute(
+                    select(Document).where(Document.content_hash == content_hash)
+                )
+                existing_doc = existing_doc_result.scalar_one_or_none()
+
+                if existing_doc:
+                    logger.debug(f"Document with hash {content_hash} already exists, skipping")
+                    continue
+
+                # Create new document
+                document = Document(
+                    title=f"Jira: {formatted_issue.get('key', 'Unknown')} - {formatted_issue.get('title', 'Untitled')}",
+                    document_type=DocumentType.JIRA_CONNECTOR,
+                    document_metadata=metadata,
+                    content=issue_markdown,
+                    content_hash=content_hash,
+                    search_space_id=search_space_id
+                )
+
+                # Generate embedding
+                embedding = await config.embedding_model_instance.get_embedding(issue_markdown)
+                document.embedding = embedding
+
+                session.add(document)
+                await session.flush()  # Flush to get the document ID
+
+                # Create chunks for the document
+                chunks = await config.chunking_model_instance.chunk_document(issue_markdown)
+
+                for chunk_content in chunks:
+                    chunk_embedding = await config.embedding_model_instance.get_embedding(chunk_content)
+
+                    chunk = Chunk(
+                        content=chunk_content,
+                        embedding=chunk_embedding,
+                        document_id=document.id
+                    )
+                    session.add(chunk)
+
+                indexed_count += 1
+                logger.debug(f"Indexed Jira issue: {formatted_issue.get('key', 'Unknown')}")
+
+            except Exception as e:
+                logger.error(f"Error processing Jira issue {issue.get('key', 'Unknown')}: {str(e)}", exc_info=True)
+                continue
+
+        # Commit all changes
+        await session.commit()
+
+        # Update last_indexed_at timestamp
+        if update_last_indexed:
+            connector.last_indexed_at = datetime.now()
+            await session.commit()
+            logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
+
+        await task_logger.log_task_completion(
+            log_entry,
+            f"Successfully indexed {indexed_count} Jira issues",
+            {"indexed_count": indexed_count}
+        )
+
+        logger.info(f"Successfully indexed {indexed_count} Jira issues")
+        return indexed_count, None
+
+    except Exception as e:
+        await task_logger.log_task_failure(
+            log_entry,
+            f"Failed to index Jira issues: {str(e)}",
+            str(e),
+            {"error_type": type(e).__name__}
+        )
+        logger.error(f"Failed to index Jira issues: {str(e)}", exc_info=True)
+        return 0, f"Failed to index Jira issues: {str(e)}"