From 55d0cc4d0d14c1a00f2cbd58cf6485b3ac292391 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 26 Aug 2025 15:42:42 +0200 Subject: [PATCH] Add sirtable indexer --- .../app/tasks/connector_indexers/__init__.py | 5 +- .../connector_indexers/airtable_indexer.py | 321 ++++++++++++++++++ 2 files changed, 324 insertions(+), 2 deletions(-) create mode 100644 surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py b/surfsense_backend/app/tasks/connector_indexers/__init__.py index 7befa5986..acc3db4c4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -19,14 +19,14 @@ Available indexers: """ # Communication platforms +# Calendar and scheduling +from .airtable_indexer import index_airtable_records from .clickup_indexer import index_clickup_tasks from .confluence_indexer import index_confluence_pages from .discord_indexer import index_discord_messages # Development platforms from .github_indexer import index_github_repos - -# Calendar and scheduling from .google_calendar_indexer import index_google_calendar_events from .google_gmail_indexer import index_google_gmail_messages from .jira_indexer import index_jira_issues @@ -39,6 +39,7 @@ from .notion_indexer import index_notion_pages from .slack_indexer import index_slack_messages __all__ = [ # noqa: RUF022 + "index_airtable_records", "index_clickup_tasks", "index_confluence_pages", "index_discord_messages", diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py new file mode 100644 index 000000000..1d5879746 --- /dev/null +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -0,0 +1,321 @@ +""" +Airtable connector indexer. +""" + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.connectors.airtable_connector import AirtableConnector +from app.db import Document, DocumentType, SearchSourceConnectorType +from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase +from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, +) + +from .base import ( + calculate_date_range, + check_duplicate_document_by_hash, + get_connector_by_id, + logger, + update_connector_last_indexed, +) + + +async def index_airtable_records( + session: AsyncSession, + connector_id: int, + start_date: str | None = None, + end_date: str | None = None, + max_records: int = 2500, + update_last_indexed: bool = True, +) -> tuple[int, str | None]: + """ + Index Airtable records for a given connector. + + Args: + session: Database session + connector_id: ID of the Airtable connector + start_date: Start date for filtering records (YYYY-MM-DD) + end_date: End date for filtering records (YYYY-MM-DD) + max_records: Maximum number of records to fetch per table + update_last_indexed: Whether to update the last_indexed_at timestamp + + Returns: + Tuple of (number_of_documents_processed, error_message) + """ + task_logger = TaskLoggingService(session) + log_entry = await task_logger.create_task_log( + task_name="index_airtable_records", + task_params={ + "connector_id": connector_id, + "start_date": start_date, + "end_date": end_date, + "max_records": max_records, + }, + ) + + try: + # Get the connector from the database + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.AIRTABLE_CONNECTOR + ) + + if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found", + "Connector not found", + {"error_type": "ConnectorNotFound"}, + ) + return 0, f"Connector with ID {connector_id} not found" + + # Create credentials from connector config + config_data = connector.config + try: + credentials = AirtableAuthCredentialsBase.from_dict(config_data) + except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Invalid Airtable credentials in connector {connector_id}", + str(e), + {"error_type": "InvalidCredentials"}, + ) + return 0, f"Invalid Airtable credentials: {e!s}" + + # Check if credentials are expired + if credentials.is_expired: + await task_logger.log_task_failure( + log_entry, + f"Airtable credentials expired for connector {connector_id}", + "Credentials expired", + {"error_type": "ExpiredCredentials"}, + ) + return 0, "Airtable credentials have expired. Please re-authenticate." + + # Calculate date range for indexing + start_date_str, end_date_str = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) + + logger.info( + f"Starting Airtable indexing for connector {connector_id} " + f"from {start_date_str} to {end_date_str}" + ) + + # Initialize Airtable connector + airtable_connector = AirtableConnector(credentials) + total_processed = 0 + + try: + # Get accessible bases + logger.info(f"Fetching Airtable bases for connector {connector_id}") + bases, error = airtable_connector.get_bases() + + if error: + await task_logger.log_task_failure( + log_entry, + f"Failed to fetch Airtable bases: {error}", + "API Error", + {"error_type": "APIError"}, + ) + return 0, f"Failed to fetch Airtable bases: {error}" + + if not bases: + success_msg = "No Airtable bases found or accessible" + await task_logger.log_task_success( + log_entry, success_msg, {"bases_count": 0} + ) + return 0, success_msg + + logger.info(f"Found {len(bases)} Airtable bases to process") + + # Process each base + for base in bases: + base_id = base.get("id") + base_name = base.get("name", "Unknown Base") + + if not base_id: + logger.warning(f"Skipping base without ID: {base}") + continue + + logger.info(f"Processing base: {base_name} ({base_id})") + + # Get base schema to find tables + schema_data, schema_error = airtable_connector.get_base_schema(base_id) + + if schema_error: + logger.warning( + f"Failed to get schema for base {base_id}: {schema_error}" + ) + continue + + if not schema_data or "tables" not in schema_data: + logger.warning(f"No tables found in base {base_id}") + continue + + tables = schema_data["tables"] + logger.info(f"Found {len(tables)} tables in base {base_name}") + + # Process each table + for table in tables: + table_id = table.get("id") + table_name = table.get("name", "Unknown Table") + + if not table_id: + logger.warning(f"Skipping table without ID: {table}") + continue + + logger.info(f"Processing table: {table_name} ({table_id})") + + # Fetch records + if start_date_str and end_date_str: + # Use date filtering if available + records, records_error = ( + airtable_connector.get_records_by_date_range( + base_id=base_id, + table_id=table_id, + date_field="createdTime", + start_date=start_date_str, + end_date=end_date_str, + max_records=max_records, + ) + ) + else: + # Fetch all records + records, records_error = airtable_connector.get_all_records( + base_id=base_id, + table_id=table_id, + max_records=max_records, + ) + + if records_error: + logger.warning( + f"Failed to fetch records from table {table_name}: {records_error}" + ) + continue + + if not records: + logger.info(f"No records found in table {table_name}") + continue + + logger.info(f"Found {len(records)} records in table {table_name}") + + # Process each record + for record in records: + try: + # Generate markdown content + markdown_content = ( + airtable_connector.format_record_to_markdown( + record, f"{base_name} - {table_name}" + ) + ) + + # Generate content hash + content_hash = generate_content_hash(markdown_content) + + # Check for duplicates + existing_doc = await check_duplicate_document_by_hash( + session, content_hash + ) + if existing_doc: + logger.debug( + f"Skipping duplicate record {record.get('id')}" + ) + continue + + # Generate document summary + llm = get_user_long_context_llm(connector.user_id) + summary = await generate_document_summary( + markdown_content, llm + ) + + # Create document + document = Document( + title=f"{base_name} - {table_name} - Record {record.get('id', 'Unknown')}", + content=markdown_content, + content_hash=content_hash, + summary=summary, + document_type=DocumentType.AIRTABLE_CONNECTOR, + source_url=f"https://airtable.com/{base_id}/{table_id}", + metadata={ + "base_id": base_id, + "base_name": base_name, + "table_id": table_id, + "table_name": table_name, + "record_id": record.get("id"), + "created_time": record.get("createdTime"), + "connector_id": connector_id, + }, + user_id=connector.user_id, + ) + + session.add(document) + await session.flush() + + # Create document chunks + await create_document_chunks( + session, document, markdown_content, llm + ) + + total_processed += 1 + logger.debug( + f"Processed record {record.get('id')} from {table_name}" + ) + + except Exception as e: + logger.error( + f"Error processing record {record.get('id')}: {e!s}" + ) + continue + + # Update last indexed timestamp + if update_last_indexed: + await update_connector_last_indexed( + session, connector, update_last_indexed + ) + + await session.commit() + + success_msg = f"Successfully indexed {total_processed} Airtable records" + await task_logger.log_task_success( + log_entry, + success_msg, + { + "records_processed": total_processed, + "bases_count": len(bases), + "date_range": f"{start_date_str} to {end_date_str}", + }, + ) + + logger.info(success_msg) + return total_processed, None + + finally: + airtable_connector.close() + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Airtable indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error( + f"Database error during Airtable indexing: {db_error!s}", exc_info=True + ) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Airtable records for connector {connector_id}", + str(e), + {"error_type": type(e).__name__}, + ) + logger.error(f"Error during Airtable indexing: {e!s}", exc_info=True) + return 0, f"Failed to index Airtable records: {e!s}"