diff --git a/surfsense_backend/app/routes/airtable_add_connector_route.py b/surfsense_backend/app/routes/airtable_add_connector_route.py index fe359d2f3..d2d25d006 100644 --- a/surfsense_backend/app/routes/airtable_add_connector_route.py +++ b/surfsense_backend/app/routes/airtable_add_connector_route.py @@ -1,7 +1,5 @@ import base64 -import hashlib import logging -import secrets from datetime import UTC, datetime, timedelta from uuid import UUID @@ -26,7 +24,7 @@ from app.utils.connector_naming import ( check_duplicate_connector, generate_unique_connector_name, ) -from app.utils.oauth_security import OAuthStateManager, TokenEncryption +from app.utils.oauth_security import OAuthStateManager, TokenEncryption, generate_pkce_pair logger = logging.getLogger(__name__) @@ -75,28 +73,6 @@ def make_basic_auth_header(client_id: str, client_secret: str) -> str: return f"Basic {b64}" -def generate_pkce_pair() -> tuple[str, str]: - """ - Generate PKCE code verifier and code challenge. - - Returns: - Tuple of (code_verifier, code_challenge) - """ - # Generate code verifier (43-128 characters) - code_verifier = ( - base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=") - ) - - # Generate code challenge (SHA256 hash of verifier, base64url encoded) - code_challenge = ( - base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()) - .decode("utf-8") - .rstrip("=") - ) - - return code_verifier, code_challenge - - @router.get("/auth/airtable/connector/add") async def connect_airtable(space_id: int, user: User = Depends(current_active_user)): """ diff --git a/surfsense_backend/app/utils/oauth_security.py b/surfsense_backend/app/utils/oauth_security.py index 0ad9d3bd9..c39b1e9b1 100644 --- a/surfsense_backend/app/utils/oauth_security.py +++ b/surfsense_backend/app/utils/oauth_security.py @@ -29,6 +29,17 @@ def generate_code_verifier(length: int = 128) -> str: return "".join(_PKCE_RNG.choice(_PKCE_CHARS) for _ in range(length)) +def generate_pkce_pair(length: int = 128) -> tuple[str, str]: + """Generate a PKCE code_verifier and its S256 code_challenge.""" + verifier = generate_code_verifier(length) + challenge = ( + base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()) + .decode() + .rstrip("=") + ) + return verifier, challenge + + class OAuthStateManager: """Manages secure OAuth state parameters with HMAC signatures."""