diff --git a/surfsense_backend/alembic/versions/60_add_notifications_table.py b/surfsense_backend/alembic/versions/60_add_notifications_table.py
index d741e637c..4eb3d5728 100644
--- a/surfsense_backend/alembic/versions/60_add_notifications_table.py
+++ b/surfsense_backend/alembic/versions/60_add_notifications_table.py
@@ -41,12 +41,27 @@ def upgrade() -> None:
op.create_index("ix_notifications_user_read", "notifications", ["user_id", "read"])
# Set REPLICA IDENTITY FULL (required by Electric SQL for replication)
- # This allows Electric SQL to track all column values for updates/deletes
op.execute("ALTER TABLE notifications REPLICA IDENTITY FULL;")
- # Note: ElectricSQL 1.x dynamically adds tables to the publication when
- # clients subscribe to shapes. No need to manually create publications.
+ # Grant SELECT to electric user for Electric SQL replication
+ # Needed because the ALTER DEFAULT PRIVILEGES configured during initial DB setup may not cover tables created later by the migration role
+ op.execute("GRANT SELECT ON notifications TO electric;")
+ # Add notifications table to Electric SQL publication for replication
+ # This is required for Electric SQL to sync the table
+ op.execute("""
+ DO $$
+ BEGIN
+ IF NOT EXISTS (
+ SELECT 1 FROM pg_publication_tables
+ WHERE pubname = 'electric_publication_default'
+ AND tablename = 'notifications'
+ ) THEN
+ ALTER PUBLICATION electric_publication_default ADD TABLE notifications;
+ END IF;
+ END
+ $$;
+ """)
def downgrade() -> None:
"""Downgrade schema - remove notifications table."""
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index 06d75c7c9..ca4e03c93 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -30,6 +30,7 @@ from app.db import (
async_session_maker,
get_async_session,
)
+from app.services.notification_service import NotificationService
from app.schemas import (
GoogleDriveIndexRequest,
SearchSourceConnectorBase,
@@ -973,6 +974,118 @@ async def run_slack_indexing(
logger.error(f"Error in background Slack indexing task: {e!s}")
+async def _run_indexing_with_notifications(
+ session: AsyncSession,
+ connector_id: int,
+ search_space_id: int,
+ user_id: str,
+ start_date: str,
+ end_date: str,
+ indexing_function,
+ update_timestamp_func=None,
+):
+ """
+ Generic helper to run indexing with real-time notifications.
+
+ Args:
+ session: Database session
+ connector_id: ID of the connector
+ search_space_id: ID of the search space
+ user_id: ID of the user
+ start_date: Start date for indexing
+ end_date: End date for indexing
+ indexing_function: Async function that performs the indexing
+ update_timestamp_func: Optional function to update connector timestamp
+ """
+ from uuid import UUID
+
+ notification = None
+ try:
+ # Get connector info for notification
+ connector_result = await session.execute(
+ select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id)
+ )
+ connector = connector_result.scalar_one_or_none()
+
+ if connector:
+ # Create notification when indexing starts
+ notification = await NotificationService.connector_indexing.notify_indexing_started(
+ session=session,
+ user_id=UUID(user_id),
+ connector_id=connector_id,
+ connector_name=connector.name,
+ connector_type=connector.connector_type.value,
+ search_space_id=search_space_id,
+ start_date=start_date,
+ end_date=end_date,
+ )
+
+ # Run the indexing function
+ documents_processed, error_or_warning = await indexing_function(
+ session=session,
+ connector_id=connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ start_date=start_date,
+ end_date=end_date,
+ update_last_indexed=False,
+ )
+
+ # Update connector timestamp if function provided and indexing was successful
+ if documents_processed > 0 and update_timestamp_func:
+ await update_timestamp_func(session, connector_id)
+ logger.info(
+ f"Indexing completed successfully: {documents_processed} documents processed"
+ )
+
+ # Update notification on success
+ if notification:
+ await NotificationService.connector_indexing.notify_indexing_completed(
+ session=session,
+ notification=notification,
+ indexed_count=documents_processed,
+ error_message=None,
+ )
+ elif documents_processed > 0:
+ # Success but no timestamp update function
+ logger.info(
+ f"Indexing completed successfully: {documents_processed} documents processed"
+ )
+ if notification:
+ await NotificationService.connector_indexing.notify_indexing_completed(
+ session=session,
+ notification=notification,
+ indexed_count=documents_processed,
+ error_message=None,
+ )
+ else:
+ # Failure or no documents processed
+ logger.error(
+ f"Indexing failed or no documents processed: {error_or_warning}"
+ )
+ if notification:
+ await NotificationService.connector_indexing.notify_indexing_completed(
+ session=session,
+ notification=notification,
+ indexed_count=0,
+ error_message=error_or_warning or "No documents processed",
+ )
+ except Exception as e:
+ logger.error(f"Error in indexing task: {e!s}", exc_info=True)
+
+ # Update notification on exception
+ if notification:
+ try:
+ await NotificationService.connector_indexing.notify_indexing_completed(
+ session=session,
+ notification=notification,
+ indexed_count=0,
+ error_message=str(e),
+ )
+ except Exception as notif_error:
+ logger.error(f"Failed to update notification: {notif_error!s}")
+
+
async def run_notion_indexing_with_new_session(
connector_id: int,
search_space_id: int,
@@ -985,8 +1098,15 @@ async def run_notion_indexing_with_new_session(
This prevents session leaks by creating a dedicated session for the background task.
"""
async with async_session_maker() as session:
- await run_notion_indexing(
- session, connector_id, search_space_id, user_id, start_date, end_date
+ await _run_indexing_with_notifications(
+ session=session,
+ connector_id=connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ start_date=start_date,
+ end_date=end_date,
+ indexing_function=index_notion_pages,
+ update_timestamp_func=_update_connector_timestamp_by_id,
)
@@ -1009,30 +1129,16 @@ async def run_notion_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
- try:
- # Index Notion pages without updating last_indexed_at (we'll do it separately)
- documents_processed, error_or_warning = await index_notion_pages(
- session=session,
- connector_id=connector_id,
- search_space_id=search_space_id,
- user_id=user_id,
- start_date=start_date,
- end_date=end_date,
- update_last_indexed=False, # Don't update timestamp in the indexing function
- )
-
- # Only update last_indexed_at if indexing was successful (either new docs or updated docs)
- if documents_processed > 0:
- await _update_connector_timestamp_by_id(session, connector_id)
- logger.info(
- f"Notion indexing completed successfully: {documents_processed} documents processed"
- )
- else:
- logger.error(
- f"Notion indexing failed or no documents processed: {error_or_warning}"
- )
- except Exception as e:
- logger.error(f"Error in background Notion indexing task: {e!s}")
+ await _run_indexing_with_notifications(
+ session=session,
+ connector_id=connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ start_date=start_date,
+ end_date=end_date,
+ indexing_function=index_notion_pages,
+ update_timestamp_func=_update_connector_timestamp_by_id,
+ )
# Add new helper functions for GitHub indexing
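The Notion wrapper above shows the intended usage of the helper; the other background wrappers that still carry their own try/except blocks (for example the Slack task near the top of this hunk) could adopt the same pattern. A sketch under that assumption, where index_slack_messages is a hypothetical indexing function with the same (session, connector_id, ...) -> (count, error) contract as index_notion_pages; the other names come from this diff:

async def run_slack_indexing_with_new_session(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """Create a dedicated session and run Slack indexing with notifications."""
    async with async_session_maker() as session:
        await _run_indexing_with_notifications(
            session=session,
            connector_id=connector_id,
            search_space_id=search_space_id,
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
            # index_slack_messages is assumed to follow the same
            # (session, connector_id, ...) -> (count, error) contract.
            indexing_function=index_slack_messages,
            update_timestamp_func=_update_connector_timestamp_by_id,
        )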
diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py
index d1fb177c9..cba9a8117 100644
--- a/surfsense_backend/app/services/notification_service.py
+++ b/surfsense_backend/app/services/notification_service.py
@@ -1,18 +1,343 @@
-"""Service for creating and managing notifications."""
+"""Service for creating and managing notifications with Electric SQL sync."""
import logging
+from datetime import UTC, datetime
from typing import Any
from uuid import UUID
+from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm.attributes import flag_modified
from app.db import Notification
logger = logging.getLogger(__name__)
+class BaseNotificationHandler:
+ """Base class for notification handlers - provides common functionality."""
+
+ def __init__(self, notification_type: str):
+ """
+ Initialize the notification handler.
+
+ Args:
+ notification_type: Type of notification (e.g., 'connector_indexing', 'document_processing')
+ """
+ self.notification_type = notification_type
+
+ async def find_notification_by_operation(
+ self,
+ session: AsyncSession,
+ user_id: UUID,
+ operation_id: str,
+ search_space_id: int | None = None,
+ ) -> Notification | None:
+ """
+ Find an existing notification by operation ID.
+
+ Args:
+ session: Database session
+ user_id: User ID
+ operation_id: Unique operation identifier
+ search_space_id: Optional search space ID
+
+ Returns:
+ Notification if found, None otherwise
+ """
+ query = select(Notification).where(
+ Notification.user_id == user_id,
+ Notification.type == self.notification_type,
+ Notification.notification_metadata["operation_id"].astext == operation_id,
+ )
+ if search_space_id is not None:
+ query = query.where(Notification.search_space_id == search_space_id)
+
+ result = await session.execute(query)
+ return result.scalar_one_or_none()
+
+ async def find_or_create_notification(
+ self,
+ session: AsyncSession,
+ user_id: UUID,
+ operation_id: str,
+ title: str,
+ message: str,
+ search_space_id: int | None = None,
+ initial_metadata: dict[str, Any] | None = None,
+ ) -> Notification:
+ """
+ Find an existing notification or create a new one.
+
+ Args:
+ session: Database session
+ user_id: User ID
+ operation_id: Unique operation identifier
+ title: Notification title
+ message: Notification message
+ search_space_id: Optional search space ID
+ initial_metadata: Initial metadata dictionary
+
+ Returns:
+ Notification: The found or created notification
+ """
+ # Try to find existing notification
+ notification = await self.find_notification_by_operation(
+ session, user_id, operation_id, search_space_id
+ )
+
+ if notification:
+ # Update existing notification
+ notification.title = title
+ notification.message = message
+ if initial_metadata:
+ notification.notification_metadata = {
+ **notification.notification_metadata,
+ **initial_metadata,
+ }
+ # Mark JSONB column as modified so SQLAlchemy detects the change
+ flag_modified(notification, "notification_metadata")
+ await session.commit()
+ await session.refresh(notification)
+ logger.info(f"Updated notification {notification.id} for operation {operation_id}")
+ return notification
+
+ # Create new notification
+ metadata = initial_metadata or {}
+ metadata["operation_id"] = operation_id
+ metadata["status"] = "in_progress"
+ metadata["started_at"] = datetime.now(UTC).isoformat()
+
+ notification = Notification(
+ user_id=user_id,
+ search_space_id=search_space_id,
+ type=self.notification_type,
+ title=title,
+ message=message,
+ notification_metadata=metadata,
+ )
+ session.add(notification)
+ await session.commit()
+ await session.refresh(notification)
+ logger.info(f"Created notification {notification.id} for operation {operation_id}")
+ return notification
+
+ async def update_notification(
+ self,
+ session: AsyncSession,
+ notification: Notification,
+ title: str | None = None,
+ message: str | None = None,
+ status: str | None = None,
+ metadata_updates: dict[str, Any] | None = None,
+ ) -> Notification:
+ """
+ Update an existing notification.
+
+ Args:
+ session: Database session
+ notification: Notification to update
+ title: New title (optional)
+ message: New message (optional)
+ status: New status (optional)
+ metadata_updates: Additional metadata to merge (optional)
+
+ Returns:
+ Updated notification
+ """
+ if title is not None:
+ notification.title = title
+ if message is not None:
+ notification.message = message
+
+ if status is not None:
+ notification.notification_metadata["status"] = status
+ if status in ("completed", "failed"):
+ notification.notification_metadata["completed_at"] = (
+ datetime.now(UTC).isoformat()
+ )
+ # Mark JSONB column as modified so SQLAlchemy detects the change
+ flag_modified(notification, "notification_metadata")
+
+ if metadata_updates:
+ notification.notification_metadata = {
+ **notification.notification_metadata,
+ **metadata_updates,
+ }
+ # Mark JSONB column as modified
+ flag_modified(notification, "notification_metadata")
+
+ await session.commit()
+ await session.refresh(notification)
+ logger.info(f"Updated notification {notification.id}")
+ return notification
+
+
+class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
+ """Handler for connector indexing notifications."""
+
+ def __init__(self):
+ super().__init__("connector_indexing")
+
+ def _generate_operation_id(
+ self, connector_id: int, start_date: str | None = None, end_date: str | None = None
+ ) -> str:
+ """
+ Generate a unique operation ID for a connector indexing operation.
+
+ Args:
+ connector_id: Connector ID
+ start_date: Start date (optional)
+ end_date: End date (optional)
+
+ Returns:
+ Unique operation ID string
+ """
+ timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
+ date_range = ""
+ if start_date or end_date:
+ date_range = f"_{start_date or 'none'}_{end_date or 'none'}"
+ return f"connector_{connector_id}_{timestamp}{date_range}"
+
+ async def notify_indexing_started(
+ self,
+ session: AsyncSession,
+ user_id: UUID,
+ connector_id: int,
+ connector_name: str,
+ connector_type: str,
+ search_space_id: int,
+ start_date: str | None = None,
+ end_date: str | None = None,
+ ) -> Notification:
+ """
+ Create or update notification when connector indexing starts.
+
+ Args:
+ session: Database session
+ user_id: User ID
+ connector_id: Connector ID
+ connector_name: Connector name
+ connector_type: Connector type
+ search_space_id: Search space ID
+ start_date: Start date for indexing
+ end_date: End date for indexing
+
+ Returns:
+ Notification: The created or updated notification
+ """
+ operation_id = self._generate_operation_id(connector_id, start_date, end_date)
+ title = f"Indexing: {connector_name}"
+ message = f'Indexing "{connector_name}" in progress...'
+
+ metadata = {
+ "connector_id": connector_id,
+ "connector_name": connector_name,
+ "connector_type": connector_type,
+ "start_date": start_date,
+ "end_date": end_date,
+ "indexed_count": 0,
+ }
+
+ return await self.find_or_create_notification(
+ session=session,
+ user_id=user_id,
+ operation_id=operation_id,
+ title=title,
+ message=message,
+ search_space_id=search_space_id,
+ initial_metadata=metadata,
+ )
+
+ async def notify_indexing_progress(
+ self,
+ session: AsyncSession,
+ notification: Notification,
+ indexed_count: int,
+ total_count: int | None = None,
+ ) -> Notification:
+ """
+ Update notification with indexing progress.
+
+ Args:
+ session: Database session
+ notification: Notification to update
+ indexed_count: Number of items indexed so far
+ total_count: Total number of items (optional)
+
+ Returns:
+ Updated notification
+ """
+ connector_name = notification.notification_metadata.get("connector_name", "Connector")
+ progress_msg = f'Indexing "{connector_name}": {indexed_count} items'
+ if total_count is not None:
+ progress_msg += f" of {total_count}"
+ progress_msg += " indexed..."
+
+ metadata_updates = {"indexed_count": indexed_count}
+ if total_count is not None:
+ metadata_updates["total_count"] = total_count
+ progress_percent = int((indexed_count / total_count) * 100)
+ metadata_updates["progress_percent"] = progress_percent
+
+ return await self.update_notification(
+ session=session,
+ notification=notification,
+ message=progress_msg,
+ status="in_progress",
+ metadata_updates=metadata_updates,
+ )
+
+ async def notify_indexing_completed(
+ self,
+ session: AsyncSession,
+ notification: Notification,
+ indexed_count: int,
+ error_message: str | None = None,
+ ) -> Notification:
+ """
+ Update notification when connector indexing completes.
+
+ Args:
+ session: Database session
+ notification: Notification to update
+ indexed_count: Total number of items indexed
+ error_message: Error message if indexing failed (optional)
+
+ Returns:
+ Updated notification
+ """
+ connector_name = notification.notification_metadata.get("connector_name", "Connector")
+
+ if error_message:
+ title = f"Indexing failed: {connector_name}"
+ message = f'Indexing "{connector_name}" failed: {error_message}'
+ status = "failed"
+ else:
+ title = f"Indexing completed: {connector_name}"
+ message = f'Indexing "{connector_name}" completed successfully. {indexed_count} items indexed.'
+ status = "completed"
+
+ metadata_updates = {
+ "indexed_count": indexed_count,
+ "error_message": error_message,
+ }
+
+ return await self.update_notification(
+ session=session,
+ notification=notification,
+ title=title,
+ message=message,
+ status=status,
+ metadata_updates=metadata_updates,
+ )
+
+
class NotificationService:
- """Service for creating notifications that sync via Electric SQL."""
+ """Service for creating and managing notifications that sync via Electric SQL."""
+
+ # Handler instances
+ connector_indexing = ConnectorIndexingNotificationHandler()
@staticmethod
async def create_notification(
@@ -105,6 +430,7 @@ class NotificationService:
) -> Notification:
"""
Create notification when connector indexing completes.
+ DEPRECATED: Use NotificationService.connector_indexing methods instead.
Args:
session: Database session
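The routes helper only reports start and completion, but notify_indexing_progress is defined here and could be called from inside an indexing loop for incremental updates. A minimal sketch; the pages iterable, the batch size of 25, and the function name are illustrative rather than taken from this diff:

from app.services.notification_service import NotificationService


async def index_pages_with_progress(session, notification, pages) -> int:
    """Index items and push a progress notification every 25 items."""
    indexed = 0
    total = len(pages)
    for page in pages:
        # ... index the page into the search space here ...
        indexed += 1
        if indexed % 25 == 0 or indexed == total:
            await NotificationService.connector_indexing.notify_indexing_progress(
                session=session,
                notification=notification,
                indexed_count=indexed,
                total_count=total,
            )
    return indexed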
diff --git a/surfsense_web/components/layout/ui/header/Header.tsx b/surfsense_web/components/layout/ui/header/Header.tsx
index a03761ef5..34363520a 100644
--- a/surfsense_web/components/layout/ui/header/Header.tsx
+++ b/surfsense_web/components/layout/ui/header/Header.tsx
@@ -3,6 +3,7 @@
import { Moon, Sun } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
+import { NotificationButton } from "@/components/notifications/NotificationButton";
interface HeaderProps {
breadcrumb?: React.ReactNode;
@@ -29,6 +30,9 @@ export function Header({
{/* Right side - Actions */}
+ {/* Notifications */}
+ <NotificationButton />
+
{/* Theme toggle */}
{onToggleTheme && (
diff --git a/surfsense_web/components/notifications/NotificationButton.tsx b/surfsense_web/components/notifications/NotificationButton.tsx
new file mode 100644
index 000000000..ae6534394
--- /dev/null
+++ b/surfsense_web/components/notifications/NotificationButton.tsx
@@ -0,0 +1,53 @@
+"use client";
+
+import { Bell } from "lucide-react";
+import { Button } from "@/components/ui/button";
+import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover";
+import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
+import { useNotifications } from "@/hooks/use-notifications";
+import { useAtomValue } from "jotai";
+import { currentUserAtom } from "@/atoms/user/user-query.atoms";
+import { NotificationPopup } from "./NotificationPopup";
+import { cn } from "@/lib/utils";
+
+export function NotificationButton() {
+ const { data: user } = useAtomValue(currentUserAtom);
+ const userId = user?.id ? String(user.id) : null;
+ const { notifications, unreadCount, loading, markAsRead, markAllAsRead } = useNotifications(userId);
+
+	return (
+		<Popover>
+			<Tooltip>
+				<TooltipTrigger asChild>
+					<PopoverTrigger asChild>
+						<Button variant="ghost" size="icon" className="relative">
+							<Bell className="h-5 w-5" />
+							{unreadCount > 0 && (
+								<span className={cn("absolute -right-0.5 -top-0.5 flex h-4 min-w-4 items-center justify-center rounded-full bg-primary px-1 text-[10px] font-medium text-primary-foreground")}>
+									{unreadCount > 99 ? "99+" : unreadCount}
+								</span>
+							)}
+						</Button>
+					</PopoverTrigger>
+				</TooltipTrigger>
+				<TooltipContent>Notifications</TooltipContent>
+			</Tooltip>
+			<PopoverContent align="end" className="w-96 p-0">
+				<NotificationPopup
+					notifications={notifications}
+					unreadCount={unreadCount}
+					loading={loading}
+					markAsRead={markAsRead}
+					markAllAsRead={markAllAsRead}
+				/>
+			</PopoverContent>
+		</Popover>
+	);
+}
+
diff --git a/surfsense_web/components/notifications/NotificationPopup.tsx b/surfsense_web/components/notifications/NotificationPopup.tsx
new file mode 100644
index 000000000..562aee4d1
--- /dev/null
+++ b/surfsense_web/components/notifications/NotificationPopup.tsx
@@ -0,0 +1,175 @@
+"use client";
+
+import { Bell, Check, CheckCheck, Loader2, AlertCircle, CheckCircle2 } from "lucide-react";
+import { Button } from "@/components/ui/button";
+import { ScrollArea } from "@/components/ui/scroll-area";
+import { Separator } from "@/components/ui/separator";
+import { Badge } from "@/components/ui/badge";
+import type { Notification } from "@/hooks/use-notifications";
+import { formatDistanceToNow } from "date-fns";
+import { cn } from "@/lib/utils";
+
+interface NotificationPopupProps {
+ notifications: Notification[];
+ unreadCount: number;
+ loading: boolean;
+ markAsRead: (id: number) => Promise<void>;
+ markAllAsRead: () => Promise<void>;
+}
+
+export function NotificationPopup({
+ notifications,
+ unreadCount,
+ loading,
+ markAsRead,
+ markAllAsRead,
+}: NotificationPopupProps) {
+ const handleMarkAsRead = async (id: number) => {
+ await markAsRead(id);
+ };
+
+ const handleMarkAllAsRead = async () => {
+ await markAllAsRead();
+ };
+
+ const formatTime = (dateString: string) => {
+ try {
+ return formatDistanceToNow(new Date(dateString), { addSuffix: true });
+ } catch {
+ return "Recently";
+ }
+ };
+
+ const getStatusBadge = (notification: Notification) => {
+ const status = notification.metadata?.status as string | undefined;
+ if (!status) return null;
+
+	switch (status) {
+		case "in_progress":
+			return (
+				<Badge variant="secondary" className="gap-1">
+					<Loader2 className="h-3 w-3 animate-spin" />
+					In Progress
+				</Badge>
+			);
+		case "completed":
+			return (
+				<Badge variant="default" className="gap-1">
+					<CheckCircle2 className="h-3 w-3" />
+					Completed
+				</Badge>
+			);
+		case "failed":
+			return (
+				<Badge variant="destructive" className="gap-1">
+					<AlertCircle className="h-3 w-3" />
+					Failed
+				</Badge>
+			);
+		default:
+			return null;
+	}
+ };
+
+	return (
+		<div className="flex w-full flex-col">
+			{/* Header */}
+			<div className="flex items-center justify-between px-4 py-3">
+				<div className="flex items-center gap-2">
+					<Bell className="h-4 w-4" />
+					<span className="text-sm font-semibold">Notifications</span>
+					{unreadCount > 0 && (
+						<Badge variant="secondary" className="h-5 px-1.5 text-xs">
+							{unreadCount > 99 ? "99+" : unreadCount}
+						</Badge>
+					)}
+				</div>
+				{unreadCount > 0 && (
+					<Button variant="ghost" size="sm" className="h-7 gap-1 text-xs" onClick={handleMarkAllAsRead}>
+						<CheckCheck className="h-3.5 w-3.5" />
+						Mark all as read
+					</Button>
+				)}
+			</div>
+			<Separator />
+
+			{/* Notifications List */}
+			<ScrollArea className="max-h-96">
+				{loading ? (
+					<div className="flex items-center justify-center py-8">
+						<Loader2 className="h-5 w-5 animate-spin text-muted-foreground" />
+					</div>
+				) : notifications.length === 0 ? (
+					<div className="py-8 text-center text-sm text-muted-foreground">No notifications yet</div>
+				) : (
+					<div className="flex flex-col">
+						{notifications.map((notification, index) => (
+							<div key={notification.id}>
+								<div className={cn("flex flex-col gap-1 px-4 py-3", !notification.read && "bg-muted/50")}>
+									<div className="flex items-start justify-between gap-2">
+										<span className="text-sm font-medium">{notification.title}</span>
+										{!notification.read && (
+											<Button variant="ghost" size="icon" className="h-6 w-6 shrink-0" onClick={() => handleMarkAsRead(notification.id)}>
+												<Check className="h-3.5 w-3.5" />
+											</Button>
+										)}
+									</div>
+									<p className="text-xs text-muted-foreground">{notification.message}</p>
+									<div className="flex items-center gap-2">
+										{getStatusBadge(notification)}
+										<span className="text-xs text-muted-foreground">{formatTime(notification.created_at)}</span>
+									</div>
+								</div>
+								{index < notifications.length - 1 && <Separator />}
+							</div>
+						))}
+					</div>
+				)}
+			</ScrollArea>
+		</div>
+	);
+}
+
diff --git a/surfsense_web/hooks/use-notifications.ts b/surfsense_web/hooks/use-notifications.ts
index 01280c8ff..d629503e9 100644
--- a/surfsense_web/hooks/use-notifications.ts
+++ b/surfsense_web/hooks/use-notifications.ts
@@ -1,7 +1,7 @@
"use client"
import { useEffect, useState, useCallback, useRef } from 'react'
-import { initElectric, getElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client'
+import { initElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client'
export interface Notification {
id: number
@@ -22,9 +22,9 @@ export function useNotifications(userId: string | null) {
const [initialized, setInitialized] = useState(false)
const [error, setError] = useState<string | null>(null)
const syncHandleRef = useRef<SyncHandle | null>(null)
- const pollIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null)
+ const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null)
- // Initialize Electric SQL and start syncing
+ // Initialize Electric SQL and start syncing with real-time updates
useEffect(() => {
if (!userId || initialized) return
@@ -37,12 +37,42 @@ export function useNotifications(userId: string | null) {
setElectric(electricClient)
- // Start syncing notifications for this user
- const handle = await electricClient.syncShape({
+ // Start syncing notifications for this user via Electric SQL
+ // Note: user_id is stored as TEXT in PGlite (UUID from backend is converted)
+ console.log('Starting Electric SQL sync for user:', userId)
+
+ // Use string format for WHERE clause (PGlite sync plugin expects this format)
+ // The user_id is a UUID string, so we need to quote it properly
+ const handle = await electricClient.syncShape({
table: 'notifications',
where: `user_id = '${userId}'`,
primaryKey: ['id'],
})
+
+ console.log('Electric SQL sync started:', {
+ isUpToDate: handle.isUpToDate,
+ hasStream: !!handle.stream,
+ hasInitialSyncPromise: !!handle.initialSyncPromise,
+ })
+
+ // Wait for initial sync to complete if the promise is available
+ if (handle.initialSyncPromise) {
+ console.log('Waiting for initial sync to complete...')
+ try {
+ await handle.initialSyncPromise
+ console.log('Initial sync promise resolved, checking status:', {
+ isUpToDate: handle.isUpToDate,
+ })
+ } catch (syncErr) {
+ console.error('Initial sync failed:', syncErr)
+ }
+ }
+
+ // Check status after waiting
+ console.log('Sync status after waiting:', {
+ isUpToDate: handle.isUpToDate,
+ hasStream: !!handle.stream,
+ })
if (!mounted) {
handle.unsubscribe()
@@ -53,8 +83,113 @@ export function useNotifications(userId: string | null) {
setInitialized(true)
setError(null)
- // Initial fetch
+ // Fetch notifications after sync is complete (we already waited above)
await fetchNotifications(electricClient.db)
+
+ // Set up real-time updates using PGlite live queries
+ // Electric SQL syncs data to PGlite in real-time via WebSocket/HTTP
+ // PGlite live queries detect when the synced data changes and trigger callbacks
+ try {
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ const db = electricClient.db as any
+
+ // Use PGlite's live query API for real-time updates
+ // Based on latest PGlite docs: db.live.query(query, params, callback)
+ if (db.live?.query && typeof db.live.query === 'function') {
+ const liveQuery = db.live.query(
+ `SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`,
+ [userId],
+ (result: { rows: Notification[] }) => {
+ // This callback fires automatically when Electric SQL syncs changes
+ if (mounted) {
+ setNotifications(result.rows)
+ }
+ }
+ )
+
+ // Set initial results immediately
+ if (liveQuery.initialResults) {
+ setNotifications(liveQuery.initialResults.rows)
+ }
+
+ if (mounted && liveQuery && typeof liveQuery.unsubscribe === 'function') {
+ liveQueryRef.current = liveQuery
+ console.log('✅ Real-time notifications enabled via PGlite live queries')
+ }
+ } else {
+ // Fallback: Monitor sync handle for updates
+ // Electric SQL's syncShape should trigger updates, but we need to detect them
+ // This is a lightweight approach that only checks when sync indicates changes
+ console.warn('PGlite live queries not available - using sync-based change detection')
+
+ let lastNotificationIds = new Set<number>()
+
+ const checkForSyncUpdates = async () => {
+ if (!mounted) return
+
+ try {
+ const result = await electricClient.db.query(
+ `SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`,
+ [userId]
+ )
+
+ // PGlite query returns { rows: [] } format
+ const rows = result.rows || []
+
+ // Only update if data actually changed
+ const currentIds = new Set(rows.map(r => r.id))
+ const currentHash = JSON.stringify(
+ rows.map(r => ({ id: r.id, read: r.read, updated_at: r.updated_at }))
+ )
+
+ // Check if IDs changed (new/deleted notifications)
+ const idsChanged =
+ currentIds.size !== lastNotificationIds.size ||
+ [...currentIds].some(id => !lastNotificationIds.has(id)) ||
+ [...lastNotificationIds].some(id => !currentIds.has(id))
+
+ if (idsChanged) {
+ setNotifications(rows)
+ lastNotificationIds = currentIds
+ } else {
+ // Check if any notification properties changed (e.g., read status)
+ // Compare with current state
+ setNotifications(prev => {
+ const prevHash = JSON.stringify(
+ prev.map(r => ({ id: r.id, read: r.read, updated_at: r.updated_at }))
+ )
+ if (prevHash !== currentHash) {
+ return rows
+ }
+ return prev
+ })
+ }
+ } catch (err) {
+ console.error('Failed to check for notification updates:', err)
+ }
+
+ // Check again after a short delay (Electric SQL syncs are fast)
+ if (mounted) {
+ setTimeout(checkForSyncUpdates, 500) // Check every 500ms - Electric SQL syncs are near-instant
+ }
+ }
+
+ // Start monitoring
+ checkForSyncUpdates()
+
+ liveQueryRef.current = {
+ unsubscribe: () => {
+ mounted = false
+ }
+ }
+ }
+ } catch (liveErr) {
+ console.warn('Failed to set up real-time updates:', liveErr)
+ // Minimal fallback - this should rarely be needed
+ liveQueryRef.current = {
+ unsubscribe: () => {}
+ }
+ }
} catch (err) {
if (!mounted) return
console.error('Failed to initialize Electric SQL:', err)
@@ -66,17 +201,29 @@ export function useNotifications(userId: string | null) {
async function fetchNotifications(db: InstanceType<typeof PGlite>) {
try {
+ // Debug: Check all notifications first
+ const allNotifications = await db.query(
+ `SELECT * FROM notifications ORDER BY created_at DESC`
+ )
+ console.log('All notifications in PGlite:', allNotifications.rows?.length || 0, allNotifications.rows)
+
+ // Use PGlite's query method (not exec for SELECT queries)
const result = await db.query(
`SELECT * FROM notifications
WHERE user_id = $1
ORDER BY created_at DESC`,
[userId]
)
+ console.log(`Notifications for user ${userId}:`, result.rows?.length || 0, result.rows)
+
if (mounted) {
- setNotifications(result.rows)
+ // PGlite query returns { rows: [] } format
+ setNotifications(result.rows || [])
}
} catch (err) {
console.error('Failed to fetch notifications:', err)
+ // Log more details for debugging
+ console.error('Error details:', err)
}
}
@@ -88,38 +235,13 @@ export function useNotifications(userId: string | null) {
syncHandleRef.current.unsubscribe()
syncHandleRef.current = null
}
+ if (liveQueryRef.current) {
+ liveQueryRef.current.unsubscribe()
+ liveQueryRef.current = null
+ }
}
}, [userId, initialized])
- // Poll for updates (PGlite doesn't have live queries like the old electric-sql)
- useEffect(() => {
- if (!electric || !userId || !initialized) return
-
- const fetchNotifications = async () => {
- try {
- const result = await electric.db.query(
- `SELECT * FROM notifications
- WHERE user_id = $1
- ORDER BY created_at DESC`,
- [userId]
- )
- setNotifications(result.rows)
- } catch (err) {
- console.error('Failed to fetch notifications:', err)
- }
- }
-
- // Poll every 2 seconds for updates
- pollIntervalRef.current = setInterval(fetchNotifications, 2000)
-
- return () => {
- if (pollIntervalRef.current) {
- clearInterval(pollIntervalRef.current)
- pollIntervalRef.current = null
- }
- }
- }, [electric, userId, initialized])
-
// Mark notification as read (local only - needs backend sync)
const markAsRead = useCallback(
async (notificationId: number) => {
@@ -130,7 +252,7 @@ export function useNotifications(userId: string | null) {
try {
// Update locally in PGlite
- await electric.db.exec(
+ await electric.db.query(
`UPDATE notifications SET read = true, updated_at = NOW() WHERE id = $1`,
[notificationId]
)
diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts
index 19feb87a0..97a2c1ca0 100644
--- a/surfsense_web/lib/electric/client.ts
+++ b/surfsense_web/lib/electric/client.ts
@@ -9,11 +9,12 @@
import { PGlite } from '@electric-sql/pglite'
import { electricSync } from '@electric-sql/pglite-sync'
+import { live } from '@electric-sql/pglite/live'
// Types
export interface ElectricClient {
db: PGlite
- syncShape: <T extends Record<string, unknown>>(options: SyncShapeOptions) => Promise<SyncHandle<T>>
+ syncShape: (options: SyncShapeOptions) => Promise<SyncHandle>
}
export interface SyncShapeOptions {
@@ -23,13 +24,13 @@ export interface SyncShapeOptions {
primaryKey?: string[]
}
-export interface SyncHandle<T extends Record<string, unknown>> {
+export interface SyncHandle {
unsubscribe: () => void
- isUpToDate: boolean
- shape: {
- handle?: string
- offset?: string
- }
+ readonly isUpToDate: boolean
+ // The stream property contains the ShapeStreamInterface from pglite-sync
+ stream?: unknown
+ // Promise that resolves when initial sync is complete
+ initialSyncPromise?: Promise<void>
}
// Singleton instance
@@ -37,6 +38,10 @@ let electricClient: ElectricClient | null = null
let isInitializing = false
let initPromise: Promise<ElectricClient> | null = null
+// Version for sync state - increment this to force fresh sync when Electric config changes
+// Incremented to v4 to fix sync completion issues
+const SYNC_VERSION = 4
+
// Get Electric URL from environment
function getElectricUrl(): string {
if (typeof window !== 'undefined') {
export async function initElectric(): Promise<ElectricClient> {
isInitializing = true
initPromise = (async () => {
try {
- // Create PGlite instance with Electric sync plugin
- const db = await PGlite.create('idb://surfsense-notifications', {
+ // Create PGlite instance with Electric sync plugin and live queries
+ // Include version in database name to force fresh sync when Electric config changes
+ const db = await PGlite.create({
+ dataDir: `idb://surfsense-notifications-v${SYNC_VERSION}`,
relaxedDurability: true,
extensions: {
- electric: electricSync(),
+ // Enable debug mode in electricSync to see detailed sync logs
+ electric: electricSync({ debug: true }),
+ live, // Enable live queries for real-time updates
},
})
@@ -93,36 +102,185 @@ export async function initElectric(): Promise {
// Create the client wrapper
electricClient = {
db,
- syncShape: async <T extends Record<string, unknown>>(options: SyncShapeOptions): Promise<SyncHandle<T>> => {
+ syncShape: async (options: SyncShapeOptions): Promise<SyncHandle> => {
const { table, where, columns, primaryKey = ['id'] } = options
- // Build params for the shape request
- const params: Record<string, string> = { table }
- if (where) params.where = where
- if (columns) params.columns = columns.join(',')
+ // Build params for the shape request
+ // Electric SQL expects params as URL query parameters
+ const params: Record<string, string> = { table }
+
+ // Validate and fix WHERE clause to ensure string literals are properly quoted
+ let validatedWhere = where
+ if (where) {
+ // Check if where uses positional parameters
+ if (where.includes('$1')) {
+ // Positional parameters are not rewritten here; pass the where clause through and let Electric resolve it
+ params.where = where
+ validatedWhere = where
+ } else {
+ // Validate that string literals are properly quoted
+ // Count single quotes - should be even (pairs) for properly quoted strings
+ const singleQuoteCount = (where.match(/'/g) || []).length
+
+ if (singleQuoteCount % 2 !== 0) {
+ // Odd number of quotes means unterminated string literal
+ console.warn('Where clause has unmatched quotes, fixing:', where)
+ // Add closing quote at the end
+ validatedWhere = `${where}'`
+ params.where = validatedWhere
+ } else {
+ // Use the where clause directly (already formatted)
+ params.where = where
+ validatedWhere = where
+ }
+ }
+ }
+
+ if (columns) params.columns = columns.join(',')
- // Use PGlite's electric sync plugin to sync the shape
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- const shape = await (db as any).electric.syncShapeToTable({
+ console.log('Syncing shape with params:', params)
+ console.log('Electric URL:', `${electricUrl}/v1/shape`)
+ console.log('Where clause:', where, 'Validated:', validatedWhere)
+
+ try {
+ // Debug: Test Electric SQL connection directly first
+ // Use validatedWhere to ensure proper URL encoding
+ const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ''}`
+ console.log('Testing Electric SQL directly:', testUrl)
+ try {
+ const testResponse = await fetch(testUrl)
+ const testHeaders = {
+ handle: testResponse.headers.get('electric-handle'),
+ offset: testResponse.headers.get('electric-offset'),
+ upToDate: testResponse.headers.get('electric-up-to-date'),
+ }
+ console.log('Direct Electric SQL response headers:', testHeaders)
+ const testData = await testResponse.json()
+ console.log('Direct Electric SQL data count:', Array.isArray(testData) ? testData.length : 'not array', testData)
+ } catch (testErr) {
+ console.error('Direct Electric SQL test failed:', testErr)
+ }
+
+ // Use PGlite's electric sync plugin to sync the shape
+ // According to Electric SQL docs, the shape config uses params for table, where, columns
+ // Note: mapColumns is OPTIONAL per pglite-sync types.ts
+
+ // Create a promise that resolves when initial sync is complete
+ let resolveInitialSync: () => void
+ let rejectInitialSync: (error: Error) => void
+ const initialSyncPromise = new Promise<void>((resolve, reject) => {
+ resolveInitialSync = resolve
+ rejectInitialSync = reject
+ // Safety timeout - if sync doesn't complete in 30s, something is wrong
+ setTimeout(() => {
+ console.warn(`⚠️ Sync timeout for ${table} - sync did not complete in 30s`)
+ resolve() // Resolve anyway to not block, but log warning
+ }, 30000)
+ })
+
+ const shapeConfig = {
shape: {
url: `${electricUrl}/v1/shape`,
- params,
+ params: {
+ table,
+ ...(validatedWhere ? { where: validatedWhere } : {}),
+ ...(columns ? { columns: columns.join(',') } : {}),
+ },
},
table,
primaryKey,
+ shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, '_') || 'all'}`, // Versioned key to force fresh sync when needed
+ onInitialSync: () => {
+ console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`)
+ resolveInitialSync()
+ },
+ onError: (error: Error) => {
+ console.error(`❌ Shape sync error for ${table}:`, error)
+ console.error('Error details:', JSON.stringify(error, Object.getOwnPropertyNames(error)))
+ rejectInitialSync(error)
+ },
+ }
+
+ console.log('syncShapeToTable config:', JSON.stringify(shapeConfig, null, 2))
+
+ // Type assertion to PGlite with electric extension
+ const pgWithElectric = db as PGlite & { electric: { syncShapeToTable: (config: typeof shapeConfig) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }> } }
+ const shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig)
+
+ if (!shape) {
+ throw new Error('syncShapeToTable returned undefined')
+ }
+
+ // Log the actual shape result structure
+ console.log('Shape sync result (initial):', {
+ hasUnsubscribe: typeof shape?.unsubscribe === 'function',
+ isUpToDate: shape?.isUpToDate,
+ hasStream: !!shape?.stream,
+ streamType: typeof shape?.stream,
+ })
+
+ // Debug the stream if available
+ if (shape?.stream) {
+ const stream = shape.stream as any
+ console.log('Shape stream details:', {
+ shapeHandle: stream?.shapeHandle,
+ lastOffset: stream?.lastOffset,
+ isUpToDate: stream?.isUpToDate,
+ error: stream?.error,
+ hasSubscribe: typeof stream?.subscribe === 'function',
+ hasUnsubscribe: typeof stream?.unsubscribe === 'function',
+ })
+
+ // Try to subscribe to the stream to see if it's receiving messages
+ if (typeof stream?.subscribe === 'function') {
+ console.log('Subscribing to shape stream for debugging...')
+ stream.subscribe((messages: unknown[]) => {
+ console.log('🔵 Shape stream received messages:', messages?.length || 0)
+ if (messages && messages.length > 0) {
+ console.log('First message:', JSON.stringify(messages[0], null, 2))
+ }
+ })
+ }
+ }
+
+ // Wait briefly to see if sync starts
+ await new Promise(resolve => setTimeout(resolve, 100))
+ console.log('Shape sync result (after 100ms):', {
+ isUpToDate: shape?.isUpToDate,
})
+ // Return the shape handle - isUpToDate is a getter that reflects current state
return {
unsubscribe: () => {
+ console.log('unsubscribing')
if (shape && typeof shape.unsubscribe === 'function') {
shape.unsubscribe()
}
},
- isUpToDate: shape?.isUpToDate ?? false,
- shape: {
- handle: shape?.handle,
- offset: shape?.offset,
+ // Use getter to always return current state
+ get isUpToDate() {
+ return shape?.isUpToDate ?? false
},
+ stream: shape?.stream,
+ initialSyncPromise, // Expose promise so callers can wait for sync
+ }
+ } catch (error) {
+ console.error('Failed to sync shape:', error)
+ // Check if Electric SQL server is reachable
+ try {
+ const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, {
+ method: 'GET',
+ })
+ console.log('Electric SQL server response:', response.status, response.statusText)
+ if (!response.ok) {
+ console.error('Electric SQL server error:', await response.text())
+ }
+ } catch (fetchError) {
+ console.error('Cannot reach Electric SQL server:', fetchError)
+ console.error('Make sure Electric SQL is running at:', electricUrl)
+ }
+ throw error
}
},
}
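When the shape request fails, the debug fetch in syncShape points at the usual culprits on the Postgres side. The same Electric HTTP endpoint can also be checked from the backend as a standalone sanity test; a sketch using httpx, where ELECTRIC_URL and the script name are assumptions rather than values from this diff, while the endpoint and the electric-* headers mirror the debug fetch above:

"""Manual sanity check for the Electric SQL shape used by the notifications hook."""

import httpx

ELECTRIC_URL = "http://localhost:3000"  # assumption; use the URL the frontend is configured with


def check_notifications_shape() -> None:
    """Request the initial shape and print the Electric sync headers."""
    response = httpx.get(
        f"{ELECTRIC_URL}/v1/shape",
        params={"table": "notifications", "offset": "-1"},
    )
    print("status:", response.status_code)
    print("electric-handle:", response.headers.get("electric-handle"))
    print("electric-offset:", response.headers.get("electric-offset"))
    print("electric-up-to-date:", response.headers.get("electric-up-to-date"))
    # A non-2xx status usually means the table is missing from the publication
    # or the electric role lacks SELECT (see the migration earlier in this diff).


if __name__ == "__main__":
    check_notifications_shape()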
diff --git a/surfsense_web/package.json b/surfsense_web/package.json
index e097fee33..372be1bcd 100644
--- a/surfsense_web/package.json
+++ b/surfsense_web/package.json
@@ -30,7 +30,7 @@
"@blocknote/react": "^0.45.0",
"@blocknote/server-util": "^0.45.0",
"@electric-sql/client": "^1.4.0",
- "@electric-sql/pglite": "^0.2.17",
+ "@electric-sql/pglite": "^0.3.14",
"@electric-sql/pglite-sync": "^0.4.0",
"@electric-sql/react": "^1.0.26",
"@hookform/resolvers": "^5.2.2",
diff --git a/surfsense_web/pnpm-lock.yaml b/surfsense_web/pnpm-lock.yaml
index ad26fa624..0b93260c5 100644
--- a/surfsense_web/pnpm-lock.yaml
+++ b/surfsense_web/pnpm-lock.yaml
@@ -36,11 +36,11 @@ importers:
specifier: ^1.4.0
version: 1.4.0
'@electric-sql/pglite':
- specifier: ^0.2.17
- version: 0.2.17
+ specifier: ^0.3.14
+ version: 0.3.14
'@electric-sql/pglite-sync':
specifier: ^0.4.0
- version: 0.4.0(@electric-sql/pglite@0.2.17)
+ version: 0.4.0(@electric-sql/pglite@0.3.14)
'@electric-sql/react':
specifier: ^1.0.26
version: 1.0.26(react@19.2.3)
@@ -157,7 +157,7 @@ importers:
version: 17.2.3
drizzle-orm:
specifier: ^0.44.5
- version: 0.44.7(@electric-sql/pglite@0.2.17)(@opentelemetry/api@1.9.0)(@prisma/client@4.8.1)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(pg@8.16.3)(postgres@3.4.7)
+ version: 0.44.7(@electric-sql/pglite@0.3.14)(@opentelemetry/api@1.9.0)(@prisma/client@4.8.1)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(pg@8.16.3)(postgres@3.4.7)
emblor:
specifier: ^1.4.8
version: 1.4.8(@types/react-dom@19.2.3(@types/react@19.2.7))(@types/react@19.2.7)(react-dom@19.2.3(react@19.2.3))(react@19.2.3)
@@ -575,8 +575,8 @@ packages:
peerDependencies:
'@electric-sql/pglite': 0.3.14
- '@electric-sql/pglite@0.2.17':
- resolution: {integrity: sha512-qEpKRT2oUaWDH6tjRxLHjdzMqRUGYDnGZlKrnL4dJ77JVMcP2Hpo3NYnOSPKdZdeec57B6QPprCUFg0picx5Pw==}
+ '@electric-sql/pglite@0.3.14':
+ resolution: {integrity: sha512-3DB258dhqdsArOI1fIt7cb9RpUOgcDg5hXWVgVHAeqVQ/qxtFy605QKs4gx6mFq3jWsSPqDN8TgSEsqC3OfV9Q==}
'@electric-sql/react@1.0.26':
resolution: {integrity: sha512-cCKLQrtGNaAPBzdLZk97bK/Hue3fKkfL0/aA5HAPzoo7U07/TRzzs4EVRy7q+BV6AONEK+YXxxrzH9gEH8YVQA==}
@@ -6760,13 +6760,13 @@ snapshots:
optionalDependencies:
'@rollup/rollup-darwin-arm64': 4.55.1
- '@electric-sql/pglite-sync@0.4.0(@electric-sql/pglite@0.2.17)':
+ '@electric-sql/pglite-sync@0.4.0(@electric-sql/pglite@0.3.14)':
dependencies:
'@electric-sql/client': 1.4.0
'@electric-sql/experimental': 1.0.14(@electric-sql/client@1.4.0)
- '@electric-sql/pglite': 0.2.17
+ '@electric-sql/pglite': 0.3.14
- '@electric-sql/pglite@0.2.17': {}
+ '@electric-sql/pglite@0.3.14': {}
'@electric-sql/react@1.0.26(react@19.2.3)':
dependencies:
@@ -9701,9 +9701,9 @@ snapshots:
transitivePeerDependencies:
- supports-color
- drizzle-orm@0.44.7(@electric-sql/pglite@0.2.17)(@opentelemetry/api@1.9.0)(@prisma/client@4.8.1)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(pg@8.16.3)(postgres@3.4.7):
+ drizzle-orm@0.44.7(@electric-sql/pglite@0.3.14)(@opentelemetry/api@1.9.0)(@prisma/client@4.8.1)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(pg@8.16.3)(postgres@3.4.7):
optionalDependencies:
- '@electric-sql/pglite': 0.2.17
+ '@electric-sql/pglite': 0.3.14
'@opentelemetry/api': 1.9.0
'@prisma/client': 4.8.1
'@types/pg': 8.16.0