SurfSense/surfsense_backend/app/connectors/linear_connector.py

455 lines
16 KiB
Python
Raw Normal View History

2025-04-15 23:10:35 -07:00
"""
Linear Connector Module
A module for retrieving issues and comments from Linear.
Allows fetching issue lists and their comments with date range filtering.
"""
import requests
from datetime import datetime
2025-04-15 23:10:35 -07:00
from typing import Dict, List, Optional, Tuple, Any, Union
class LinearConnector:
    """Client for retrieving issues and comments from Linear's GraphQL API.

    The connector posts GraphQL documents to ``api_url`` with the configured
    token, walks the cursor-paginated ``issues`` connection, and offers
    helpers that flatten raw issue payloads into plain dictionaries or
    Markdown text.
    """

    # Seconds to wait on a single HTTP request. Without an explicit timeout
    # a stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    # Number of issues requested per page of the ``issues`` connection.
    PAGE_SIZE = 100

    # Selection set requested for every issue node.
    _ISSUE_FIELDS = """
            id
            identifier
            title
            description
            state {
                id
                name
                type
            }
            assignee {
                id
                name
                email
            }
            creator {
                id
                name
                email
            }
            createdAt
            updatedAt
    """

    # Extra selection appended when the caller asks for comments.
    _COMMENT_FIELDS = """
            comments {
                nodes {
                    id
                    body
                    user {
                        id
                        name
                        email
                    }
                    createdAt
                    updatedAt
                }
            }
    """

    def __init__(self, token: Optional[str] = None):
        """
        Initialize the LinearConnector class.

        Args:
            token: Linear API token (optional, can be set later with set_token)
        """
        self.token = token
        self.api_url = "https://api.linear.app/graphql"

    def set_token(self, token: str) -> None:
        """
        Set the Linear API token.

        Args:
            token: Linear API token
        """
        self.token = token

    def get_headers(self) -> Dict[str, str]:
        """
        Get headers for Linear API requests.

        Returns:
            Dictionary with the JSON content type and the token, sent as-is
            in the ``Authorization`` header

        Raises:
            ValueError: If no Linear token has been set
        """
        if not self.token:
            raise ValueError("Linear token not initialized. Call set_token() first.")

        return {
            'Content-Type': 'application/json',
            'Authorization': self.token
        }

    def execute_graphql_query(self, query: str, variables: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Execute a GraphQL query against the Linear API.

        Args:
            query: GraphQL query string
            variables: Variables for the GraphQL query (optional)

        Returns:
            Decoded JSON body of the response

        Raises:
            ValueError: If no Linear token has been set
            Exception: If the API responds with a non-200 status code
        """
        # get_headers() already raises ValueError when the token is missing.
        headers = self.get_headers()

        payload: Dict[str, Any] = {'query': query}
        if variables:
            payload['variables'] = variables

        response = requests.post(
            self.api_url,
            headers=headers,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise Exception(f"Query failed with status code {response.status_code}: {response.text}")
        return response.json()

    def _build_issues_query(self, operation: str, include_comments: bool, issue_filter: str = "") -> str:
        """Build a paginated ``issues`` query, optionally filtered and with comments."""
        selection = self._ISSUE_FIELDS
        if include_comments:
            selection += self._COMMENT_FIELDS

        arguments = f"first: {self.PAGE_SIZE}, after: $after"
        if issue_filter:
            arguments += f", filter: {issue_filter}"

        return (
            f"query {operation}($after: String) {{\n"
            f"    issues({arguments}) {{\n"
            f"        nodes {{{selection}        }}\n"
            f"        pageInfo {{\n"
            f"            hasNextPage\n"
            f"            endCursor\n"
            f"        }}\n"
            f"    }}\n"
            f"}}\n"
        )

    def _fetch_issue_pages(self, query: str) -> Tuple[List[Dict[str, Any]], Optional[str]]:
        """Run ``query`` page by page; return (issues, GraphQL error message or None)."""
        collected: List[Dict[str, Any]] = []
        cursor = None

        while True:
            variables = {"after": cursor} if cursor else {}
            result = self.execute_graphql_query(query, variables)

            errors = result.get("errors")
            if errors:
                message = "; ".join(error.get("message", "Unknown error") for error in errors)
                return [], f"GraphQL errors: {message}"

            # "data" may be absent or null; treat both as an empty page.
            page = (result.get("data") or {}).get("issues") or {}
            collected.extend(page.get("nodes") or [])

            page_info = page.get("pageInfo") or {}
            cursor = page_info.get("endCursor")
            # Stop when the API reports no further page, or when it gives no
            # cursor to continue from (which would re-request the first page
            # forever).
            if not page_info.get("hasNextPage") or not cursor:
                return collected, None

    def get_all_issues(self, include_comments: bool = True) -> List[Dict[str, Any]]:
        """
        Fetch all issues from Linear, following pagination cursors until the
        API reports no further page.

        Args:
            include_comments: Whether to include comments in the response

        Returns:
            List of raw issue objects (empty if the API returned none)

        Raises:
            ValueError: If no Linear token has been set
            Exception: If the API request fails or the response carries
                GraphQL errors
        """
        query = self._build_issues_query("AllIssues", include_comments)
        issues, error = self._fetch_issue_pages(query)
        if error:
            raise Exception(error)
        return issues

    def get_issues_by_date_range(
        self,
        start_date: str,
        end_date: str,
        include_comments: bool = True
    ) -> Tuple[List[Dict[str, Any]], Optional[str]]:
        """
        Fetch issues that were created or updated within a date range.

        Args:
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format (inclusive)
            include_comments: Whether to include comments in the response

        Returns:
            Tuple containing (issues list, error message or None). Failures
            are reported through the error message and never raised.
        """
        # Parse both dates up front: this rejects malformed input before any
        # request is made, and only the re-serialized dates (never the
        # caller's raw strings) are embedded in the query text.
        try:
            start = datetime.strptime(start_date, "%Y-%m-%d").date().isoformat()
            end = datetime.strptime(end_date, "%Y-%m-%d").date().isoformat()
        except (ValueError, TypeError) as e:
            return [], f"Invalid date format: {str(e)}. Please use YYYY-MM-DD."

        # Match issues that were either created OR updated within the range,
        # so both new issues and touched existing issues are returned.
        window = f'{{ gte: "{start}T00:00:00Z", lte: "{end}T23:59:59Z" }}'
        issue_filter = f"{{ or: [ {{ createdAt: {window} }}, {{ updatedAt: {window} }} ] }}"
        query = self._build_issues_query("IssuesByDateRange", include_comments, issue_filter)

        try:
            issues, error = self._fetch_issue_pages(query)
        except Exception as e:
            # Boundary of the "never raises" contract: transport and token
            # problems are converted into the error slot of the tuple.
            return [], f"Error fetching issues: {str(e)}"

        if error:
            return [], error
        if not issues:
            return [], "No issues found in the specified date range."
        return issues, None

    @staticmethod
    def _format_user(user: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Flatten a user payload; a missing user becomes the "Unknown" placeholder."""
        if not user:
            return {"id": "", "name": "Unknown", "email": ""}
        return {
            "id": user.get("id", ""),
            "name": user.get("name", "Unknown"),
            "email": user.get("email", ""),
        }

    def format_issue(self, issue: Dict[str, Any]) -> Dict[str, Any]:
        """
        Format an issue for easier consumption.

        Args:
            issue: The issue object from Linear API

        Returns:
            Formatted issue dictionary with snake_case keys, a flattened
            state, a creator that always has id/name/email, an assignee that
            is None when unset, and a flat list of comments
        """
        state = issue.get("state") or {}
        assignee = issue.get("assignee")

        formatted = {
            "id": issue.get("id", ""),
            "identifier": issue.get("identifier", ""),
            "title": issue.get("title", ""),
            "description": issue.get("description", ""),
            "state": state.get("name", "Unknown"),
            "state_type": state.get("type", "Unknown"),
            "created_at": issue.get("createdAt", ""),
            "updated_at": issue.get("updatedAt", ""),
            "creator": self._format_user(issue.get("creator")),
            "assignee": self._format_user(assignee) if assignee else None,
            "comments": []
        }

        # "comments" may be absent or null (e.g. fetched without comments).
        comment_nodes = (issue.get("comments") or {}).get("nodes") or []
        for comment in comment_nodes:
            formatted["comments"].append({
                "id": comment.get("id", ""),
                "body": comment.get("body", ""),
                "created_at": comment.get("createdAt", ""),
                "updated_at": comment.get("updatedAt", ""),
                "user": self._format_user(comment.get("user")),
            })

        return formatted

    @staticmethod
    def _is_raw_issue(issue: Dict[str, Any]) -> bool:
        """Tell a raw API issue apart from the output of format_issue()."""
        # Raw issues also carry "identifier", so that key alone cannot
        # distinguish the two shapes; look for raw-only traits instead.
        return (
            "identifier" not in issue
            or "createdAt" in issue
            or "updatedAt" in issue
            or isinstance(issue.get("state"), dict)
            or isinstance(issue.get("comments"), dict)
        )

    def format_issue_to_markdown(self, issue: Dict[str, Any]) -> str:
        """
        Convert an issue to markdown format.

        Args:
            issue: The issue object (either raw or formatted)

        Returns:
            Markdown string representation of the issue
        """
        if self._is_raw_issue(issue):
            issue = self.format_issue(issue)

        parts = [f"# {issue.get('identifier', 'No ID')}: {issue.get('title', 'No Title')}\n\n"]

        if issue.get('state'):
            parts.append(f"**Status:** {issue['state']}\n\n")

        if issue.get('assignee') and issue['assignee'].get('name'):
            parts.append(f"**Assignee:** {issue['assignee']['name']}\n")

        if issue.get('creator') and issue['creator'].get('name'):
            parts.append(f"**Created by:** {issue['creator']['name']}\n")

        if issue.get('created_at'):
            parts.append(f"**Created:** {self.format_date(issue['created_at'])}\n")

        if issue.get('updated_at'):
            parts.append(f"**Updated:** {self.format_date(issue['updated_at'])}\n\n")

        if issue.get('description'):
            parts.append(f"## Description\n\n{issue['description']}\n\n")

        comments = issue.get('comments')
        if comments:
            parts.append(f"## Comments ({len(comments)})\n\n")
            for comment in comments:
                user_name = "Unknown"
                if comment.get('user') and comment['user'].get('name'):
                    user_name = comment['user']['name']

                comment_date = "Unknown date"
                if comment.get('created_at'):
                    comment_date = self.format_date(comment['created_at'])

                parts.append(f"### {user_name} ({comment_date})\n\n{comment.get('body', '')}\n\n---\n\n")

        return "".join(parts)

    @staticmethod
    def format_date(iso_date: str) -> str:
        """
        Format an ISO date string to a more readable format.

        Args:
            iso_date: ISO format date string

        Returns:
            The date as ``YYYY-MM-DD HH:MM:SS``; "Unknown date" for empty or
            non-string input; the input unchanged if it cannot be parsed
        """
        if not iso_date or not isinstance(iso_date, str):
            return "Unknown date"

        try:
            # Rewrite a trailing "Z" as an explicit UTC offset, a form
            # datetime.fromisoformat() accepts.
            dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00'))
            return dt.strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            return iso_date
# Example usage (remove the surrounding triple quotes to run):
"""
if __name__ == "__main__":
    # Set your token here
    token = "YOUR_LINEAR_API_KEY"
    linear = LinearConnector(token)
    try:
        # Get all issues with comments
        issues = linear.get_all_issues()
        print(f"Retrieved {len(issues)} issues")
        # Format and print the first issue as markdown
        if issues:
            issue_md = linear.format_issue_to_markdown(issues[0])
            print("\nSample Issue in Markdown:\n")
            print(issue_md)
        # Get issues by date range
        start_date = "2023-01-01"
        end_date = "2023-01-31"
        date_issues, error = linear.get_issues_by_date_range(start_date, end_date)
        if error:
            print(f"Error: {error}")
        else:
            print(f"\nRetrieved {len(date_issues)} issues from {start_date} to {end_date}")
    except Exception as e:
        print(f"Error: {e}")
"""