feat: add Atlassian OAuth support for Jira and Confluence

- Introduced a shared schema for Atlassian OAuth 2.0 credentials, accommodating both Jira and Confluence.
- Updated Jira connector routes to utilize the new AtlassianAuthCredentialsBase for handling OAuth tokens.
- Enhanced configuration to include new environment variables for Jira OAuth integration.
- Refactored token handling in Jira indexing logic to support the new shared credential structure.
This commit is contained in:
Anish Sarkar 2026-01-06 01:27:29 +05:30
parent 982b9ceb76
commit bf8c3bfcf7
4 changed files with 27 additions and 7 deletions

View file

@ -0,0 +1,87 @@
"""
Atlassian OAuth 2.0 Authentication Credentials Schema.
Shared schema for both Jira and Confluence OAuth credentials.
Both products use the same Atlassian OAuth 2.0 (3LO) flow and token structure.
"""
from datetime import UTC, datetime
from pydantic import BaseModel, field_validator
class AtlassianAuthCredentialsBase(BaseModel):
"""
Base model for Atlassian OAuth 2.0 credentials.
Used for both Jira and Confluence connectors since they share
the same Atlassian OAuth infrastructure and token structure.
"""
access_token: str
refresh_token: str | None = None
token_type: str = "Bearer"
expires_in: int | None = None
expires_at: datetime | None = None
scope: str | None = None
cloud_id: str | None = None
base_url: str | None = None
@property
def is_expired(self) -> bool:
"""Check if the credentials have expired."""
if self.expires_at is None:
return False
return self.expires_at <= datetime.now(UTC)
@property
def is_refreshable(self) -> bool:
"""Check if the credentials can be refreshed."""
return self.refresh_token is not None
def to_dict(self) -> dict:
"""Convert credentials to dictionary for storage."""
return {
"access_token": self.access_token,
"refresh_token": self.refresh_token,
"token_type": self.token_type,
"expires_in": self.expires_in,
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
"scope": self.scope,
"cloud_id": self.cloud_id,
"base_url": self.base_url,
}
@classmethod
def from_dict(cls, data: dict) -> "AtlassianAuthCredentialsBase":
"""Create credentials from dictionary."""
expires_at = None
if data.get("expires_at"):
expires_at = datetime.fromisoformat(data["expires_at"])
return cls(
access_token=data["access_token"],
refresh_token=data.get("refresh_token"),
token_type=data.get("token_type", "Bearer"),
expires_in=data.get("expires_in"),
expires_at=expires_at,
scope=data.get("scope"),
cloud_id=data.get("cloud_id"),
base_url=data.get("base_url"),
)
@field_validator("expires_at", mode="before")
@classmethod
def ensure_aware_utc(cls, v):
# Strings like "2025-08-26T14:46:57.367184"
if isinstance(v, str):
# add +00:00 if missing tz info
if v.endswith("Z"):
return datetime.fromisoformat(v.replace("Z", "+00:00"))
dt = datetime.fromisoformat(v)
return dt if dt.tzinfo else dt.replace(tzinfo=UTC)
# datetime objects
if isinstance(v, datetime):
return v if v.tzinfo else v.replace(tzinfo=UTC)
return v