feat: Implement Electric SQL replication setup for notifications table

- Added setup_electric_replication function to handle Electric SQL replication for the notifications table during app startup.
- Updated alembic migration script to remove direct SQL commands for replication, now managed in app/db.py.
- Refactored indexing functions in search_source_connectors_routes to support new start_date and end_date parameters for improved flexibility.
- Enhanced Google Gmail indexing task to utilize new date parameters, ensuring better control over indexing periods.
This commit is contained in:
Anish Sarkar 2026-01-13 03:16:42 +05:30
parent 44605749c5
commit 38f907e65b
5 changed files with 151 additions and 176 deletions

View file

@@ -2,6 +2,9 @@
Revision ID: 60 Revision ID: 60
Revises: 59 Revises: 59
Note: Electric SQL replication setup (REPLICA IDENTITY FULL and publication)
is handled in app/db.py setup_electric_replication() which runs on app startup.
""" """
from collections.abc import Sequence from collections.abc import Sequence
@@ -31,7 +34,7 @@ def upgrade() -> None:
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ updated_at TIMESTAMPTZ
); );
""" """
) )
# Create indexes # Create indexes
@@ -40,28 +43,6 @@ def upgrade() -> None:
op.create_index("ix_notifications_created_at", "notifications", ["created_at"]) op.create_index("ix_notifications_created_at", "notifications", ["created_at"])
op.create_index("ix_notifications_user_read", "notifications", ["user_id", "read"]) op.create_index("ix_notifications_user_read", "notifications", ["user_id", "read"])
# Set REPLICA IDENTITY FULL (required by Electric SQL for replication)
op.execute("ALTER TABLE notifications REPLICA IDENTITY FULL;")
# Grant SELECT to electric user for Electric SQL replication
# This is needed because ALTER DEFAULT PRIVILEGES only applies during initial DB setup
op.execute("GRANT SELECT ON notifications TO electric;")
# Add notifications table to Electric SQL publication for replication
# This is required for Electric SQL to sync the table
op.execute("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_publication_tables
WHERE pubname = 'electric_publication_default'
AND tablename = 'notifications'
) THEN
ALTER PUBLICATION electric_publication_default ADD TABLE notifications;
END IF;
END
$$;
""")
def downgrade() -> None: def downgrade() -> None:
"""Downgrade schema - remove notifications table.""" """Downgrade schema - remove notifications table."""

View file

@@ -895,6 +895,37 @@ async def create_db_and_tables():
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
await conn.run_sync(Base.metadata.create_all) await conn.run_sync(Base.metadata.create_all)
await setup_indexes() await setup_indexes()
await setup_electric_replication()
async def setup_electric_replication():
"""Set up Electric SQL replication for the notifications table."""
async with engine.begin() as conn:
# Set REPLICA IDENTITY FULL (required by Electric SQL for replication)
# This logs full row data for UPDATE/DELETE operations in the WAL
await conn.execute(text("ALTER TABLE notifications REPLICA IDENTITY FULL;"))
# Add notifications table to Electric SQL publication for replication
# Only add if publication exists and table not already in it
await conn.execute(
text(
"""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_publication WHERE pubname = 'electric_publication_default') THEN
IF NOT EXISTS (
SELECT 1 FROM pg_publication_tables
WHERE pubname = 'electric_publication_default'
AND tablename = 'notifications'
) THEN
ALTER PUBLICATION electric_publication_default ADD TABLE notifications;
END IF;
END IF;
END
$$;
"""
)
)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]: async def get_async_session() -> AsyncGenerator[AsyncSession, None]:

View file

@@ -1227,36 +1227,27 @@ async def run_linear_indexing(
start_date: str, start_date: str,
end_date: str, end_date: str,
): ):
"""Runs the Linear indexing task and updates the timestamp.""" """
try: Background task to run Linear indexing.
indexed_count, error_message = await index_linear_issues(
session, Args:
connector_id, session: Database session
search_space_id, connector_id: ID of the Linear connector
user_id, search_space_id: ID of the search space
start_date, user_id: ID of the user
end_date, start_date: Start date for indexing
update_last_indexed=False, end_date: End date for indexing
) """
if error_message: await _run_indexing_with_notifications(
logger.error( session=session,
f"Linear indexing failed for connector {connector_id}: {error_message}" connector_id=connector_id,
) search_space_id=search_space_id,
# Optionally update status in DB to indicate failure user_id=user_id,
else: start_date=start_date,
logger.info( end_date=end_date,
f"Linear indexing successful for connector {connector_id}. Indexed {indexed_count} documents." indexing_function=index_linear_issues,
) update_timestamp_func=_update_connector_timestamp_by_id,
# Update the last indexed timestamp only on success )
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
await session.rollback()
logger.error(
f"Critical error in run_linear_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
# Add new helper functions for discord indexing # Add new helper functions for discord indexing
@@ -1582,35 +1573,27 @@ async def run_airtable_indexing(
start_date: str, start_date: str,
end_date: str, end_date: str,
): ):
"""Runs the Airtable indexing task and updates the timestamp.""" """
try: Background task to run Airtable indexing.
indexed_count, error_message = await index_airtable_records(
session, Args:
connector_id, session: Database session
search_space_id, connector_id: ID of the Airtable connector
user_id, search_space_id: ID of the search space
start_date, user_id: ID of the user
end_date, start_date: Start date for indexing
update_last_indexed=False, end_date: End date for indexing
) """
if error_message: await _run_indexing_with_notifications(
logger.error( session=session,
f"Airtable indexing failed for connector {connector_id}: {error_message}" connector_id=connector_id,
) search_space_id=search_space_id,
# Optionally update status in DB to indicate failure user_id=user_id,
else: start_date=start_date,
logger.info( end_date=end_date,
f"Airtable indexing successful for connector {connector_id}. Indexed {indexed_count} records." indexing_function=index_airtable_records,
) update_timestamp_func=_update_connector_timestamp_by_id,
# Update the last indexed timestamp only on success )
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
logger.error(
f"Critical error in run_airtable_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
# Add new helper functions for Google Calendar indexing # Add new helper functions for Google Calendar indexing
@@ -1642,55 +1625,44 @@ async def run_google_calendar_indexing(
start_date: str, start_date: str,
end_date: str, end_date: str,
): ):
"""Runs the Google Calendar indexing task and updates the timestamp.""" """
try: Background task to run Google Calendar indexing.
indexed_count, error_message = await index_google_calendar_events(
session, Args:
connector_id, session: Database session
search_space_id, connector_id: ID of the Google Calendar connector
user_id, search_space_id: ID of the search space
start_date, user_id: ID of the user
end_date, start_date: Start date for indexing
update_last_indexed=False, end_date: End date for indexing
) """
if error_message: await _run_indexing_with_notifications(
logger.error( session=session,
f"Google Calendar indexing failed for connector {connector_id}: {error_message}" connector_id=connector_id,
) search_space_id=search_space_id,
# Optionally update status in DB to indicate failure user_id=user_id,
else: start_date=start_date,
logger.info( end_date=end_date,
f"Google Calendar indexing successful for connector {connector_id}. Indexed {indexed_count} documents." indexing_function=index_google_calendar_events,
) update_timestamp_func=_update_connector_timestamp_by_id,
# Update the last indexed timestamp only on success )
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
logger.error(
f"Critical error in run_google_calendar_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
async def run_google_gmail_indexing_with_new_session( async def run_google_gmail_indexing_with_new_session(
connector_id: int, connector_id: int,
search_space_id: int, search_space_id: int,
user_id: str, user_id: str,
max_messages: int, start_date: str,
days_back: int, end_date: str,
): ):
"""Wrapper to run Google Gmail indexing with its own database session.""" """
logger.info( Create a new session and run the Google Gmail indexing task.
f"Background task started: Indexing Google Gmail connector {connector_id} into space {search_space_id} for {max_messages} messages from the last {days_back} days" This prevents session leaks by creating a dedicated session for the background task.
) """
async with async_session_maker() as session: async with async_session_maker() as session:
await run_google_gmail_indexing( await run_google_gmail_indexing(
session, connector_id, search_space_id, user_id, max_messages, days_back session, connector_id, search_space_id, user_id, start_date, end_date
) )
logger.info(
f"Background task finished: Indexing Google Gmail connector {connector_id}"
)
async def run_google_gmail_indexing( async def run_google_gmail_indexing(
@@ -1698,46 +1670,55 @@ async def run_google_gmail_indexing(
connector_id: int, connector_id: int,
search_space_id: int, search_space_id: int,
user_id: str, user_id: str,
max_messages: int, start_date: str,
days_back: int, end_date: str,
): ):
"""Runs the Google Gmail indexing task and updates the timestamp.""" """
try: Background task to run Google Gmail indexing.
# Convert days_back to start_date string in YYYY-MM-DD format
from datetime import datetime, timedelta
start_date_obj = datetime.now() - timedelta(days=days_back)
start_date = start_date_obj.strftime("%Y-%m-%d")
end_date = None # No end date, index up to current time
Args:
session: Database session
connector_id: ID of the Google Gmail connector
search_space_id: ID of the search space
user_id: ID of the user
start_date: Start date for indexing
end_date: End date for indexing
"""
# Create a wrapper function that calls index_google_gmail_messages with max_messages
async def gmail_indexing_wrapper(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None,
end_date: str | None,
update_last_indexed: bool,
) -> tuple[int, str | None]:
# Use a reasonable default for max_messages
max_messages = 1000
indexed_count, error_message = await index_google_gmail_messages( indexed_count, error_message = await index_google_gmail_messages(
session, session=session,
connector_id, connector_id=connector_id,
search_space_id, search_space_id=search_space_id,
user_id, user_id=user_id,
start_date=start_date, start_date=start_date,
end_date=end_date, end_date=end_date,
update_last_indexed=False, update_last_indexed=update_last_indexed,
max_messages=max_messages, max_messages=max_messages,
) )
if error_message: # index_google_gmail_messages returns (int, str) but we need (int, str | None)
logger.error( return indexed_count, error_message if error_message else None
f"Google Gmail indexing failed for connector {connector_id}: {error_message}"
) await _run_indexing_with_notifications(
# Optionally update status in DB to indicate failure session=session,
else: connector_id=connector_id,
logger.info( search_space_id=search_space_id,
f"Google Gmail indexing successful for connector {connector_id}. Indexed {indexed_count} documents." user_id=user_id,
) start_date=start_date,
# Update the last indexed timestamp only on success end_date=end_date,
await _update_connector_timestamp_by_id(session, connector_id) indexing_function=gmail_indexing_wrapper,
await session.commit() # Commit timestamp update update_timestamp_func=_update_connector_timestamp_by_id,
except Exception as e: )
logger.error(
f"Critical error in run_google_gmail_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
async def run_google_drive_indexing( async def run_google_drive_indexing(

View file

@@ -445,31 +445,13 @@ async def _index_google_gmail_messages(
end_date: str, end_date: str,
): ):
"""Index Google Gmail messages with new session.""" """Index Google Gmail messages with new session."""
from datetime import datetime
from app.routes.search_source_connectors_routes import ( from app.routes.search_source_connectors_routes import (
run_google_gmail_indexing, run_google_gmail_indexing,
) )
# Parse dates to calculate days_back
max_messages = 100
days_back = 30 # Default
if start_date:
try:
# Parse start_date (format: YYYY-MM-DD)
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
# Calculate days back from now
days_back = (datetime.now() - start_dt).days
# Ensure at least 1 day
days_back = max(1, days_back)
except ValueError:
# If parsing fails, use default
days_back = 30
async with get_celery_session_maker()() as session: async with get_celery_session_maker()() as session:
await run_google_gmail_indexing( await run_google_gmail_indexing(
session, connector_id, search_space_id, user_id, max_messages, days_back session, connector_id, search_space_id, user_id, start_date, end_date
) )

View file

@@ -105,18 +105,18 @@ export function NotificationPopup({
<div className="flex items-start justify-between gap-2 mb-1"> <div className="flex items-start justify-between gap-2 mb-1">
<p <p
className={cn( className={cn(
"text-sm font-medium truncate", "text-xs font-medium break-words",
!notification.read && "font-semibold" !notification.read && "font-semibold"
)} )}
> >
{notification.title} {notification.title}
</p> </p>
</div> </div>
<p className="text-xs text-muted-foreground line-clamp-2"> <p className="text-[11px] text-muted-foreground break-words line-clamp-2">
{notification.message} {notification.message}
</p> </p>
<div className="flex items-center justify-between mt-2"> <div className="flex items-center justify-between mt-2">
<span className="text-xs text-muted-foreground"> <span className="text-[10px] text-muted-foreground">
{formatTime(notification.created_at)} {formatTime(notification.created_at)}
</span> </span>
</div> </div>