Mirror of https://github.com/MODSetter/SurfSense.git (synced 2026-05-02 04:12:47 +02:00)
merge conflicts
This commit is contained in: commit f2f426d5eb
16 changed files with 1418 additions and 39 deletions
454  surfsense_backend/app/connectors/linear_connector.py  (new file)
@@ -0,0 +1,454 @@
"""
|
||||
Linear Connector Module
|
||||
|
||||
A module for retrieving issues and comments from Linear.
|
||||
Allows fetching issue lists and their comments with date range filtering.
|
||||
"""
|
||||
|
||||
import requests
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Tuple, Any, Union
|
||||
|
||||
|
||||
class LinearConnector:
|
||||
"""Class for retrieving issues and comments from Linear."""
|
||||
|
||||
def __init__(self, token: str = None):
|
||||
"""
|
||||
Initialize the LinearConnector class.
|
||||
|
||||
Args:
|
||||
token: Linear API token (optional, can be set later with set_token)
|
||||
"""
|
||||
self.token = token
|
||||
self.api_url = "https://api.linear.app/graphql"
|
||||
|
||||
def set_token(self, token: str) -> None:
|
||||
"""
|
||||
Set the Linear API token.
|
||||
|
||||
Args:
|
||||
token: Linear API token
|
||||
"""
|
||||
self.token = token
|
||||
|
||||
def get_headers(self) -> Dict[str, str]:
|
||||
"""
|
||||
Get headers for Linear API requests.
|
||||
|
||||
Returns:
|
||||
Dictionary of headers
|
||||
|
||||
Raises:
|
||||
ValueError: If no Linear token has been set
|
||||
"""
|
||||
if not self.token:
|
||||
raise ValueError("Linear token not initialized. Call set_token() first.")
|
||||
|
||||
return {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': self.token
|
||||
}
|
||||
|
||||
def execute_graphql_query(self, query: str, variables: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute a GraphQL query against the Linear API.
|
||||
|
||||
Args:
|
||||
query: GraphQL query string
|
||||
variables: Variables for the GraphQL query (optional)
|
||||
|
||||
Returns:
|
||||
Response data from the API
|
||||
|
||||
Raises:
|
||||
ValueError: If no Linear token has been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
if not self.token:
|
||||
raise ValueError("Linear token not initialized. Call set_token() first.")
|
||||
|
||||
headers = self.get_headers()
|
||||
payload = {'query': query}
|
||||
|
||||
if variables:
|
||||
payload['variables'] = variables
|
||||
|
||||
response = requests.post(
|
||||
self.api_url,
|
||||
headers=headers,
|
||||
json=payload
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
raise Exception(f"Query failed with status code {response.status_code}: {response.text}")
|
||||
|
||||
def get_all_issues(self, include_comments: bool = True) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Fetch all issues from Linear.
|
||||
|
||||
Args:
|
||||
include_comments: Whether to include comments in the response
|
||||
|
||||
Returns:
|
||||
List of issue objects
|
||||
|
||||
Raises:
|
||||
ValueError: If no Linear token has been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
comments_query = ""
|
||||
if include_comments:
|
||||
comments_query = """
|
||||
comments {
|
||||
nodes {
|
||||
id
|
||||
body
|
||||
user {
|
||||
id
|
||||
name
|
||||
email
|
||||
}
|
||||
createdAt
|
||||
updatedAt
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
query = f"""
|
||||
query {{
|
||||
issues {{
|
||||
nodes {{
|
||||
id
|
||||
identifier
|
||||
title
|
||||
description
|
||||
state {{
|
||||
id
|
||||
name
|
||||
type
|
||||
}}
|
||||
assignee {{
|
||||
id
|
||||
name
|
||||
email
|
||||
}}
|
||||
creator {{
|
||||
id
|
||||
name
|
||||
email
|
||||
}}
|
||||
createdAt
|
||||
updatedAt
|
||||
{comments_query}
|
||||
}}
|
||||
}}
|
||||
}}
|
||||
"""
|
||||
|
||||
result = self.execute_graphql_query(query)
|
||||
|
||||
# Extract issues from the response
|
||||
if "data" in result and "issues" in result["data"] and "nodes" in result["data"]["issues"]:
|
||||
return result["data"]["issues"]["nodes"]
|
||||
|
||||
return []
|
||||
|
||||

    def get_issues_by_date_range(
        self,
        start_date: str,
        end_date: str,
        include_comments: bool = True
    ) -> Tuple[List[Dict[str, Any]], Optional[str]]:
        """
        Fetch issues within a date range.

        Args:
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format (inclusive)
            include_comments: Whether to include comments in the response

        Returns:
            Tuple containing (issues list, error message or None)
        """
        # Convert date strings to ISO format
        try:
            # For Linear API: we need to use a more specific format for the filter
            # Instead of DateTime, use a string in the filter for DateTimeOrDuration
            comments_query = ""
            if include_comments:
                comments_query = """
                comments {
                    nodes {
                        id
                        body
                        user {
                            id
                            name
                            email
                        }
                        createdAt
                        updatedAt
                    }
                }
                """

            # Query issues that were either created OR updated within the date range
            # This ensures we catch both new issues and updated existing issues
            query = f"""
            query IssuesByDateRange($after: String) {{
                issues(
                    first: 100,
                    after: $after,
                    filter: {{
                        or: [
                            {{
                                createdAt: {{
                                    gte: "{start_date}T00:00:00Z"
                                    lte: "{end_date}T23:59:59Z"
                                }}
                            }},
                            {{
                                updatedAt: {{
                                    gte: "{start_date}T00:00:00Z"
                                    lte: "{end_date}T23:59:59Z"
                                }}
                            }}
                        ]
                    }}
                ) {{
                    nodes {{
                        id
                        identifier
                        title
                        description
                        state {{
                            id
                            name
                            type
                        }}
                        assignee {{
                            id
                            name
                            email
                        }}
                        creator {{
                            id
                            name
                            email
                        }}
                        createdAt
                        updatedAt
                        {comments_query}
                    }}
                    pageInfo {{
                        hasNextPage
                        endCursor
                    }}
                }}
            }}
            """

            try:
                all_issues = []
                has_next_page = True
                cursor = None

                # Handle pagination to get all issues
                while has_next_page:
                    variables = {"after": cursor} if cursor else {}
                    result = self.execute_graphql_query(query, variables)

                    # Check for errors
                    if "errors" in result:
                        error_message = "; ".join([error.get("message", "Unknown error") for error in result["errors"]])
                        return [], f"GraphQL errors: {error_message}"

                    # Extract issues from the response
                    if "data" in result and "issues" in result["data"]:
                        issues_page = result["data"]["issues"]

                        # Add issues from this page
                        if "nodes" in issues_page:
                            all_issues.extend(issues_page["nodes"])

                        # Check if there are more pages
                        if "pageInfo" in issues_page:
                            page_info = issues_page["pageInfo"]
                            has_next_page = page_info.get("hasNextPage", False)
                            cursor = page_info.get("endCursor") if has_next_page else None
                        else:
                            has_next_page = False
                    else:
                        has_next_page = False

                if not all_issues:
                    return [], "No issues found in the specified date range."

                return all_issues, None

            except Exception as e:
                return [], f"Error fetching issues: {str(e)}"

        except ValueError as e:
            return [], f"Invalid date format: {str(e)}. Please use YYYY-MM-DD."

    def format_issue(self, issue: Dict[str, Any]) -> Dict[str, Any]:
        """
        Format an issue for easier consumption.

        Args:
            issue: The issue object from Linear API

        Returns:
            Formatted issue dictionary
        """
        # Extract basic issue details
        formatted = {
            "id": issue.get("id", ""),
            "identifier": issue.get("identifier", ""),
            "title": issue.get("title", ""),
            "description": issue.get("description", ""),
            "state": issue.get("state", {}).get("name", "Unknown") if issue.get("state") else "Unknown",
            "state_type": issue.get("state", {}).get("type", "Unknown") if issue.get("state") else "Unknown",
            "created_at": issue.get("createdAt", ""),
            "updated_at": issue.get("updatedAt", ""),
            "creator": {
                "id": issue.get("creator", {}).get("id", "") if issue.get("creator") else "",
                "name": issue.get("creator", {}).get("name", "Unknown") if issue.get("creator") else "Unknown",
                "email": issue.get("creator", {}).get("email", "") if issue.get("creator") else ""
            } if issue.get("creator") else {"id": "", "name": "Unknown", "email": ""},
            "assignee": {
                "id": issue.get("assignee", {}).get("id", ""),
                "name": issue.get("assignee", {}).get("name", "Unknown"),
                "email": issue.get("assignee", {}).get("email", "")
            } if issue.get("assignee") else None,
            "comments": []
        }

        # Extract comments if available
        if "comments" in issue and "nodes" in issue["comments"]:
            for comment in issue["comments"]["nodes"]:
                formatted_comment = {
                    "id": comment.get("id", ""),
                    "body": comment.get("body", ""),
                    "created_at": comment.get("createdAt", ""),
                    "updated_at": comment.get("updatedAt", ""),
                    "user": {
                        "id": comment.get("user", {}).get("id", "") if comment.get("user") else "",
                        "name": comment.get("user", {}).get("name", "Unknown") if comment.get("user") else "Unknown",
                        "email": comment.get("user", {}).get("email", "") if comment.get("user") else ""
                    } if comment.get("user") else {"id": "", "name": "Unknown", "email": ""}
                }
                formatted["comments"].append(formatted_comment)

        return formatted

    def format_issue_to_markdown(self, issue: Dict[str, Any]) -> str:
        """
        Convert an issue to markdown format.

        Args:
            issue: The issue object (either raw or formatted)

        Returns:
            Markdown string representation of the issue
        """
        # Format the issue if it's not already formatted
        if "identifier" not in issue:
            issue = self.format_issue(issue)

        # Build the markdown content
        markdown = f"# {issue.get('identifier', 'No ID')}: {issue.get('title', 'No Title')}\n\n"

        if issue.get('state'):
            markdown += f"**Status:** {issue['state']}\n\n"

        if issue.get('assignee') and issue['assignee'].get('name'):
            markdown += f"**Assignee:** {issue['assignee']['name']}\n"

        if issue.get('creator') and issue['creator'].get('name'):
            markdown += f"**Created by:** {issue['creator']['name']}\n"

        if issue.get('created_at'):
            created_date = self.format_date(issue['created_at'])
            markdown += f"**Created:** {created_date}\n"

        if issue.get('updated_at'):
            updated_date = self.format_date(issue['updated_at'])
            markdown += f"**Updated:** {updated_date}\n\n"

        if issue.get('description'):
            markdown += f"## Description\n\n{issue['description']}\n\n"

        if issue.get('comments'):
            markdown += f"## Comments ({len(issue['comments'])})\n\n"

            for comment in issue['comments']:
                user_name = "Unknown"
                if comment.get('user') and comment['user'].get('name'):
                    user_name = comment['user']['name']

                comment_date = "Unknown date"
                if comment.get('created_at'):
                    comment_date = self.format_date(comment['created_at'])

                markdown += f"### {user_name} ({comment_date})\n\n{comment.get('body', '')}\n\n---\n\n"

        return markdown

    @staticmethod
    def format_date(iso_date: str) -> str:
        """
        Format an ISO date string to a more readable format.

        Args:
            iso_date: ISO format date string

        Returns:
            Formatted date string
        """
        if not iso_date or not isinstance(iso_date, str):
            return "Unknown date"

        try:
            dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00'))
            return dt.strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            return iso_date


# Example usage (uncomment to use):
"""
if __name__ == "__main__":
    # Set your token here
    token = "YOUR_LINEAR_API_KEY"

    linear = LinearConnector(token)

    try:
        # Get all issues with comments
        issues = linear.get_all_issues()
        print(f"Retrieved {len(issues)} issues")

        # Format and print the first issue as markdown
        if issues:
            issue_md = linear.format_issue_to_markdown(issues[0])
            print("\nSample Issue in Markdown:\n")
            print(issue_md)

        # Get issues by date range
        start_date = "2023-01-01"
        end_date = "2023-01-31"
        date_issues, error = linear.get_issues_by_date_range(start_date, end_date)

        if error:
            print(f"Error: {error}")
        else:
            print(f"\nRetrieved {len(date_issues)} issues from {start_date} to {end_date}")

    except Exception as e:
        print(f"Error: {e}")
"""
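To see what the markdown conversion above yields, here is a minimal sketch (illustrative only, not part of this commit; the issue values are placeholders) that renders one already-formatted issue:

from app.connectors.linear_connector import LinearConnector

connector = LinearConnector(token="YOUR_LINEAR_API_KEY")  # the token is not used for pure formatting
sample_issue = {
    "identifier": "ENG-42",
    "title": "Fix login redirect",
    "state": "In Progress",
    "created_at": "2024-03-01T10:00:00.000Z",
    "description": "Users are bounced back to /login after a successful sign-in.",
    "comments": [],
}
print(connector.format_issue_to_markdown(sample_issue))
# Expected output starts roughly like:
# # ENG-42: Fix login redirect
#
# **Status:** In Progress
#
# **Created:** 2024-03-01 10:00:00
# ## Description
# ...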
@@ -41,6 +41,7 @@ class DocumentType(str, Enum):
    NOTION_CONNECTOR = "NOTION_CONNECTOR"
    YOUTUBE_VIDEO = "YOUTUBE_VIDEO"
    GITHUB_CONNECTOR = "GITHUB_CONNECTOR"
    LINEAR_CONNECTOR = "LINEAR_CONNECTOR"

class SearchSourceConnectorType(str, Enum):
    SERPER_API = "SERPER_API"

@@ -48,6 +49,7 @@ class SearchSourceConnectorType(str, Enum):
    SLACK_CONNECTOR = "SLACK_CONNECTOR"
    NOTION_CONNECTOR = "NOTION_CONNECTOR"
    GITHUB_CONNECTOR = "GITHUB_CONNECTOR"
    LINEAR_CONNECTOR = "LINEAR_CONNECTOR"

class ChatType(str, Enum):
    GENERAL = "GENERAL"
@@ -7,7 +7,7 @@ PUT /search-source-connectors/{connector_id} - Update a specific connector
DELETE /search-source-connectors/{connector_id} - Delete a specific connector
POST /search-source-connectors/{connector_id}/index - Index content from a connector to a search space

Note: Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, NOTION_CONNECTOR).
Note: Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, NOTION_CONNECTOR, GITHUB_CONNECTOR, LINEAR_CONNECTOR).
"""
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks, Body
from sqlalchemy.ext.asyncio import AsyncSession
@@ -18,10 +18,9 @@ from app.db import get_async_session, User, SearchSourceConnector, SearchSourceC
from app.schemas import SearchSourceConnectorCreate, SearchSourceConnectorUpdate, SearchSourceConnectorRead, SearchSourceConnectorBase
from app.users import current_active_user
from app.utils.check_ownership import check_ownership
from pydantic import ValidationError, BaseModel, Field
from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos
from app.connectors.github_connector import GitHubConnector
from datetime import datetime, timezone
from pydantic import ValidationError
from app.tasks.connectors_indexing_tasks import index_slack_messages, index_notion_pages, index_github_repos, index_linear_issues
from datetime import datetime, timezone, timedelta
import logging

# Set up logging
@@ -66,7 +65,7 @@ async def create_search_source_connector(
    """
    Create a new search source connector.

    Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR).
    Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, etc.).
    The config must contain the appropriate keys for the connector type.
    """
    try:
@@ -275,10 +274,10 @@ async def index_connector_content(
    Index content from a connector to a search space.

    Currently supports:
    - SLACK_CONNECTOR: Indexes messages from all accessible Slack channels since the last indexing
      (or the last 365 days if never indexed before)
    - NOTION_CONNECTOR: Indexes pages from all accessible Notion pages since the last indexing
      (or the last 365 days if never indexed before)
    - SLACK_CONNECTOR: Indexes messages from all accessible Slack channels
    - NOTION_CONNECTOR: Indexes pages from all accessible Notion pages
    - GITHUB_CONNECTOR: Indexes code and documentation from GitHub repositories
    - LINEAR_CONNECTOR: Indexes issues and comments from Linear

    Args:
        connector_id: ID of the connector to use
@@ -310,7 +309,7 @@ async def index_connector_content(
                today = datetime.now().date()
                if connector.last_indexed_at.date() == today:
                    # If last indexed today, go back 1 day to ensure we don't miss anything
                    start_date = (today - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
                    start_date = (today - timedelta(days=1)).strftime("%Y-%m-%d")
                else:
                    start_date = connector.last_indexed_at.strftime("%Y-%m-%d")
@@ -331,7 +330,7 @@ async def index_connector_content(
                today = datetime.now().date()
                if connector.last_indexed_at.date() == today:
                    # If last indexed today, go back 1 day to ensure we don't miss anything
                    start_date = (today - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
                    start_date = (today - timedelta(days=1)).strftime("%Y-%m-%d")
                else:
                    start_date = connector.last_indexed_at.strftime("%Y-%m-%d")
@@ -353,6 +352,27 @@ async def index_connector_content(
            logger.info(f"Triggering GitHub indexing for connector {connector_id} into search space {search_space_id}")
            background_tasks.add_task(run_github_indexing_with_new_session, connector_id, search_space_id)
            response_message = "GitHub indexing started in the background."

        elif connector.connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR:
            # Determine the time range that will be indexed
            if not connector.last_indexed_at:
                start_date = "365 days ago"
            else:
                # Check if last_indexed_at is today
                today = datetime.now().date()
                if connector.last_indexed_at.date() == today:
                    # If last indexed today, go back 1 day to ensure we don't miss anything
                    start_date = (today - timedelta(days=1)).strftime("%Y-%m-%d")
                else:
                    start_date = connector.last_indexed_at.strftime("%Y-%m-%d")

            indexing_from = start_date
            indexing_to = today_str

            # Run indexing in background
            logger.info(f"Triggering Linear indexing for connector {connector_id} into search space {search_space_id}")
            background_tasks.add_task(run_linear_indexing_with_new_session, connector_id, search_space_id)
            response_message = "Linear indexing started in the background."

        else:
            raise HTTPException(
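To make the window selection above concrete, the same three-way rule can be read as a small standalone sketch (the helper name below is illustrative, not part of this change):

from datetime import datetime, timedelta

def resolve_start_date(last_indexed_at):
    # Mirrors the branch above: pick where the next indexing window starts.
    if not last_indexed_at:
        return "365 days ago"  # never indexed before
    today = datetime.now().date()
    if last_indexed_at.date() == today:
        # Indexed earlier today: back up one day so nothing is missed.
        return (today - timedelta(days=1)).strftime("%Y-%m-%d")
    return last_indexed_at.strftime("%Y-%m-%d")

# resolve_start_date(None)                  -> "365 days ago"
# resolve_start_date(datetime(2024, 3, 1))  -> "2024-03-01" (when today is later)
# resolve_start_date(datetime.now())        -> yesterday's date as "YYYY-MM-DD"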
@@ -519,3 +539,37 @@ async def run_github_indexing(
        await session.rollback()
        logger.error(f"Critical error in run_github_indexing for connector {connector_id}: {e}", exc_info=True)
        # Optionally update status in DB to indicate failure

# Add new helper functions for Linear indexing
async def run_linear_indexing_with_new_session(
    connector_id: int,
    search_space_id: int
):
    """Wrapper to run Linear indexing with its own database session."""
    logger.info(f"Background task started: Indexing Linear connector {connector_id} into space {search_space_id}")
    async with async_session_maker() as session:
        await run_linear_indexing(session, connector_id, search_space_id)
    logger.info(f"Background task finished: Indexing Linear connector {connector_id}")

async def run_linear_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int
):
    """Runs the Linear indexing task and updates the timestamp."""
    try:
        indexed_count, error_message = await index_linear_issues(
            session, connector_id, search_space_id, update_last_indexed=False
        )
        if error_message:
            logger.error(f"Linear indexing failed for connector {connector_id}: {error_message}")
            # Optionally update status in DB to indicate failure
        else:
            logger.info(f"Linear indexing successful for connector {connector_id}. Indexed {indexed_count} documents.")
            # Update the last indexed timestamp only on success
            await update_connector_last_indexed(session, connector_id)
            await session.commit()  # Commit timestamp update
    except Exception as e:
        await session.rollback()
        logger.error(f"Critical error in run_linear_indexing for connector {connector_id}: {e}", exc_info=True)
        # Optionally update status in DB to indicate failure
@@ -71,6 +71,15 @@ class SearchSourceConnectorBase(BaseModel):
            repo_full_names = config.get("repo_full_names")
            if not isinstance(repo_full_names, list) or not repo_full_names:
                raise ValueError("repo_full_names must be a non-empty list of strings")
        elif connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR:
            # For LINEAR_CONNECTOR, only allow LINEAR_API_KEY
            allowed_keys = ["LINEAR_API_KEY"]
            if set(config.keys()) != set(allowed_keys):
                raise ValueError(f"For LINEAR_CONNECTOR connector type, config must only contain these keys: {allowed_keys}")

            # Ensure the token is not empty
            if not config.get("LINEAR_API_KEY"):
                raise ValueError("LINEAR_API_KEY cannot be empty")

        return config
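Concretely, the only config shape this LINEAR_CONNECTOR branch accepts is a single-key mapping with a non-empty value (a minimal sketch; the key value is a placeholder, not a real API key):

valid_config = {"LINEAR_API_KEY": "lin_api_placeholder"}  # accepted: exactly the allowed key, non-empty

# Each of these would raise ValueError in the validator above:
# {}                                                          -> LINEAR_API_KEY missing
# {"LINEAR_API_KEY": ""}                                      -> key present but empty
# {"LINEAR_API_KEY": "lin_api_placeholder", "TEAM_ID": "X"}   -> extra key not in allowed_keys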
@@ -10,6 +10,7 @@ from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.connectors.slack_history import SlackHistory
from app.connectors.notion_history import NotionHistoryConnector
from app.connectors.github_connector import GitHubConnector
from app.connectors.linear_connector import LinearConnector
from slack_sdk.errors import SlackApiError
import logging

@@ -60,8 +61,20 @@ async def index_slack_messages(
        end_date = datetime.now()

        # Use last_indexed_at as start date if available, otherwise use 365 days ago

        start_date = end_date - timedelta(days=365)
        if connector.last_indexed_at:
            # Convert dates to be comparable (both timezone-naive)
            last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at

            # Check if last_indexed_at is in the future or after end_date
            if last_indexed_naive > end_date:
                logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 30 days ago instead.")
                start_date = end_date - timedelta(days=30)
            else:
                start_date = last_indexed_naive
                logger.info(f"Using last_indexed_at ({start_date.strftime('%Y-%m-%d')}) as start date")
        else:
            start_date = end_date - timedelta(days=30)  # Use 30 days instead of 365 to catch recent issues
            logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (30 days ago) as start date")

        # Format dates for Slack API
        start_date_str = start_date.strftime("%Y-%m-%d")
@@ -783,3 +796,280 @@ async def index_github_repos(

    error_message = "; ".join(errors) if errors else None
    return documents_processed, error_message

async def index_linear_issues(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    update_last_indexed: bool = True
) -> Tuple[int, Optional[str]]:
    """
    Index Linear issues and comments.

    Args:
        session: Database session
        connector_id: ID of the Linear connector
        search_space_id: ID of the search space to store documents in
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    try:
        # Get the connector
        result = await session.execute(
            select(SearchSourceConnector)
            .filter(
                SearchSourceConnector.id == connector_id,
                SearchSourceConnector.connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR
            )
        )
        connector = result.scalars().first()

        if not connector:
            return 0, f"Connector with ID {connector_id} not found or is not a Linear connector"

        # Get the Linear token from the connector config
        linear_token = connector.config.get("LINEAR_API_KEY")
        if not linear_token:
            return 0, "Linear API token not found in connector config"

        # Initialize Linear client
        linear_client = LinearConnector(token=linear_token)

        # Calculate date range
        end_date = datetime.now()

        # Use last_indexed_at as start date if available, otherwise use 365 days ago
        if connector.last_indexed_at:
            # Convert dates to be comparable (both timezone-naive)
            last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at

            # Check if last_indexed_at is in the future or after end_date
            if last_indexed_naive > end_date:
                logger.warning(f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 30 days ago instead.")
                start_date = end_date - timedelta(days=30)
            else:
                start_date = last_indexed_naive
                logger.info(f"Using last_indexed_at ({start_date.strftime('%Y-%m-%d')}) as start date")
        else:
            start_date = end_date - timedelta(days=30)  # Use 30 days instead of 365 to catch recent issues
            logger.info(f"No last_indexed_at found, using {start_date.strftime('%Y-%m-%d')} (30 days ago) as start date")

        # Format dates for Linear API
        start_date_str = start_date.strftime("%Y-%m-%d")
        end_date_str = end_date.strftime("%Y-%m-%d")

        logger.info(f"Fetching Linear issues from {start_date_str} to {end_date_str}")

        # Get issues within date range
        try:
            issues, error = linear_client.get_issues_by_date_range(
                start_date=start_date_str,
                end_date=end_date_str,
                include_comments=True
            )

            if error:
                logger.error(f"Failed to get Linear issues: {error}")

                # Don't treat "No issues found" as an error that should stop indexing
                if "No issues found" in error:
                    logger.info("No issues found is not a critical error, continuing with update")
                    if update_last_indexed:
                        connector.last_indexed_at = datetime.now()
                        await session.commit()
                        logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")
                    return 0, None
                else:
                    return 0, f"Failed to get Linear issues: {error}"

            logger.info(f"Retrieved {len(issues)} issues from Linear API")

        except Exception as e:
            logger.error(f"Exception when calling Linear API: {str(e)}", exc_info=True)
            return 0, f"Failed to get Linear issues: {str(e)}"

        if not issues:
            logger.info("No Linear issues found for the specified date range")
            if update_last_indexed:
                connector.last_indexed_at = datetime.now()
                await session.commit()
                logger.info(f"Updated last_indexed_at to {connector.last_indexed_at} despite no issues found")
            return 0, None  # Return None instead of error message when no issues found

        # Log issue IDs and titles for debugging
        logger.info("Issues retrieved from Linear API:")
        for idx, issue in enumerate(issues[:10]):  # Log first 10 issues
            logger.info(f" {idx+1}. {issue.get('identifier', 'Unknown')} - {issue.get('title', 'Unknown')} - Created: {issue.get('createdAt', 'Unknown')} - Updated: {issue.get('updatedAt', 'Unknown')}")
        if len(issues) > 10:
            logger.info(f" ...and {len(issues) - 10} more issues")

        # Get existing documents for this search space and connector type to prevent duplicates
        existing_docs_result = await session.execute(
            select(Document)
            .filter(
                Document.search_space_id == search_space_id,
                Document.document_type == DocumentType.LINEAR_CONNECTOR
            )
        )
        existing_docs = existing_docs_result.scalars().all()

        # Create a lookup dictionary of existing documents by issue_id
        existing_docs_by_issue_id = {}
        for doc in existing_docs:
            if "issue_id" in doc.document_metadata:
                existing_docs_by_issue_id[doc.document_metadata["issue_id"]] = doc

        logger.info(f"Found {len(existing_docs_by_issue_id)} existing Linear documents in database")

        # Log existing document IDs for debugging
        if existing_docs_by_issue_id:
            logger.info("Existing Linear document issue IDs in database:")
            for idx, (issue_id, doc) in enumerate(list(existing_docs_by_issue_id.items())[:10]):  # Log first 10
                logger.info(f" {idx+1}. {issue_id} - {doc.document_metadata.get('issue_identifier', 'Unknown')} - {doc.document_metadata.get('issue_title', 'Unknown')}")
            if len(existing_docs_by_issue_id) > 10:
                logger.info(f" ...and {len(existing_docs_by_issue_id) - 10} more existing documents")

        # Track the number of documents indexed
        documents_indexed = 0
        documents_updated = 0
        documents_skipped = 0
        skipped_issues = []

        # Process each issue
        for issue in issues:
            try:
                issue_id = issue.get("id")
                issue_identifier = issue.get("identifier", "")
                issue_title = issue.get("title", "")

                if not issue_id or not issue_title:
                    logger.warning(f"Skipping issue with missing ID or title: {issue_id or 'Unknown'}")
                    skipped_issues.append(f"{issue_identifier or 'Unknown'} (missing data)")
                    documents_skipped += 1
                    continue

                # Format the issue first to get well-structured data
                formatted_issue = linear_client.format_issue(issue)

                # Convert issue to markdown format
                issue_content = linear_client.format_issue_to_markdown(formatted_issue)

                if not issue_content:
                    logger.warning(f"Skipping issue with no content: {issue_identifier} - {issue_title}")
                    skipped_issues.append(f"{issue_identifier} (no content)")
                    documents_skipped += 1
                    continue

                # Create a short summary for the embedding
                # This avoids using the LLM and just uses the issue data directly
                state = formatted_issue.get("state", "Unknown")
                description = formatted_issue.get("description", "")
                # Truncate description if it's too long for the summary
                if description and len(description) > 500:
                    description = description[:497] + "..."

                # Create a simple summary from the issue data
                summary_content = f"Linear Issue {issue_identifier}: {issue_title}\n\nStatus: {state}\n\n"
                if description:
                    summary_content += f"Description: {description}\n\n"

                # Add comment count
                comment_count = len(formatted_issue.get("comments", []))
                summary_content += f"Comments: {comment_count}"

                # Generate embedding for the summary
                summary_embedding = config.embedding_model_instance.embed(summary_content)

                # Process chunks - using the full issue content with comments
                chunks = [
                    Chunk(content=chunk.text, embedding=chunk.embedding)
                    for chunk in config.chunker_instance.chunk(issue_content)
                ]

                # Check if this issue already exists in our database
                existing_document = existing_docs_by_issue_id.get(issue_id)

                if existing_document:
                    # Update existing document instead of creating a new one
                    logger.info(f"Updating existing document for issue {issue_identifier} - {issue_title}")

                    # Update document fields
                    existing_document.title = f"Linear - {issue_identifier}: {issue_title}"
                    existing_document.document_metadata = {
                        "issue_id": issue_id,
                        "issue_identifier": issue_identifier,
                        "issue_title": issue_title,
                        "state": state,
                        "comment_count": comment_count,
                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    }
                    existing_document.content = summary_content
                    existing_document.embedding = summary_embedding

                    # Delete existing chunks and add new ones
                    await session.execute(
                        delete(Chunk)
                        .where(Chunk.document_id == existing_document.id)
                    )

                    # Assign new chunks to existing document
                    for chunk in chunks:
                        chunk.document_id = existing_document.id
                        session.add(chunk)

                    documents_updated += 1
                else:
                    # Create and store new document
                    logger.info(f"Creating new document for issue {issue_identifier} - {issue_title}")
                    document = Document(
                        search_space_id=search_space_id,
                        title=f"Linear - {issue_identifier}: {issue_title}",
                        document_type=DocumentType.LINEAR_CONNECTOR,
                        document_metadata={
                            "issue_id": issue_id,
                            "issue_identifier": issue_identifier,
                            "issue_title": issue_title,
                            "state": state,
                            "comment_count": comment_count,
                            "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        },
                        content=summary_content,
                        embedding=summary_embedding,
                        chunks=chunks
                    )

                    session.add(document)
                    documents_indexed += 1
                    logger.info(f"Successfully indexed new issue {issue_identifier} - {issue_title}")

            except Exception as e:
                logger.error(f"Error processing issue {issue.get('identifier', 'Unknown')}: {str(e)}", exc_info=True)
                skipped_issues.append(f"{issue.get('identifier', 'Unknown')} (processing error)")
                documents_skipped += 1
                continue  # Skip this issue and continue with others

        # Update the last_indexed_at timestamp for the connector only if requested
        total_processed = documents_indexed + documents_updated
        if update_last_indexed:
            connector.last_indexed_at = datetime.now()
            logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")

        # Commit all changes
        await session.commit()
        logger.info(f"Successfully committed all Linear document changes to database")

        logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_updated} updated, {documents_skipped} skipped")
        return total_processed, None  # Return None as the error message to indicate success

    except SQLAlchemyError as db_error:
        await session.rollback()
        logger.error(f"Database error: {str(db_error)}", exc_info=True)
        return 0, f"Database error: {str(db_error)}"
    except Exception as e:
        await session.rollback()
        logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True)
        return 0, f"Failed to index Linear issues: {str(e)}"
@@ -270,6 +270,32 @@ async def stream_connector_search_results(
            # Add documents to collection
            all_raw_documents.extend(github_chunks)

        # Linear Connector
        if connector == "LINEAR_CONNECTOR":
            # Send terminal message about starting search
            yield streaming_service.add_terminal_message("Starting to search for Linear issues...")

            # Search using Linear API with reformulated query
            result_object, linear_chunks = await connector_service.search_linear(
                user_query=reformulated_query,
                user_id=user_id,
                search_space_id=search_space_id,
                top_k=TOP_K
            )

            # Send terminal message about search results
            yield streaming_service.add_terminal_message(
                f"Found {len(result_object['sources'])} relevant results from Linear",
                "success"
            )

            # Update sources
            all_sources.append(result_object)
            yield streaming_service.update_sources(all_sources)

            # Add documents to collection
            all_raw_documents.extend(linear_chunks)
@@ -559,3 +559,87 @@ class ConnectorService:
        }

        return result_object, github_chunks

    async def search_linear(self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20) -> tuple:
        """
        Search for Linear issues and comments and return both the source information and langchain documents

        Args:
            user_query: The user's query
            user_id: The user's ID
            search_space_id: The search space ID to search in
            top_k: Maximum number of results to return

        Returns:
            tuple: (sources_info, langchain_documents)
        """
        linear_chunks = await self.retriever.hybrid_search(
            query_text=user_query,
            top_k=top_k,
            user_id=user_id,
            search_space_id=search_space_id,
            document_type="LINEAR_CONNECTOR"
        )

        # Process each chunk and create sources directly without deduplication
        sources_list = []
        for i, chunk in enumerate(linear_chunks):
            # Fix for UI
            linear_chunks[i]['document']['id'] = self.source_id_counter

            # Extract document metadata
            document = chunk.get('document', {})
            metadata = document.get('metadata', {})

            # Extract Linear-specific metadata
            issue_identifier = metadata.get('issue_identifier', '')
            issue_title = metadata.get('issue_title', 'Untitled Issue')
            issue_state = metadata.get('state', '')
            comment_count = metadata.get('comment_count', 0)

            # Create a more descriptive title for Linear issues
            title = f"Linear: {issue_identifier} - {issue_title}"
            if issue_state:
                title += f" ({issue_state})"

            # Create a more descriptive description for Linear issues
            description = chunk.get('content', '')[:100]
            if len(description) == 100:
                description += "..."

            # Add comment count info to description
            if comment_count:
                if description:
                    description += f" | Comments: {comment_count}"
                else:
                    description = f"Comments: {comment_count}"

            # For URL, we could construct a URL to the Linear issue if we have the workspace info
            # For now, use a generic placeholder
            url = ""
            if issue_identifier:
                # This is a generic format, may need to be adjusted based on actual Linear workspace
                url = f"https://linear.app/issue/{issue_identifier}"

            source = {
                "id": self.source_id_counter,
                "title": title,
                "description": description,
                "url": url,
                "issue_identifier": issue_identifier,
                "state": issue_state,
                "comment_count": comment_count
            }

            self.source_id_counter += 1
            sources_list.append(source)

        # Create result object
        result_object = {
            "id": 9,  # Assign a unique ID for the Linear connector
            "name": "Linear Issues",
            "type": "LINEAR_CONNECTOR",
            "sources": sources_list,
        }

        return result_object, linear_chunks
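For reference, each entry appended to sources_list by this loop has the following shape (a sketch with illustrative placeholder values, not taken from real data):

example_source = {
    "id": 42,                                   # self.source_id_counter at the time of the chunk
    "title": "Linear: ENG-123 - Fix login bug (In Progress)",
    "description": "first ~100 characters of the matched chunk... | Comments: 3",
    "url": "https://linear.app/issue/ENG-123",
    "issue_identifier": "ENG-123",
    "state": "In Progress",
    "comment_count": 3
}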