From 38f907e65b18c040f93b65f19819eee96ea0ea80 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Tue, 13 Jan 2026 03:16:42 +0530 Subject: [PATCH] feat: Implement Electric SQL replication setup for notifications table - Added setup_electric_replication function to handle Electric SQL replication for the notifications table during app startup. - Updated alembic migration script to remove direct SQL commands for replication, now managed in app/db.py. - Refactored indexing functions in search_source_connectors_routes to support new start_date and end_date parameters for improved flexibility. - Enhanced Google Gmail indexing task to utilize new date parameters, ensuring better control over indexing periods. --- .../versions/60_add_notifications_table.py | 27 +- surfsense_backend/app/db.py | 31 +++ .../routes/search_source_connectors_routes.py | 243 ++++++++---------- .../app/tasks/celery_tasks/connector_tasks.py | 20 +- .../notifications/NotificationPopup.tsx | 6 +- 5 files changed, 151 insertions(+), 176 deletions(-) diff --git a/surfsense_backend/alembic/versions/60_add_notifications_table.py b/surfsense_backend/alembic/versions/60_add_notifications_table.py index 4eb3d5728..fe2d9359f 100644 --- a/surfsense_backend/alembic/versions/60_add_notifications_table.py +++ b/surfsense_backend/alembic/versions/60_add_notifications_table.py @@ -2,6 +2,9 @@ Revision ID: 60 Revises: 59 + +Note: Electric SQL replication setup (REPLICA IDENTITY FULL and publication) +is handled in app/db.py setup_electric_replication() which runs on app startup. """ from collections.abc import Sequence @@ -31,7 +34,7 @@ def upgrade() -> None: created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ ); - """ + """ ) # Create indexes @@ -40,28 +43,6 @@ def upgrade() -> None: op.create_index("ix_notifications_created_at", "notifications", ["created_at"]) op.create_index("ix_notifications_user_read", "notifications", ["user_id", "read"]) - # Set REPLICA IDENTITY FULL (required by Electric SQL for replication) - op.execute("ALTER TABLE notifications REPLICA IDENTITY FULL;") - - # Grant SELECT to electric user for Electric SQL replication - # This is needed because ALTER DEFAULT PRIVILEGES only applies during initial DB setup - op.execute("GRANT SELECT ON notifications TO electric;") - - # Add notifications table to Electric SQL publication for replication - # This is required for Electric SQL to sync the table - op.execute(""" - DO $$ - BEGIN - IF NOT EXISTS ( - SELECT 1 FROM pg_publication_tables - WHERE pubname = 'electric_publication_default' - AND tablename = 'notifications' - ) THEN - ALTER PUBLICATION electric_publication_default ADD TABLE notifications; - END IF; - END - $$; - """) def downgrade() -> None: """Downgrade schema - remove notifications table.""" diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index a677611c7..57495f4f9 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -895,6 +895,37 @@ async def create_db_and_tables(): await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) await conn.run_sync(Base.metadata.create_all) await setup_indexes() + await setup_electric_replication() + + +async def setup_electric_replication(): + """Set up Electric SQL replication for the notifications table.""" + async with engine.begin() as conn: + # Set REPLICA IDENTITY FULL (required by Electric SQL for replication) + # This logs full row data for UPDATE/DELETE operations in the WAL + await 
conn.execute(text("ALTER TABLE notifications REPLICA IDENTITY FULL;")) + + # Add notifications table to Electric SQL publication for replication + # Only add if publication exists and table not already in it + await conn.execute( + text( + """ + DO $$ + BEGIN + IF EXISTS (SELECT 1 FROM pg_publication WHERE pubname = 'electric_publication_default') THEN + IF NOT EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'notifications' + ) THEN + ALTER PUBLICATION electric_publication_default ADD TABLE notifications; + END IF; + END IF; + END + $$; + """ + ) + ) async def get_async_session() -> AsyncGenerator[AsyncSession, None]: diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index ca4e03c93..c5be0d1fd 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1227,36 +1227,27 @@ async def run_linear_indexing( start_date: str, end_date: str, ): - """Runs the Linear indexing task and updates the timestamp.""" - try: - indexed_count, error_message = await index_linear_issues( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"Linear indexing failed for connector {connector_id}: {error_message}" - ) - # Optionally update status in DB to indicate failure - else: - logger.info( - f"Linear indexing successful for connector {connector_id}. Indexed {indexed_count} documents." - ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - await session.rollback() - logger.error( - f"Critical error in run_linear_indexing for connector {connector_id}: {e}", - exc_info=True, - ) - # Optionally update status in DB to indicate failure + """ + Background task to run Linear indexing. + + Args: + session: Database session + connector_id: ID of the Linear connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_linear_issues, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for discord indexing @@ -1582,35 +1573,27 @@ async def run_airtable_indexing( start_date: str, end_date: str, ): - """Runs the Airtable indexing task and updates the timestamp.""" - try: - indexed_count, error_message = await index_airtable_records( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"Airtable indexing failed for connector {connector_id}: {error_message}" - ) - # Optionally update status in DB to indicate failure - else: - logger.info( - f"Airtable indexing successful for connector {connector_id}. Indexed {indexed_count} records." 
- ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - logger.error( - f"Critical error in run_airtable_indexing for connector {connector_id}: {e}", - exc_info=True, - ) - # Optionally update status in DB to indicate failure + """ + Background task to run Airtable indexing. + + Args: + session: Database session + connector_id: ID of the Airtable connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_airtable_records, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for Google Calendar indexing @@ -1642,55 +1625,44 @@ async def run_google_calendar_indexing( start_date: str, end_date: str, ): - """Runs the Google Calendar indexing task and updates the timestamp.""" - try: - indexed_count, error_message = await index_google_calendar_events( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"Google Calendar indexing failed for connector {connector_id}: {error_message}" - ) - # Optionally update status in DB to indicate failure - else: - logger.info( - f"Google Calendar indexing successful for connector {connector_id}. Indexed {indexed_count} documents." - ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - logger.error( - f"Critical error in run_google_calendar_indexing for connector {connector_id}: {e}", - exc_info=True, - ) - # Optionally update status in DB to indicate failure + """ + Background task to run Google Calendar indexing. + + Args: + session: Database session + connector_id: ID of the Google Calendar connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_google_calendar_events, + update_timestamp_func=_update_connector_timestamp_by_id, + ) async def run_google_gmail_indexing_with_new_session( connector_id: int, search_space_id: int, user_id: str, - max_messages: int, - days_back: int, + start_date: str, + end_date: str, ): - """Wrapper to run Google Gmail indexing with its own database session.""" - logger.info( - f"Background task started: Indexing Google Gmail connector {connector_id} into space {search_space_id} for {max_messages} messages from the last {days_back} days" - ) + """ + Create a new session and run the Google Gmail indexing task. + This prevents session leaks by creating a dedicated session for the background task. 
+ """ async with async_session_maker() as session: await run_google_gmail_indexing( - session, connector_id, search_space_id, user_id, max_messages, days_back + session, connector_id, search_space_id, user_id, start_date, end_date ) - logger.info( - f"Background task finished: Indexing Google Gmail connector {connector_id}" - ) async def run_google_gmail_indexing( @@ -1698,46 +1670,55 @@ async def run_google_gmail_indexing( connector_id: int, search_space_id: int, user_id: str, - max_messages: int, - days_back: int, + start_date: str, + end_date: str, ): - """Runs the Google Gmail indexing task and updates the timestamp.""" - try: - # Convert days_back to start_date string in YYYY-MM-DD format - from datetime import datetime, timedelta - - start_date_obj = datetime.now() - timedelta(days=days_back) - start_date = start_date_obj.strftime("%Y-%m-%d") - end_date = None # No end date, index up to current time + """ + Background task to run Google Gmail indexing. + Args: + session: Database session + connector_id: ID of the Google Gmail connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + # Create a wrapper function that calls index_google_gmail_messages with max_messages + async def gmail_indexing_wrapper( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str | None, + end_date: str | None, + update_last_indexed: bool, + ) -> tuple[int, str | None]: + # Use a reasonable default for max_messages + max_messages = 1000 indexed_count, error_message = await index_google_gmail_messages( - session, - connector_id, - search_space_id, - user_id, + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, start_date=start_date, end_date=end_date, - update_last_indexed=False, + update_last_indexed=update_last_indexed, max_messages=max_messages, ) - if error_message: - logger.error( - f"Google Gmail indexing failed for connector {connector_id}: {error_message}" - ) - # Optionally update status in DB to indicate failure - else: - logger.info( - f"Google Gmail indexing successful for connector {connector_id}. Indexed {indexed_count} documents." 
- ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - logger.error( - f"Critical error in run_google_gmail_indexing for connector {connector_id}: {e}", - exc_info=True, - ) - # Optionally update status in DB to indicate failure + # index_google_gmail_messages returns (int, str) but we need (int, str | None) + return indexed_count, error_message if error_message else None + + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=gmail_indexing_wrapper, + update_timestamp_func=_update_connector_timestamp_by_id, + ) async def run_google_drive_indexing( diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 1d1cbe361..a88ac32c0 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -445,31 +445,13 @@ async def _index_google_gmail_messages( end_date: str, ): """Index Google Gmail messages with new session.""" - from datetime import datetime - from app.routes.search_source_connectors_routes import ( run_google_gmail_indexing, ) - # Parse dates to calculate days_back - max_messages = 100 - days_back = 30 # Default - - if start_date: - try: - # Parse start_date (format: YYYY-MM-DD) - start_dt = datetime.strptime(start_date, "%Y-%m-%d") - # Calculate days back from now - days_back = (datetime.now() - start_dt).days - # Ensure at least 1 day - days_back = max(1, days_back) - except ValueError: - # If parsing fails, use default - days_back = 30 - async with get_celery_session_maker()() as session: await run_google_gmail_indexing( - session, connector_id, search_space_id, user_id, max_messages, days_back + session, connector_id, search_space_id, user_id, start_date, end_date ) diff --git a/surfsense_web/components/notifications/NotificationPopup.tsx b/surfsense_web/components/notifications/NotificationPopup.tsx index 8bf6cce00..c08555453 100644 --- a/surfsense_web/components/notifications/NotificationPopup.tsx +++ b/surfsense_web/components/notifications/NotificationPopup.tsx @@ -105,18 +105,18 @@ export function NotificationPopup({

[JSX hunk body lost in extraction: the element markup was stripped when this patch was rendered to text. Per the diffstat (NotificationPopup.tsx | 6 +-) and the hunk header (@@ -105,18 +105,18 @@), the hunk makes three one-line -/+ substitutions adjusting the wrapper elements around {notification.title}, {notification.message}, and {formatTime(notification.created_at)}.]
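
Note: since the REPLICA IDENTITY and publication setup now runs at app startup rather than in the migration, it can be sanity-checked against a live database. A minimal sketch, assuming an asyncpg-style connection URL and the publication name used in the patch (the `verify_electric_replication` helper itself is hypothetical, not part of this diff):

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


async def verify_electric_replication(database_url: str) -> None:
    """Confirm what setup_electric_replication() is expected to have done."""
    # database_url must use an async driver, e.g. postgresql+asyncpg://...
    engine = create_async_engine(database_url)
    async with engine.connect() as conn:
        # pg_class.relreplident is 'f' when REPLICA IDENTITY FULL is set
        ident = await conn.scalar(
            text(
                "SELECT relreplident FROM pg_class "
                "WHERE oid = 'public.notifications'::regclass"
            )
        )
        assert ident == "f", f"unexpected replica identity: {ident!r}"

        # The table should be a member of the Electric publication, if it exists
        published = await conn.scalar(
            text(
                "SELECT COUNT(*) FROM pg_publication_tables "
                "WHERE pubname = 'electric_publication_default' "
                "AND tablename = 'notifications'"
            )
        )
        assert published == 1, "notifications is not in the Electric publication"
    await engine.dispose()
```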
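For reference, the `_run_indexing_with_notifications` helper that the refactored route handlers delegate to is not included in this diff. A minimal sketch of what it plausibly does, inferred from the call sites above and the inline code they replace; the exact signature and the notification side effects are assumptions:

```python
import logging
from collections.abc import Awaitable, Callable

from sqlalchemy.ext.asyncio import AsyncSession

logger = logging.getLogger(__name__)


async def _run_indexing_with_notifications(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
    indexing_function: Callable[..., Awaitable[tuple[int, str | None]]],
    update_timestamp_func: Callable[[AsyncSession, int], Awaitable[None]],
) -> None:
    try:
        indexed_count, error_message = await indexing_function(
            session,
            connector_id,
            search_space_id,
            user_id,
            start_date,
            end_date,
            update_last_indexed=False,
        )
        if error_message:
            logger.error(
                f"Indexing failed for connector {connector_id}: {error_message}"
            )
            # Presumably also writes a failure row to the new notifications table.
        else:
            logger.info(
                f"Indexing successful for connector {connector_id}. "
                f"Indexed {indexed_count} documents."
            )
            # Update the last-indexed timestamp only on success, then commit.
            await update_timestamp_func(session, connector_id)
            await session.commit()
            # Presumably also writes a success row to the notifications table.
    except Exception as e:
        await session.rollback()
        logger.error(
            f"Critical error while indexing connector {connector_id}: {e}",
            exc_info=True,
        )
```

This centralizes the error handling, timestamp update, and commit/rollback logic that was previously duplicated inline in each per-connector task, which is why the Linear, Airtable, Google Calendar, and Gmail handlers above shrink to a single delegating call.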
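The Gmail task's switch from (max_messages, days_back) to (start_date, end_date) moves the window calculation to the caller. An assumed call-site usage, with hypothetical IDs, showing how a previous "last 30 days" invocation translates to the new explicit YYYY-MM-DD window (the format the removed code produced):

```python
from datetime import datetime, timedelta

# Equivalent of the old days_back=30 behavior, now computed by the caller
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")

await run_google_gmail_indexing_with_new_session(
    connector_id=42,        # hypothetical connector ID
    search_space_id=7,      # hypothetical search space ID
    user_id="user-123",     # hypothetical user ID
    start_date=start_date,
    end_date=end_date,
)
```

Note that max_messages is no longer caller-controlled: the route-level wrapper pins it to a default of 1000 inside gmail_indexing_wrapper, while the old Celery path used 100.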