refactor: move Linear OAuth utils to connector, use httpx.AsyncClient

This commit is contained in:
CREDO23 2026-01-07 13:13:19 +02:00
parent 4b3d427e90
commit f1a715e04e
4 changed files with 50 additions and 67 deletions

View file

@ -9,6 +9,7 @@ import logging
from datetime import datetime
from typing import Any
import httpx
import requests
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
@ -21,6 +22,53 @@ from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
LINEAR_GRAPHQL_URL = "https://api.linear.app/graphql"
ORGANIZATION_QUERY = """
query {
organization {
name
}
}
"""
async def fetch_linear_organization_name(access_token: str) -> str | None:
"""
Fetch organization/workspace name from Linear GraphQL API.
Args:
access_token: The Linear OAuth access token
Returns:
Organization name or None if fetch fails
"""
try:
async with httpx.AsyncClient() as client:
response = await client.post(
LINEAR_GRAPHQL_URL,
headers={
"Authorization": access_token,
"Content-Type": "application/json",
},
json={"query": ORGANIZATION_QUERY},
timeout=10.0,
)
if response.status_code == 200:
data = response.json()
org_name = data.get("data", {}).get("organization", {}).get("name")
if org_name:
logger.debug(f"Fetched Linear organization name: {org_name}")
return org_name
logger.warning(f"Failed to fetch Linear org info: {response.status_code}")
return None
except Exception as e:
logger.warning(f"Error fetching Linear organization name: {e!s}")
return None
class LinearConnector:
"""Class for retrieving issues and comments from Linear."""