diff --git a/surfsense_backend/app/connectors/airtable_connector.py b/surfsense_backend/app/connectors/airtable_connector.py index 840b2276c..7b9209bec 100644 --- a/surfsense_backend/app/connectors/airtable_connector.py +++ b/surfsense_backend/app/connectors/airtable_connector.py @@ -294,6 +294,12 @@ class AirtableConnector: Tuple of (records, error_message) """ try: + # Validate date strings before parsing + if not start_date or start_date.lower() in ("undefined", "null", "none"): + return [], "Invalid start_date: date string is required" + if not end_date or end_date.lower() in ("undefined", "null", "none"): + return [], "Invalid end_date: date string is required" + # Parse and validate dates start_dt = isoparse(start_date) end_dt = isoparse(end_date)
diff --git a/surfsense_backend/app/connectors/airtable_history.py b/surfsense_backend/app/connectors/airtable_history.py
new file mode 100644
index 000000000..64f6465fe
--- /dev/null
+++ b/surfsense_backend/app/connectors/airtable_history.py
@@ -0,0 +1,187 @@
+"""
+Airtable OAuth Connector.
+
+Handles OAuth-based authentication and token refresh for Airtable API access.
+"""
+
+import logging
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+
+from app.config import config
+from app.connectors.airtable_connector import AirtableConnector
+from app.db import SearchSourceConnector
+from app.routes.airtable_add_connector_route import refresh_airtable_token
+from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
+from app.utils.oauth_security import TokenEncryption
+
+logger = logging.getLogger(__name__)
+
+
+class AirtableHistoryConnector:
+    """
+    Airtable connector with OAuth support and automatic token refresh.
+
+    This connector uses OAuth 2.0 access tokens to authenticate with the
+    Airtable API. It automatically refreshes expired tokens when needed.
+    """
+
+    def __init__(
+        self,
+        session: AsyncSession,
+        connector_id: int,
+        credentials: AirtableAuthCredentialsBase | None = None,
+    ):
+        """
+        Initialize the AirtableHistoryConnector with auto-refresh capability.
+
+        Args:
+            session: Database session for updating connector
+            connector_id: Connector ID for direct updates
+            credentials: Airtable OAuth credentials (optional, will be loaded from DB if not provided)
+        """
+        self._session = session
+        self._connector_id = connector_id
+        self._credentials = credentials
+        self._airtable_connector: AirtableConnector | None = None
+
+    async def _load_connector(self) -> SearchSourceConnector | None:
+        """
+        Fetch this connector's row from the database.
+
+        Returns:
+            The SearchSourceConnector with this connector ID, or None if not found
+        """
+        result = await self._session.execute(
+            select(SearchSourceConnector).filter(
+                SearchSourceConnector.id == self._connector_id
+            )
+        )
+        return result.scalars().first()
+
+    def _build_credentials(self, stored_config: dict) -> AirtableAuthCredentialsBase:
+        """
+        Build credentials from a connector config, decrypting tokens if flagged.
+
+        Args:
+            stored_config: Connector config as stored in the DB (not modified)
+
+        Returns:
+            Credentials parsed from the decrypted config
+
+        Raises:
+            ValueError: If tokens are marked encrypted but cannot be decrypted,
+                or if the config does not yield valid credentials
+        """
+        config_data = stored_config.copy()
+
+        if config_data.get("_token_encrypted", False):
+            # Fail loudly without a key; otherwise the still-encrypted values
+            # would be used as the tokens.
+            if not config.SECRET_KEY:
+                raise ValueError(
+                    "SECRET_KEY not configured but tokens are marked as encrypted"
+                )
+            try:
+                token_encryption = TokenEncryption(config.SECRET_KEY)
+                for field in ("access_token", "refresh_token"):
+                    if config_data.get(field):
+                        config_data[field] = token_encryption.decrypt_token(
+                            config_data[field]
+                        )
+                logger.info(
+                    f"Decrypted Airtable credentials for connector {self._connector_id}"
+                )
+            except Exception as e:
+                logger.error(
+                    f"Failed to decrypt Airtable credentials for connector {self._connector_id}: {e!s}"
+                )
+                raise ValueError(
+                    f"Failed to decrypt Airtable credentials: {e!s}"
+                ) from e
+
+        try:
+            return AirtableAuthCredentialsBase.from_dict(config_data)
+        except Exception as e:
+            raise ValueError(f"Invalid Airtable credentials: {e!s}") from e
+
+    async def _refresh_credentials(self) -> None:
+        """
+        Refresh the OAuth token and reload credentials from the updated connector.
+
+        Raises:
+            RuntimeError: If the connector is not found or the refresh fails
+        """
+        try:
+            logger.info(
+                f"Airtable token expired for connector {self._connector_id}, refreshing..."
+            )
+
+            connector = await self._load_connector()
+            if not connector:
+                raise RuntimeError(
+                    f"Connector {self._connector_id} not found; cannot refresh token."
+                )
+
+            connector = await refresh_airtable_token(self._session, connector)
+            self._credentials = self._build_credentials(connector.config)
+
+            # Invalidate cached connector so it's recreated with new token
+            self._airtable_connector = None
+
+            logger.info(
+                f"Successfully refreshed Airtable token for connector {self._connector_id}"
+            )
+        except Exception as e:
+            logger.error(
+                f"Failed to refresh Airtable token for connector {self._connector_id}: {e!s}"
+            )
+            raise RuntimeError(
+                f"Failed to refresh Airtable OAuth credentials: {e!s}"
+            ) from e
+
+    async def _get_valid_token(self) -> str:
+        """
+        Get Airtable access token, refreshing if needed.
+
+        Returns:
+            Access token from the current credentials
+
+        Raises:
+            ValueError: If credentials are missing or invalid
+            RuntimeError: If token refresh fails
+        """
+        # Load credentials from DB if not provided
+        if self._credentials is None:
+            connector = await self._load_connector()
+            if not connector:
+                raise ValueError(f"Connector {self._connector_id} not found")
+            self._credentials = self._build_credentials(connector.config)
+
+        if self._credentials.is_expired:
+            if self._credentials.is_refreshable:
+                await self._refresh_credentials()
+            else:
+                # No refresh is possible; the stored token is returned as-is.
+                logger.warning(
+                    f"Airtable token for connector {self._connector_id} is expired and not refreshable"
+                )
+
+        return self._credentials.access_token
+
+    async def _get_connector(self) -> AirtableConnector:
+        """
+        Get or create AirtableConnector with valid token.
+
+        Returns:
+            AirtableConnector instance
+        """
+        # Check the token on every call, not just the first: a refresh clears
+        # the cached connector so it is rebuilt below with the new token.
+        await self._get_valid_token()
+        if self._airtable_connector is None:
+            if not self._credentials:
+                raise ValueError("Credentials not loaded")
+            self._airtable_connector = AirtableConnector(self._credentials)
+        return self._airtable_connector
diff --git a/surfsense_backend/app/connectors/slack_history.py b/surfsense_backend/app/connectors/slack_history.py index dbf43bb24..2b36b9f96 100644 --- a/surfsense_backend/app/connectors/slack_history.py +++ b/surfsense_backend/app/connectors/slack_history.py @@ -377,7 +377,7 @@ class SlackHistory: else: raise # Re-raise to outer handler for not_in_channel or other SlackApiErrors - if not current_api_call_successful: + if not current_api_call_successful or result is None: continue # Retry the current page fetch due to handled rate limit # Process result if successful diff --git a/surfsense_backend/app/routes/airtable_add_connector_route.py b/surfsense_backend/app/routes/airtable_add_connector_route.py index 9284d89e8..c45930a83 100644 --- a/surfsense_backend/app/routes/airtable_add_connector_route.py +++ b/surfsense_backend/app/routes/airtable_add_connector_route.py @@ -371,7 +371,7 @@ async def airtable_callback( async def refresh_airtable_token( session: AsyncSession, connector: SearchSourceConnector -): +) -> SearchSourceConnector: """ Refresh the Airtable access token for a connector. @@ -401,6 +401,12 @@ async def refresh_airtable_token( status_code=500, detail="Failed to decrypt stored refresh token" ) from e + if not refresh_token: + raise HTTPException( + status_code=400, + detail="No refresh token available. 
Please re-authenticate.", + ) + auth_header = make_basic_auth_header( config.AIRTABLE_CLIENT_ID, config.AIRTABLE_CLIENT_SECRET ) @@ -425,8 +431,14 @@ async def refresh_airtable_token( ) if token_response.status_code != 200: + error_detail = token_response.text + try: + error_json = token_response.json() + error_detail = error_json.get("error_description", error_detail) + except Exception: + pass raise HTTPException( - status_code=400, detail="Token refresh failed: {token_response.text}" + status_code=400, detail=f"Token refresh failed: {error_detail}" ) token_json = token_response.json() @@ -468,6 +480,8 @@ async def refresh_airtable_token( ) return connector + except HTTPException: + raise except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to refresh Airtable token: {e!s}" diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 3ea6dccc9..4d5a33b79 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -6,10 +6,8 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.config import config -from app.connectors.airtable_connector import AirtableConnector +from app.connectors.airtable_history import AirtableHistoryConnector from app.db import Document, DocumentType, SearchSourceConnectorType -from app.routes.airtable_add_connector_route import refresh_airtable_token -from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -18,7 +16,6 @@ from app.utils.document_converters import ( generate_document_summary, generate_unique_identifier_hash, ) -from app.utils.oauth_security import TokenEncryption from .base 
import ( calculate_date_range, @@ -85,76 +82,11 @@ async def index_airtable_records( ) return 0, f"Connector with ID {connector_id} not found" - # Create credentials from connector config - config_data = ( - connector.config.copy() - ) # Work with a copy to avoid modifying original - - # Decrypt tokens if they are encrypted (only when explicitly marked) - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted: - # Tokens are explicitly marked as encrypted, attempt decryption - if not config.SECRET_KEY: - await task_logger.log_task_failure( - log_entry, - f"SECRET_KEY not configured but tokens are marked as encrypted for connector {connector_id}", - "Missing SECRET_KEY for token decryption", - {"error_type": "MissingSecretKey"}, - ) - return 0, "SECRET_KEY not configured but tokens are marked as encrypted" - try: - token_encryption = TokenEncryption(config.SECRET_KEY) - - # Decrypt access_token - if config_data.get("access_token"): - config_data["access_token"] = token_encryption.decrypt_token( - config_data["access_token"] - ) - logger.info( - f"Decrypted Airtable access token for connector {connector_id}" - ) - - # Decrypt refresh_token if present - if config_data.get("refresh_token"): - config_data["refresh_token"] = token_encryption.decrypt_token( - config_data["refresh_token"] - ) - logger.info( - f"Decrypted Airtable refresh token for connector {connector_id}" - ) - except Exception as e: - await task_logger.log_task_failure( - log_entry, - f"Failed to decrypt Airtable tokens for connector {connector_id}: {e!s}", - "Token decryption failed", - {"error_type": "TokenDecryptionError"}, - ) - return 0, f"Failed to decrypt Airtable tokens: {e!s}" - # If _token_encrypted is False or not set, treat tokens as plaintext - - try: - credentials = AirtableAuthCredentialsBase.from_dict(config_data) - except Exception as e: - await task_logger.log_task_failure( - log_entry, - f"Invalid Airtable credentials in connector {connector_id}", - str(e), - 
{"error_type": "InvalidCredentials"}, - ) - return 0, f"Invalid Airtable credentials: {e!s}" - - # Check if credentials are expired - if credentials.is_expired: - await task_logger.log_task_failure( - log_entry, - f"Airtable credentials expired for connector {connector_id}", - "Credentials expired", - {"error_type": "ExpiredCredentials"}, - ) - - connector = await refresh_airtable_token(session, connector) - - # return 0, "Airtable credentials have expired. Please re-authenticate." + # Normalize "undefined" strings to None (from frontend) + if start_date == "undefined" or start_date == "": + start_date = None + if end_date == "undefined" or end_date == "": + end_date = None # Calculate date range for indexing start_date_str, end_date_str = calculate_date_range( @@ -166,8 +98,9 @@ async def index_airtable_records( f"from {start_date_str} to {end_date_str}" ) - # Initialize Airtable connector - airtable_connector = AirtableConnector(credentials) + # Initialize Airtable history connector with auto-refresh capability + airtable_history = AirtableHistoryConnector(session, connector_id) + airtable_connector = await airtable_history._get_connector() total_processed = 0 try: @@ -459,47 +392,56 @@ async def index_airtable_records( documents_skipped += 1 continue # Skip this message and continue with others - # Update the last_indexed_at timestamp for the connector only if requested - total_processed = documents_indexed - if total_processed > 0: - await update_connector_last_indexed( - session, connector, update_last_indexed - ) + # Accumulate total processed across all tables + total_processed += documents_indexed # Final commit for any remaining documents not yet committed in batches - logger.info( - f"Final commit: Total {documents_indexed} Airtable records processed" - ) - await session.commit() - logger.info( - "Successfully committed all Airtable document changes to database" - ) + if documents_indexed > 0: + logger.info( + f"Final commit for table {table_name}: 
{documents_indexed} Airtable records processed" + ) + await session.commit() + logger.info( + f"Successfully committed all Airtable document changes for table {table_name}" + ) - # Log success - await task_logger.log_task_success( - log_entry, - f"Successfully completed Airtable indexing for connector {connector_id}", - { - "events_processed": total_processed, - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "skipped_messages_count": len(skipped_messages), - }, - ) + # Update the last_indexed_at timestamp for the connector only if requested + # (after all tables in all bases are processed) + if total_processed > 0: + await update_connector_last_indexed( + session, connector, update_last_indexed + ) - logger.info( - f"Airtable indexing completed: {documents_indexed} new records, {documents_skipped} skipped" - ) - return ( - total_processed, - None, - ) # Return None as the error message to indicate success + # Log success after processing all bases and tables + await task_logger.log_task_success( + log_entry, + f"Successfully completed Airtable indexing for connector {connector_id}", + { + "events_processed": total_processed, + "documents_indexed": total_processed, + }, + ) + + logger.info( + f"Airtable indexing completed: {total_processed} total records processed" + ) + return ( + total_processed, + None, + ) # Return None as the error message to indicate success except Exception as e: logger.error( f"Fetching Airtable bases for connector {connector_id} failed: {e!s}", exc_info=True, ) + await task_logger.log_task_failure( + log_entry, + f"Failed to fetch Airtable bases for connector {connector_id}", + str(e), + {"error_type": type(e).__name__}, + ) + return 0, f"Failed to fetch Airtable bases: {e!s}" except SQLAlchemyError as db_error: await session.rollback()