feat: enhance ConfluenceHistoryConnector to support legacy API token authentication

- Added support for legacy API token authentication alongside OAuth 2.0 in ConfluenceHistoryConnector.
- Implemented logic to handle both authentication methods, ensuring backward compatibility.
- Refactored token management to accommodate legacy credentials and updated API request handling accordingly.
- Enhanced error handling for credential validation and improved logging for better traceability.
This commit is contained in:
Anish Sarkar 2026-01-06 15:04:58 +05:30
parent 0f5bf93f68
commit c7fa640594

View file

@ -12,6 +12,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.config import config
from app.connectors.confluence_connector import ConfluenceConnector
from app.db import SearchSourceConnector
from app.routes.confluence_add_connector_route import refresh_confluence_token
from app.schemas.atlassian_auth_credentials import AtlassianAuthCredentialsBase
@ -26,6 +27,7 @@ class ConfluenceHistoryConnector:
This connector uses OAuth 2.0 access tokens to authenticate with the
Confluence API. It automatically refreshes expired tokens when needed.
Also supports legacy API token authentication for backward compatibility.
"""
def __init__(
@ -48,6 +50,10 @@ class ConfluenceHistoryConnector:
self._cloud_id: str | None = None
self._base_url: str | None = None
self._http_client: httpx.AsyncClient | None = None
self._use_oauth = True
self._legacy_email: str | None = None
self._legacy_api_token: str | None = None
self._legacy_confluence_client: ConfluenceConnector | None = None
async def _get_valid_token(self) -> str:
"""
@ -74,43 +80,58 @@ class ConfluenceHistoryConnector:
config_data = connector.config.copy()
# Decrypt credentials if they are encrypted
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
# Check if using OAuth or legacy API token
is_oauth = config_data.get("_token_encrypted", False) or config_data.get("access_token")
if is_oauth:
# OAuth 2.0 authentication
# Decrypt credentials if they are encrypted
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt sensitive fields
if config_data.get("access_token"):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
logger.info(
f"Decrypted Confluence credentials for connector {self._connector_id}"
)
except Exception as e:
logger.error(
f"Failed to decrypt Confluence credentials for connector {self._connector_id}: {e!s}"
)
raise ValueError(
f"Failed to decrypt Confluence credentials: {e!s}"
) from e
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt sensitive fields
if config_data.get("access_token"):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
logger.info(
f"Decrypted Confluence credentials for connector {self._connector_id}"
)
self._credentials = AtlassianAuthCredentialsBase.from_dict(config_data)
# Store cloud_id and base_url for API calls (with backward compatibility for site_url)
self._cloud_id = config_data.get("cloud_id")
self._base_url = config_data.get("base_url") or config_data.get("site_url")
self._use_oauth = True
except Exception as e:
logger.error(
f"Failed to decrypt Confluence credentials for connector {self._connector_id}: {e!s}"
)
raise ValueError(
f"Failed to decrypt Confluence credentials: {e!s}"
) from e
raise ValueError(f"Invalid Confluence OAuth credentials: {e!s}") from e
else:
# Legacy API token authentication
self._legacy_email = config_data.get("CONFLUENCE_EMAIL")
self._legacy_api_token = config_data.get("CONFLUENCE_API_TOKEN")
self._base_url = config_data.get("CONFLUENCE_BASE_URL")
self._use_oauth = False
try:
self._credentials = AtlassianAuthCredentialsBase.from_dict(config_data)
# Store cloud_id and base_url for API calls (with backward compatibility for site_url)
self._cloud_id = config_data.get("cloud_id")
self._base_url = config_data.get("base_url") or config_data.get("site_url")
except Exception as e:
raise ValueError(f"Invalid Confluence credentials: {e!s}") from e
if not self._legacy_email or not self._legacy_api_token or not self._base_url:
raise ValueError("Confluence credentials not found in connector config")
# Check if token is expired and refreshable
if self._credentials.is_expired and self._credentials.is_refreshable:
# Check if token is expired and refreshable (only for OAuth)
if self._use_oauth and self._credentials.is_expired and self._credentials.is_refreshable:
try:
logger.info(
f"Confluence token expired for connector {self._connector_id}, refreshing..."
@ -167,7 +188,11 @@ class ConfluenceHistoryConnector:
f"Failed to refresh Confluence OAuth credentials: {e!s}"
) from e
return self._credentials.access_token
if self._use_oauth:
return self._credentials.access_token
else:
# For legacy auth, return empty string (not used for token-based auth)
return ""
async def _get_client(self) -> httpx.AsyncClient:
"""
@ -180,6 +205,21 @@ class ConfluenceHistoryConnector:
self._http_client = httpx.AsyncClient(timeout=30.0)
return self._http_client
async def _get_legacy_client(self) -> ConfluenceConnector:
"""
Get or create ConfluenceConnector with legacy credentials.
Returns:
ConfluenceConnector instance
"""
if self._legacy_confluence_client is None:
self._legacy_confluence_client = ConfluenceConnector(
base_url=self._base_url,
email=self._legacy_email,
api_token=self._legacy_api_token,
)
return self._legacy_confluence_client
async def _get_base_url(self) -> str:
"""
Get the base URL for Confluence API calls.
@ -187,6 +227,10 @@ class ConfluenceHistoryConnector:
Returns:
Base URL string
"""
if not self._use_oauth:
# For legacy auth, use the base_url directly
return self._base_url or ""
if not self._cloud_id:
raise ValueError("Cloud ID not available. Cannot construct API URL.")
@ -210,9 +254,22 @@ class ConfluenceHistoryConnector:
ValueError: If credentials have not been set
Exception: If the API request fails
"""
if not self._use_oauth:
# Use legacy ConfluenceConnector for API requests
client = await self._get_legacy_client()
# ConfluenceConnector uses synchronous requests, so we need to handle this differently
# For now, we'll use the legacy client's make_api_request method
# But since it's sync, we'll need to wrap it
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, client.make_api_request, endpoint, params
)
# OAuth flow
token = await self._get_valid_token()
base_url = await self._get_base_url()
client = await self._get_client()
http_client = await self._get_client()
url = f"{base_url}/wiki/api/v2/{endpoint}"
headers = {
@ -222,7 +279,7 @@ class ConfluenceHistoryConnector:
}
try:
response = await client.get(url, headers=headers, params=params)
response = await http_client.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
@ -431,6 +488,24 @@ class ConfluenceHistoryConnector:
Tuple containing (pages list with comments, error message or None)
"""
try:
if not self._use_oauth:
# Use legacy ConfluenceConnector for API requests
client = await self._get_legacy_client()
# Ensure credentials are loaded
await self._get_valid_token()
# ConfluenceConnector.get_pages_by_date_range is synchronous
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
client.get_pages_by_date_range,
start_date,
end_date,
space_ids,
include_comments,
)
# OAuth flow
all_pages = []
if space_ids:
@ -477,6 +552,8 @@ class ConfluenceHistoryConnector:
if self._http_client:
await self._http_client.aclose()
self._http_client = None
# Legacy client doesn't need explicit closing
self._legacy_confluence_client = None
async def __aenter__(self):
"""Async context manager entry."""