refactor: move PKCE pair generatio for airtable

- Removed the `generate_pkce_pair` function from `airtable_add_connector_route.py` and relocated it to `oauth_security.py` for better organization.
- Updated imports in `airtable_add_connector_route.py` to reflect the new location of the PKCE generation function.
This commit is contained in:
Anish Sarkar 2026-04-04 03:36:54 +05:30
parent 8e6b1c77ea
commit e814540727
2 changed files with 12 additions and 25 deletions

View file

@ -1,7 +1,5 @@
import base64 import base64
import hashlib
import logging import logging
import secrets
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from uuid import UUID from uuid import UUID
@ -26,7 +24,7 @@ from app.utils.connector_naming import (
check_duplicate_connector, check_duplicate_connector,
generate_unique_connector_name, generate_unique_connector_name,
) )
from app.utils.oauth_security import OAuthStateManager, TokenEncryption from app.utils.oauth_security import OAuthStateManager, TokenEncryption, generate_pkce_pair
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -75,28 +73,6 @@ def make_basic_auth_header(client_id: str, client_secret: str) -> str:
return f"Basic {b64}" return f"Basic {b64}"
def generate_pkce_pair() -> tuple[str, str]:
"""
Generate PKCE code verifier and code challenge.
Returns:
Tuple of (code_verifier, code_challenge)
"""
# Generate code verifier (43-128 characters)
code_verifier = (
base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=")
)
# Generate code challenge (SHA256 hash of verifier, base64url encoded)
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest())
.decode("utf-8")
.rstrip("=")
)
return code_verifier, code_challenge
@router.get("/auth/airtable/connector/add") @router.get("/auth/airtable/connector/add")
async def connect_airtable(space_id: int, user: User = Depends(current_active_user)): async def connect_airtable(space_id: int, user: User = Depends(current_active_user)):
""" """

View file

@ -29,6 +29,17 @@ def generate_code_verifier(length: int = 128) -> str:
return "".join(_PKCE_RNG.choice(_PKCE_CHARS) for _ in range(length)) return "".join(_PKCE_RNG.choice(_PKCE_CHARS) for _ in range(length))
def generate_pkce_pair(length: int = 128) -> tuple[str, str]:
"""Generate a PKCE code_verifier and its S256 code_challenge."""
verifier = generate_code_verifier(length)
challenge = (
base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest())
.decode()
.rstrip("=")
)
return verifier, challenge
class OAuthStateManager: class OAuthStateManager:
"""Manages secure OAuth state parameters with HMAC signatures.""" """Manages secure OAuth state parameters with HMAC signatures."""