feat: enhance Airtable integration with OAuth support and date validation

- Introduced AirtableHistoryConnector to manage OAuth-based authentication and token refresh for Airtable API access.
- Added date string validation in AirtableConnector to ensure valid date inputs before processing.
- Updated indexing logic to utilize the new AirtableHistoryConnector, improving credential management and token handling.
This commit is contained in:
Anish Sarkar 2026-01-07 03:00:56 +05:30
parent ba54e1da06
commit f2724ea162
5 changed files with 247 additions and 110 deletions

View file

@ -294,6 +294,12 @@ class AirtableConnector:
Tuple of (records, error_message)
"""
try:
# Validate date strings before parsing
if not start_date or start_date.lower() in ("undefined", "null", "none"):
return [], "Invalid start_date: date string is required"
if not end_date or end_date.lower() in ("undefined", "null", "none"):
return [], "Invalid end_date: date string is required"
# Parse and validate dates
start_dt = isoparse(start_date)
end_dt = isoparse(end_date)

View file

@ -0,0 +1,175 @@
"""
Airtable OAuth Connector.
Handles OAuth-based authentication and token refresh for Airtable API access.
"""
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.config import config
from app.connectors.airtable_connector import AirtableConnector
from app.db import SearchSourceConnector
from app.routes.airtable_add_connector_route import refresh_airtable_token
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
class AirtableHistoryConnector:
"""
Airtable connector with OAuth support and automatic token refresh.
This connector uses OAuth 2.0 access tokens to authenticate with the
Airtable API. It automatically refreshes expired tokens when needed.
"""
def __init__(
self,
session: AsyncSession,
connector_id: int,
credentials: AirtableAuthCredentialsBase | None = None,
):
"""
Initialize the AirtableHistoryConnector with auto-refresh capability.
Args:
session: Database session for updating connector
connector_id: Connector ID for direct updates
credentials: Airtable OAuth credentials (optional, will be loaded from DB if not provided)
"""
self._session = session
self._connector_id = connector_id
self._credentials = credentials
self._airtable_connector: AirtableConnector | None = None
async def _get_valid_token(self) -> str:
"""
Get valid Airtable access token, refreshing if needed.
Returns:
Valid access token
Raises:
ValueError: If credentials are missing or invalid
Exception: If token refresh fails
"""
# Load credentials from DB if not provided
if self._credentials is None:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if not connector:
raise ValueError(f"Connector {self._connector_id} not found")
config_data = connector.config.copy()
# Decrypt credentials if they are encrypted
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt sensitive fields
if config_data.get("access_token"):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
logger.info(
f"Decrypted Airtable credentials for connector {self._connector_id}"
)
except Exception as e:
logger.error(
f"Failed to decrypt Airtable credentials for connector {self._connector_id}: {e!s}"
)
raise ValueError(
f"Failed to decrypt Airtable credentials: {e!s}"
) from e
try:
self._credentials = AirtableAuthCredentialsBase.from_dict(config_data)
except Exception as e:
raise ValueError(f"Invalid Airtable credentials: {e!s}") from e
# Check if token is expired and refreshable
if self._credentials.is_expired and self._credentials.is_refreshable:
try:
logger.info(
f"Airtable token expired for connector {self._connector_id}, refreshing..."
)
# Get connector for refresh
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if not connector:
raise RuntimeError(
f"Connector {self._connector_id} not found; cannot refresh token."
)
# Refresh token
connector = await refresh_airtable_token(self._session, connector)
# Reload credentials after refresh
config_data = connector.config.copy()
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
token_encryption = TokenEncryption(config.SECRET_KEY)
if config_data.get("access_token"):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
self._credentials = AirtableAuthCredentialsBase.from_dict(config_data)
# Invalidate cached connector so it's recreated with new token
self._airtable_connector = None
logger.info(
f"Successfully refreshed Airtable token for connector {self._connector_id}"
)
except Exception as e:
logger.error(
f"Failed to refresh Airtable token for connector {self._connector_id}: {e!s}"
)
raise Exception(
f"Failed to refresh Airtable OAuth credentials: {e!s}"
) from e
return self._credentials.access_token
async def _get_connector(self) -> AirtableConnector:
"""
Get or create AirtableConnector with valid token.
Returns:
AirtableConnector instance
"""
if self._airtable_connector is None:
# Ensure we have valid credentials (this will refresh if needed)
await self._get_valid_token()
# Use the credentials object which is now guaranteed to be valid
if not self._credentials:
raise ValueError("Credentials not loaded")
self._airtable_connector = AirtableConnector(self._credentials)
return self._airtable_connector

View file

@ -377,7 +377,7 @@ class SlackHistory:
else:
raise # Re-raise to outer handler for not_in_channel or other SlackApiErrors
if not current_api_call_successful:
if not current_api_call_successful or result is None:
continue # Retry the current page fetch due to handled rate limit
# Process result if successful

View file

@ -371,7 +371,7 @@ async def airtable_callback(
async def refresh_airtable_token(
session: AsyncSession, connector: SearchSourceConnector
):
) -> SearchSourceConnector:
"""
Refresh the Airtable access token for a connector.
@ -401,6 +401,12 @@ async def refresh_airtable_token(
status_code=500, detail="Failed to decrypt stored refresh token"
) from e
if not refresh_token:
raise HTTPException(
status_code=400,
detail="No refresh token available. Please re-authenticate.",
)
auth_header = make_basic_auth_header(
config.AIRTABLE_CLIENT_ID, config.AIRTABLE_CLIENT_SECRET
)
@ -425,8 +431,14 @@ async def refresh_airtable_token(
)
if token_response.status_code != 200:
error_detail = token_response.text
try:
error_json = token_response.json()
error_detail = error_json.get("error_description", error_detail)
except Exception:
pass
raise HTTPException(
status_code=400, detail="Token refresh failed: {token_response.text}"
status_code=400, detail=f"Token refresh failed: {error_detail}"
)
token_json = token_response.json()
@ -468,6 +480,8 @@ async def refresh_airtable_token(
)
return connector
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to refresh Airtable token: {e!s}"

View file

@ -6,10 +6,8 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.airtable_connector import AirtableConnector
from app.connectors.airtable_history import AirtableHistoryConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.routes.airtable_add_connector_route import refresh_airtable_token
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -18,7 +16,6 @@ from app.utils.document_converters import (
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.oauth_security import TokenEncryption
from .base import (
calculate_date_range,
@ -85,76 +82,11 @@ async def index_airtable_records(
)
return 0, f"Connector with ID {connector_id} not found"
# Create credentials from connector config
config_data = (
connector.config.copy()
) # Work with a copy to avoid modifying original
# Decrypt tokens if they are encrypted (only when explicitly marked)
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted:
# Tokens are explicitly marked as encrypted, attempt decryption
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but tokens are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return 0, "SECRET_KEY not configured but tokens are marked as encrypted"
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt access_token
if config_data.get("access_token"):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
logger.info(
f"Decrypted Airtable access token for connector {connector_id}"
)
# Decrypt refresh_token if present
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
logger.info(
f"Decrypted Airtable refresh token for connector {connector_id}"
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Airtable tokens for connector {connector_id}: {e!s}",
"Token decryption failed",
{"error_type": "TokenDecryptionError"},
)
return 0, f"Failed to decrypt Airtable tokens: {e!s}"
# If _token_encrypted is False or not set, treat tokens as plaintext
try:
credentials = AirtableAuthCredentialsBase.from_dict(config_data)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Invalid Airtable credentials in connector {connector_id}",
str(e),
{"error_type": "InvalidCredentials"},
)
return 0, f"Invalid Airtable credentials: {e!s}"
# Check if credentials are expired
if credentials.is_expired:
await task_logger.log_task_failure(
log_entry,
f"Airtable credentials expired for connector {connector_id}",
"Credentials expired",
{"error_type": "ExpiredCredentials"},
)
connector = await refresh_airtable_token(session, connector)
# return 0, "Airtable credentials have expired. Please re-authenticate."
# Normalize "undefined" strings to None (from frontend)
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range for indexing
start_date_str, end_date_str = calculate_date_range(
@ -166,8 +98,9 @@ async def index_airtable_records(
f"from {start_date_str} to {end_date_str}"
)
# Initialize Airtable connector
airtable_connector = AirtableConnector(credentials)
# Initialize Airtable history connector with auto-refresh capability
airtable_history = AirtableHistoryConnector(session, connector_id)
airtable_connector = await airtable_history._get_connector()
total_processed = 0
try:
@ -459,47 +392,56 @@ async def index_airtable_records(
documents_skipped += 1
continue # Skip this message and continue with others
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if total_processed > 0:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
# Accumulate total processed across all tables
total_processed += documents_indexed
# Final commit for any remaining documents not yet committed in batches
logger.info(
f"Final commit: Total {documents_indexed} Airtable records processed"
)
await session.commit()
logger.info(
"Successfully committed all Airtable document changes to database"
)
if documents_indexed > 0:
logger.info(
f"Final commit for table {table_name}: {documents_indexed} Airtable records processed"
)
await session.commit()
logger.info(
f"Successfully committed all Airtable document changes for table {table_name}"
)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Airtable indexing for connector {connector_id}",
{
"events_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_messages_count": len(skipped_messages),
},
)
# Update the last_indexed_at timestamp for the connector only if requested
# (after all tables in all bases are processed)
if total_processed > 0:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
logger.info(
f"Airtable indexing completed: {documents_indexed} new records, {documents_skipped} skipped"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
# Log success after processing all bases and tables
await task_logger.log_task_success(
log_entry,
f"Successfully completed Airtable indexing for connector {connector_id}",
{
"events_processed": total_processed,
"documents_indexed": total_processed,
},
)
logger.info(
f"Airtable indexing completed: {total_processed} total records processed"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
except Exception as e:
logger.error(
f"Fetching Airtable bases for connector {connector_id} failed: {e!s}",
exc_info=True,
)
await task_logger.log_task_failure(
log_entry,
f"Failed to fetch Airtable bases for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
return 0, f"Failed to fetch Airtable bases: {e!s}"
except SQLAlchemyError as db_error:
await session.rollback()