refactor: update token decryption handling for connectors

- Enhanced token decryption logic in Airtable, Google Drive, Linear, and Notion indexers to only attempt decryption when tokens are explicitly marked as encrypted.
- Added error handling for missing SECRET_KEY when tokens are marked as encrypted, improving robustness and clarity in error reporting.
- Updated comments to clarify the handling of plaintext tokens when encryption is not indicated.
This commit is contained in:
Anish Sarkar 2026-01-03 03:43:40 +05:30
parent 645e849d93
commit ed995b0341
4 changed files with 87 additions and 64 deletions

View file

@ -90,15 +90,23 @@ async def index_airtable_records(
connector.config.copy()
) # Work with a copy to avoid modifying original
# Decrypt tokens if they are encrypted (for backward compatibility)
# Decrypt tokens if they are encrypted (only when explicitly marked)
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
if token_encrypted:
# Tokens are explicitly marked as encrypted, attempt decryption
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but tokens are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return 0, "SECRET_KEY not configured but tokens are marked as encrypted"
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt access_token
if config_data.get("access_token"):
if token_encryption.is_encrypted(config_data["access_token"]):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
@ -108,7 +116,6 @@ async def index_airtable_records(
# Decrypt refresh_token if present
if config_data.get("refresh_token"):
if token_encryption.is_encrypted(config_data["refresh_token"]):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
@ -123,6 +130,7 @@ async def index_airtable_records(
{"error_type": "TokenDecryptionError"},
)
return 0, f"Failed to decrypt Airtable tokens: {e!s}"
# If _token_encrypted is False or not set, treat tokens as plaintext
try:
credentials = AirtableAuthCredentialsBase.from_dict(config_data)

View file

@ -23,7 +23,6 @@ from app.tasks.connector_indexers.base import (
update_connector_last_indexed,
)
from app.utils.document_converters import generate_unique_identifier_hash
from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
@ -89,27 +88,25 @@ async def index_google_drive_files(
{"stage": "client_initialization"},
)
# Check if credentials are encrypted and validate decryption capability
# Check if credentials are encrypted (only when explicitly marked)
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
# Verify we can decrypt credentials before proceeding
token_encryption = TokenEncryption(config.SECRET_KEY)
# Check if any sensitive fields exist and are encrypted
if connector.config.get("token") and token_encryption.is_encrypted(
connector.config.get("token")
):
if token_encrypted:
# Credentials are explicitly marked as encrypted, will be decrypted during client initialization
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to initialize token decryption for Google Drive connector {connector_id}: {e!s}",
"Token decryption initialization failed",
{"error_type": "TokenDecryptionError"},
)
return 0, f"Failed to initialize token decryption: {e!s}"
# If _token_encrypted is False or not set, treat credentials as plaintext
drive_client = GoogleDriveClient(session, connector_id)
@ -273,27 +270,25 @@ async def index_google_drive_single_file(
{"stage": "client_initialization"},
)
# Check if credentials are encrypted and validate decryption capability
# Check if credentials are encrypted (only when explicitly marked)
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
# Verify we can decrypt credentials before proceeding
token_encryption = TokenEncryption(config.SECRET_KEY)
# Check if any sensitive fields exist and are encrypted
if connector.config.get("token") and token_encryption.is_encrypted(
connector.config.get("token")
):
if token_encrypted:
# Credentials are explicitly marked as encrypted, will be decrypted during client initialization
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to initialize token decryption for Google Drive connector {connector_id}: {e!s}",
"Token decryption initialization failed",
{"error_type": "TokenDecryptionError"},
)
return 0, f"Failed to initialize token decryption: {e!s}"
# If _token_encrypted is False or not set, treat credentials as plaintext
drive_client = GoogleDriveClient(session, connector_id)

View file

@ -92,7 +92,10 @@ async def index_linear_issues(
)
# Get the Linear access token from the connector config
linear_access_token = connector.config.get("access_token")
# Support both new OAuth format (access_token) and old API key format (LINEAR_API_KEY)
linear_access_token = connector.config.get(
"access_token"
) or connector.config.get("LINEAR_API_KEY")
if not linear_access_token:
await task_logger.log_task_failure(
log_entry,
@ -102,15 +105,21 @@ async def index_linear_issues(
)
return 0, "Linear access token not found in connector config"
# Decrypt token if it's encrypted (for backward compatibility)
# Decrypt token if it's encrypted (only when explicitly marked)
from app.config import config
from app.utils.oauth_security import TokenEncryption
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted or (
config.SECRET_KEY
and TokenEncryption(config.SECRET_KEY).is_encrypted(linear_access_token)
):
if token_encrypted:
# Token is explicitly marked as encrypted, attempt decryption
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but token is marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return 0, "SECRET_KEY not configured but token is marked as encrypted"
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
linear_access_token = token_encryption.decrypt_token(
@ -127,6 +136,7 @@ async def index_linear_issues(
{"error_type": "TokenDecryptionError"},
)
return 0, f"Failed to decrypt Linear access token: {e!s}"
# If _token_encrypted is False or not set, treat token as plaintext
# Initialize Linear client
await task_logger.log_task_progress(

View file

@ -94,8 +94,11 @@ async def index_notion_pages(
f"Connector with ID {connector_id} not found or is not a Notion connector",
)
# Get the Notion access token from the connector config (OAuth-based)
notion_token = connector.config.get("access_token")
# Get the Notion access token from the connector config
# Support both new OAuth format (access_token) and old integration token format (NOTION_INTEGRATION_TOKEN)
notion_token = connector.config.get("access_token") or connector.config.get(
"NOTION_INTEGRATION_TOKEN"
)
if not notion_token:
await task_logger.log_task_failure(
log_entry,
@ -105,12 +108,18 @@ async def index_notion_pages(
)
return 0, "Notion access token not found in connector config"
# Decrypt token if it's encrypted (for backward compatibility)
# Decrypt token if it's encrypted (only when explicitly marked)
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted or (
config.SECRET_KEY
and TokenEncryption(config.SECRET_KEY).is_encrypted(notion_token)
):
if token_encrypted:
# Token is explicitly marked as encrypted, attempt decryption
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but token is marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return 0, "SECRET_KEY not configured but token is marked as encrypted"
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
notion_token = token_encryption.decrypt_token(notion_token)
@ -125,6 +134,7 @@ async def index_notion_pages(
{"error_type": "TokenDecryptionError"},
)
return 0, f"Failed to decrypt Notion access token: {e!s}"
# If _token_encrypted is False or not set, treat token as plaintext
# Initialize Notion client
await task_logger.log_task_progress(