feat: Implement notification system with real-time updates and Electric SQL integration

- Added notifications table to the database schema with replication support for Electric SQL.
- Developed NotificationService to manage indexing notifications, including creation, updates, and status tracking.
- Introduced NotificationButton and NotificationPopup components for displaying notifications in the UI.
- Enhanced useNotifications hook for real-time notification syncing using PGlite live queries.
- Updated package dependencies for Electric SQL and improved error handling in notification processes.
This commit is contained in:
Anish Sarkar 2026-01-12 22:50:15 +05:30
parent f441c7b0ce
commit 93d17b51f5
10 changed files with 1062 additions and 103 deletions

View file

@ -41,12 +41,27 @@ def upgrade() -> None:
op.create_index("ix_notifications_user_read", "notifications", ["user_id", "read"])
# Set REPLICA IDENTITY FULL (required by Electric SQL for replication)
# This allows Electric SQL to track all column values for updates/deletes
op.execute("ALTER TABLE notifications REPLICA IDENTITY FULL;")
# Note: ElectricSQL 1.x dynamically adds tables to the publication when
# clients subscribe to shapes. No need to manually create publications.
# Grant SELECT to electric user for Electric SQL replication
# This is needed because ALTER DEFAULT PRIVILEGES only applies during initial DB setup
op.execute("GRANT SELECT ON notifications TO electric;")
# Add notifications table to Electric SQL publication for replication
# This is required for Electric SQL to sync the table
op.execute("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_publication_tables
WHERE pubname = 'electric_publication_default'
AND tablename = 'notifications'
) THEN
ALTER PUBLICATION electric_publication_default ADD TABLE notifications;
END IF;
END
$$;
""")
def downgrade() -> None:
"""Downgrade schema - remove notifications table."""

View file

@ -30,6 +30,7 @@ from app.db import (
async_session_maker,
get_async_session,
)
from app.services.notification_service import NotificationService
from app.schemas import (
GoogleDriveIndexRequest,
SearchSourceConnectorBase,
@ -973,6 +974,118 @@ async def run_slack_indexing(
logger.error(f"Error in background Slack indexing task: {e!s}")
async def _run_indexing_with_notifications(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
indexing_function,
update_timestamp_func=None,
):
"""
Generic helper to run indexing with real-time notifications.
Args:
session: Database session
connector_id: ID of the connector
search_space_id: ID of the search space
user_id: ID of the user
start_date: Start date for indexing
end_date: End date for indexing
indexing_function: Async function that performs the indexing
update_timestamp_func: Optional function to update connector timestamp
"""
from uuid import UUID
notification = None
try:
# Get connector info for notification
connector_result = await session.execute(
select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id)
)
connector = connector_result.scalar_one_or_none()
if connector:
# Create notification when indexing starts
notification = await NotificationService.connector_indexing.notify_indexing_started(
session=session,
user_id=UUID(user_id),
connector_id=connector_id,
connector_name=connector.name,
connector_type=connector.connector_type.value,
search_space_id=search_space_id,
start_date=start_date,
end_date=end_date,
)
# Run the indexing function
documents_processed, error_or_warning = await indexing_function(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=False,
)
# Update connector timestamp if function provided and indexing was successful
if documents_processed > 0 and update_timestamp_func:
await update_timestamp_func(session, connector_id)
logger.info(
f"Indexing completed successfully: {documents_processed} documents processed"
)
# Update notification on success
if notification:
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=documents_processed,
error_message=None,
)
elif documents_processed > 0:
# Success but no timestamp update function
logger.info(
f"Indexing completed successfully: {documents_processed} documents processed"
)
if notification:
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=documents_processed,
error_message=None,
)
else:
# Failure or no documents processed
logger.error(
f"Indexing failed or no documents processed: {error_or_warning}"
)
if notification:
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=0,
error_message=error_or_warning or "No documents processed",
)
except Exception as e:
logger.error(f"Error in indexing task: {e!s}", exc_info=True)
# Update notification on exception
if notification:
try:
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=0,
error_message=str(e),
)
except Exception as notif_error:
logger.error(f"Failed to update notification: {notif_error!s}")
async def run_notion_indexing_with_new_session(
connector_id: int,
search_space_id: int,
@ -985,8 +1098,15 @@ async def run_notion_indexing_with_new_session(
This prevents session leaks by creating a dedicated session for the background task.
"""
async with async_session_maker() as session:
await run_notion_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_notion_pages,
update_timestamp_func=_update_connector_timestamp_by_id,
)
@ -1009,30 +1129,16 @@ async def run_notion_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
try:
# Index Notion pages without updating last_indexed_at (we'll do it separately)
documents_processed, error_or_warning = await index_notion_pages(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=False, # Don't update timestamp in the indexing function
)
# Only update last_indexed_at if indexing was successful (either new docs or updated docs)
if documents_processed > 0:
await _update_connector_timestamp_by_id(session, connector_id)
logger.info(
f"Notion indexing completed successfully: {documents_processed} documents processed"
)
else:
logger.error(
f"Notion indexing failed or no documents processed: {error_or_warning}"
)
except Exception as e:
logger.error(f"Error in background Notion indexing task: {e!s}")
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_notion_pages,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# Add new helper functions for GitHub indexing

View file

@ -1,18 +1,343 @@
"""Service for creating and managing notifications."""
"""Service for creating and managing notifications with Electric SQL sync."""
import logging
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.db import Notification
logger = logging.getLogger(__name__)
class BaseNotificationHandler:
"""Base class for notification handlers - provides common functionality."""
def __init__(self, notification_type: str):
"""
Initialize the notification handler.
Args:
notification_type: Type of notification (e.g., 'connector_indexing', 'document_processing')
"""
self.notification_type = notification_type
async def find_notification_by_operation(
self,
session: AsyncSession,
user_id: UUID,
operation_id: str,
search_space_id: int | None = None,
) -> Notification | None:
"""
Find an existing notification by operation ID.
Args:
session: Database session
user_id: User ID
operation_id: Unique operation identifier
search_space_id: Optional search space ID
Returns:
Notification if found, None otherwise
"""
query = select(Notification).where(
Notification.user_id == user_id,
Notification.type == self.notification_type,
Notification.notification_metadata["operation_id"].astext == operation_id,
)
if search_space_id is not None:
query = query.where(Notification.search_space_id == search_space_id)
result = await session.execute(query)
return result.scalar_one_or_none()
async def find_or_create_notification(
self,
session: AsyncSession,
user_id: UUID,
operation_id: str,
title: str,
message: str,
search_space_id: int | None = None,
initial_metadata: dict[str, Any] | None = None,
) -> Notification:
"""
Find an existing notification or create a new one.
Args:
session: Database session
user_id: User ID
operation_id: Unique operation identifier
title: Notification title
message: Notification message
search_space_id: Optional search space ID
initial_metadata: Initial metadata dictionary
Returns:
Notification: The found or created notification
"""
# Try to find existing notification
notification = await self.find_notification_by_operation(
session, user_id, operation_id, search_space_id
)
if notification:
# Update existing notification
notification.title = title
notification.message = message
if initial_metadata:
notification.notification_metadata = {
**notification.notification_metadata,
**initial_metadata,
}
# Mark JSONB column as modified so SQLAlchemy detects the change
flag_modified(notification, "notification_metadata")
await session.commit()
await session.refresh(notification)
logger.info(f"Updated notification {notification.id} for operation {operation_id}")
return notification
# Create new notification
metadata = initial_metadata or {}
metadata["operation_id"] = operation_id
metadata["status"] = "in_progress"
metadata["started_at"] = datetime.now(UTC).isoformat()
notification = Notification(
user_id=user_id,
search_space_id=search_space_id,
type=self.notification_type,
title=title,
message=message,
notification_metadata=metadata,
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(f"Created notification {notification.id} for operation {operation_id}")
return notification
async def update_notification(
self,
session: AsyncSession,
notification: Notification,
title: str | None = None,
message: str | None = None,
status: str | None = None,
metadata_updates: dict[str, Any] | None = None,
) -> Notification:
"""
Update an existing notification.
Args:
session: Database session
notification: Notification to update
title: New title (optional)
message: New message (optional)
status: New status (optional)
metadata_updates: Additional metadata to merge (optional)
Returns:
Updated notification
"""
if title is not None:
notification.title = title
if message is not None:
notification.message = message
if status is not None:
notification.notification_metadata["status"] = status
if status in ("completed", "failed"):
notification.notification_metadata["completed_at"] = (
datetime.now(UTC).isoformat()
)
# Mark JSONB column as modified so SQLAlchemy detects the change
flag_modified(notification, "notification_metadata")
if metadata_updates:
notification.notification_metadata = {
**notification.notification_metadata,
**metadata_updates,
}
# Mark JSONB column as modified
flag_modified(notification, "notification_metadata")
await session.commit()
await session.refresh(notification)
logger.info(f"Updated notification {notification.id}")
return notification
class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"""Handler for connector indexing notifications."""
def __init__(self):
super().__init__("connector_indexing")
def _generate_operation_id(
self, connector_id: int, start_date: str | None = None, end_date: str | None = None
) -> str:
"""
Generate a unique operation ID for a connector indexing operation.
Args:
connector_id: Connector ID
start_date: Start date (optional)
end_date: End date (optional)
Returns:
Unique operation ID string
"""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
date_range = ""
if start_date or end_date:
date_range = f"_{start_date or 'none'}_{end_date or 'none'}"
return f"connector_{connector_id}_{timestamp}{date_range}"
async def notify_indexing_started(
self,
session: AsyncSession,
user_id: UUID,
connector_id: int,
connector_name: str,
connector_type: str,
search_space_id: int,
start_date: str | None = None,
end_date: str | None = None,
) -> Notification:
"""
Create or update notification when connector indexing starts.
Args:
session: Database session
user_id: User ID
connector_id: Connector ID
connector_name: Connector name
connector_type: Connector type
search_space_id: Search space ID
start_date: Start date for indexing
end_date: End date for indexing
Returns:
Notification: The created or updated notification
"""
operation_id = self._generate_operation_id(connector_id, start_date, end_date)
title = f"Indexing: {connector_name}"
message = f'Indexing "{connector_name}" in progress...'
metadata = {
"connector_id": connector_id,
"connector_name": connector_name,
"connector_type": connector_type,
"start_date": start_date,
"end_date": end_date,
"indexed_count": 0,
}
return await self.find_or_create_notification(
session=session,
user_id=user_id,
operation_id=operation_id,
title=title,
message=message,
search_space_id=search_space_id,
initial_metadata=metadata,
)
async def notify_indexing_progress(
self,
session: AsyncSession,
notification: Notification,
indexed_count: int,
total_count: int | None = None,
) -> Notification:
"""
Update notification with indexing progress.
Args:
session: Database session
notification: Notification to update
indexed_count: Number of items indexed so far
total_count: Total number of items (optional)
Returns:
Updated notification
"""
connector_name = notification.notification_metadata.get("connector_name", "Connector")
progress_msg = f'Indexing "{connector_name}": {indexed_count} items'
if total_count is not None:
progress_msg += f" of {total_count}"
progress_msg += " indexed..."
metadata_updates = {"indexed_count": indexed_count}
if total_count is not None:
metadata_updates["total_count"] = total_count
progress_percent = int((indexed_count / total_count) * 100)
metadata_updates["progress_percent"] = progress_percent
return await self.update_notification(
session=session,
notification=notification,
message=progress_msg,
status="in_progress",
metadata_updates=metadata_updates,
)
async def notify_indexing_completed(
self,
session: AsyncSession,
notification: Notification,
indexed_count: int,
error_message: str | None = None,
) -> Notification:
"""
Update notification when connector indexing completes.
Args:
session: Database session
notification: Notification to update
indexed_count: Total number of items indexed
error_message: Error message if indexing failed (optional)
Returns:
Updated notification
"""
connector_name = notification.notification_metadata.get("connector_name", "Connector")
if error_message:
title = f"Indexing failed: {connector_name}"
message = f'Indexing "{connector_name}" failed: {error_message}'
status = "failed"
else:
title = f"Indexing completed: {connector_name}"
message = f'Indexing "{connector_name}" completed successfully. {indexed_count} items indexed.'
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
"error_message": error_message,
}
return await self.update_notification(
session=session,
notification=notification,
title=title,
message=message,
status=status,
metadata_updates=metadata_updates,
)
class NotificationService:
"""Service for creating notifications that sync via Electric SQL."""
"""Service for creating and managing notifications that sync via Electric SQL."""
# Handler instances
connector_indexing = ConnectorIndexingNotificationHandler()
@staticmethod
async def create_notification(
@ -105,6 +430,7 @@ class NotificationService:
) -> Notification:
"""
Create notification when connector indexing completes.
DEPRECATED: Use NotificationService.connector_indexing methods instead.
Args:
session: Database session

View file

@ -3,6 +3,7 @@
import { Moon, Sun } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
import { NotificationButton } from "@/components/notifications/NotificationButton";
interface HeaderProps {
breadcrumb?: React.ReactNode;
@ -29,6 +30,9 @@ export function Header({
{/* Right side - Actions */}
<div className="flex items-center gap-2">
{/* Notifications */}
<NotificationButton />
{/* Theme toggle */}
{onToggleTheme && (
<Tooltip>

View file

@ -0,0 +1,53 @@
"use client";
import { Bell } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
import { useNotifications } from "@/hooks/use-notifications";
import { useAtomValue } from "jotai";
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
import { NotificationPopup } from "./NotificationPopup";
import { cn } from "@/lib/utils";
export function NotificationButton() {
const { data: user } = useAtomValue(currentUserAtom);
const userId = user?.id ? String(user.id) : null;
const { notifications, unreadCount, loading, markAsRead, markAllAsRead } = useNotifications(userId);
return (
<Popover>
<Tooltip>
<TooltipTrigger asChild>
<PopoverTrigger asChild>
<Button variant="outline" size="icon" className="h-8 w-8 relative">
<Bell className="h-4 w-4" />
{unreadCount > 0 && (
<span
className={cn(
"absolute -top-1 -right-1 flex h-5 w-5 items-center justify-center rounded-full bg-destructive text-[10px] font-medium text-destructive-foreground",
unreadCount > 9 && "px-1"
)}
>
{unreadCount > 99 ? "99+" : unreadCount}
</span>
)}
<span className="sr-only">Notifications</span>
</Button>
</PopoverTrigger>
</TooltipTrigger>
<TooltipContent>Notifications</TooltipContent>
</Tooltip>
<PopoverContent align="end" className="w-80 p-0">
<NotificationPopup
notifications={notifications}
unreadCount={unreadCount}
loading={loading}
markAsRead={markAsRead}
markAllAsRead={markAllAsRead}
/>
</PopoverContent>
</Popover>
);
}

View file

@ -0,0 +1,175 @@
"use client";
import { Bell, Check, CheckCheck, Loader2, AlertCircle, CheckCircle2 } from "lucide-react";
import { Button } from "@/components/ui/button";
import { ScrollArea } from "@/components/ui/scroll-area";
import { Separator } from "@/components/ui/separator";
import { Badge } from "@/components/ui/badge";
import type { Notification } from "@/hooks/use-notifications";
import { formatDistanceToNow } from "date-fns";
import { cn } from "@/lib/utils";
interface NotificationPopupProps {
notifications: Notification[];
unreadCount: number;
loading: boolean;
markAsRead: (id: number) => Promise<boolean>;
markAllAsRead: () => Promise<boolean>;
}
export function NotificationPopup({
notifications,
unreadCount,
loading,
markAsRead,
markAllAsRead,
}: NotificationPopupProps) {
const handleMarkAsRead = async (id: number) => {
await markAsRead(id);
};
const handleMarkAllAsRead = async () => {
await markAllAsRead();
};
const formatTime = (dateString: string) => {
try {
return formatDistanceToNow(new Date(dateString), { addSuffix: true });
} catch {
return "Recently";
}
};
const getStatusBadge = (notification: Notification) => {
const status = notification.metadata?.status as string | undefined;
if (!status) return null;
switch (status) {
case "in_progress":
return (
<Badge variant="secondary" className="text-xs">
<Loader2 className="h-3 w-3 mr-1 animate-spin" />
In Progress
</Badge>
);
case "completed":
return (
<Badge variant="default" className="text-xs bg-green-600 hover:bg-green-700">
<CheckCircle2 className="h-3 w-3 mr-1" />
Completed
</Badge>
);
case "failed":
return (
<Badge variant="destructive" className="text-xs">
<AlertCircle className="h-3 w-3 mr-1" />
Failed
</Badge>
);
default:
return null;
}
};
return (
<div className="flex flex-col">
{/* Header */}
<div className="flex items-center justify-between px-4 py-3 border-b">
<div className="flex items-center gap-2">
<Bell className="h-4 w-4" />
<h3 className="font-semibold text-sm">Notifications</h3>
{unreadCount > 0 && (
<span className="flex h-5 w-5 items-center justify-center rounded-full bg-destructive text-[10px] font-medium text-destructive-foreground">
{unreadCount > 99 ? "99+" : unreadCount}
</span>
)}
</div>
{unreadCount > 0 && (
<Button
variant="ghost"
size="sm"
onClick={handleMarkAllAsRead}
className="h-7 text-xs"
>
<CheckCheck className="h-3.5 w-3.5 mr-1" />
Mark all read
</Button>
)}
</div>
{/* Notifications List */}
<ScrollArea className="h-[400px]">
{loading ? (
<div className="flex items-center justify-center py-8">
<Loader2 className="h-5 w-5 animate-spin text-muted-foreground" />
</div>
) : notifications.length === 0 ? (
<div className="flex flex-col items-center justify-center py-8 px-4 text-center">
<Bell className="h-8 w-8 text-muted-foreground mb-2" />
<p className="text-sm text-muted-foreground">No notifications</p>
</div>
) : (
<div className="py-2">
{notifications.map((notification, index) => (
<div key={notification.id}>
<button
type="button"
onClick={() => !notification.read && handleMarkAsRead(notification.id)}
className={cn(
"w-full px-4 py-3 text-left hover:bg-accent transition-colors",
!notification.read && "bg-accent/50"
)}
>
<div className="flex items-start gap-3">
<div className="flex-1 min-w-0">
<div className="flex items-start justify-between gap-2 mb-1">
<p
className={cn(
"text-sm font-medium truncate",
!notification.read && "font-semibold"
)}
>
{notification.title}
</p>
<div className="flex items-center gap-2 shrink-0">
{getStatusBadge(notification)}
{!notification.read && (
<div className="h-2 w-2 rounded-full bg-primary mt-1.5" />
)}
</div>
</div>
<p className="text-xs text-muted-foreground line-clamp-2">
{notification.message}
</p>
<div className="flex items-center justify-between mt-2">
<span className="text-xs text-muted-foreground">
{formatTime(notification.created_at)}
</span>
{!notification.read && (
<Button
variant="ghost"
size="sm"
className="h-6 px-2 text-xs"
onClick={(e) => {
e.stopPropagation();
handleMarkAsRead(notification.id);
}}
>
<Check className="h-3 w-3 mr-1" />
Mark read
</Button>
)}
</div>
</div>
</div>
</button>
{index < notifications.length - 1 && <Separator />}
</div>
))}
</div>
)}
</ScrollArea>
</div>
);
}

View file

@ -1,7 +1,7 @@
"use client"
import { useEffect, useState, useCallback, useRef } from 'react'
import { initElectric, getElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client'
import { initElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client'
export interface Notification {
id: number
@ -22,9 +22,9 @@ export function useNotifications(userId: string | null) {
const [initialized, setInitialized] = useState(false)
const [error, setError] = useState<Error | null>(null)
const syncHandleRef = useRef<SyncHandle | null>(null)
const pollIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null)
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null)
// Initialize Electric SQL and start syncing
// Initialize Electric SQL and start syncing with real-time updates
useEffect(() => {
if (!userId || initialized) return
@ -37,12 +37,42 @@ export function useNotifications(userId: string | null) {
setElectric(electricClient)
// Start syncing notifications for this user
const handle = await electricClient.syncShape<Notification>({
// Start syncing notifications for this user via Electric SQL
// Note: user_id is stored as TEXT in PGlite (UUID from backend is converted)
console.log('Starting Electric SQL sync for user:', userId)
// Use string format for WHERE clause (PGlite sync plugin expects this format)
// The user_id is a UUID string, so we need to quote it properly
const handle = await electricClient.syncShape({
table: 'notifications',
where: `user_id = '${userId}'`,
primaryKey: ['id'],
})
console.log('Electric SQL sync started:', {
isUpToDate: handle.isUpToDate,
hasStream: !!handle.stream,
hasInitialSyncPromise: !!handle.initialSyncPromise,
})
// Wait for initial sync to complete if the promise is available
if (handle.initialSyncPromise) {
console.log('Waiting for initial sync to complete...')
try {
await handle.initialSyncPromise
console.log('Initial sync promise resolved, checking status:', {
isUpToDate: handle.isUpToDate,
})
} catch (syncErr) {
console.error('Initial sync failed:', syncErr)
}
}
// Check status after waiting
console.log('Sync status after waiting:', {
isUpToDate: handle.isUpToDate,
hasStream: !!handle.stream,
})
if (!mounted) {
handle.unsubscribe()
@ -53,8 +83,113 @@ export function useNotifications(userId: string | null) {
setInitialized(true)
setError(null)
// Initial fetch
// Fetch notifications after sync is complete (we already waited above)
await fetchNotifications(electricClient.db)
// Set up real-time updates using PGlite live queries
// Electric SQL syncs data to PGlite in real-time via WebSocket/HTTP
// PGlite live queries detect when the synced data changes and trigger callbacks
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const db = electricClient.db as any
// Use PGlite's live query API for real-time updates
// Based on latest PGlite docs: db.live.query(query, params, callback)
if (db.live?.query && typeof db.live.query === 'function') {
const liveQuery = db.live.query(
`SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`,
[userId],
(result: { rows: Notification[] }) => {
// This callback fires automatically when Electric SQL syncs changes
if (mounted) {
setNotifications(result.rows)
}
}
)
// Set initial results immediately
if (liveQuery.initialResults) {
setNotifications(liveQuery.initialResults.rows)
}
if (mounted && liveQuery && typeof liveQuery.unsubscribe === 'function') {
liveQueryRef.current = liveQuery
console.log('✅ Real-time notifications enabled via PGlite live queries')
}
} else {
// Fallback: Monitor sync handle for updates
// Electric SQL's syncShape should trigger updates, but we need to detect them
// This is a lightweight approach that only checks when sync indicates changes
console.warn('PGlite live queries not available - using sync-based change detection')
let lastNotificationIds = new Set<number>()
const checkForSyncUpdates = async () => {
if (!mounted) return
try {
const result = await electricClient.db.query<Notification>(
`SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`,
[userId]
)
// PGlite query returns { rows: [] } format
const rows = result.rows || []
// Only update if data actually changed
const currentIds = new Set(rows.map(r => r.id))
const currentHash = JSON.stringify(
rows.map(r => ({ id: r.id, read: r.read, updated_at: r.updated_at }))
)
// Check if IDs changed (new/deleted notifications)
const idsChanged =
currentIds.size !== lastNotificationIds.size ||
[...currentIds].some(id => !lastNotificationIds.has(id)) ||
[...lastNotificationIds].some(id => !currentIds.has(id))
if (idsChanged) {
setNotifications(rows)
lastNotificationIds = currentIds
} else {
// Check if any notification properties changed (e.g., read status)
// Compare with current state
setNotifications(prev => {
const prevHash = JSON.stringify(
prev.map(r => ({ id: r.id, read: r.read, updated_at: r.updated_at }))
)
if (prevHash !== currentHash) {
return rows
}
return prev
})
}
} catch (err) {
console.error('Failed to check for notification updates:', err)
}
// Check again after a short delay (Electric SQL syncs are fast)
if (mounted) {
setTimeout(checkForSyncUpdates, 500) // Check every 500ms - Electric SQL syncs are near-instant
}
}
// Start monitoring
checkForSyncUpdates()
liveQueryRef.current = {
unsubscribe: () => {
mounted = false
}
}
}
} catch (liveErr) {
console.warn('Failed to set up real-time updates:', liveErr)
// Minimal fallback - this should rarely be needed
liveQueryRef.current = {
unsubscribe: () => {}
}
}
} catch (err) {
if (!mounted) return
console.error('Failed to initialize Electric SQL:', err)
@ -66,17 +201,29 @@ export function useNotifications(userId: string | null) {
async function fetchNotifications(db: InstanceType<typeof import('@electric-sql/pglite').PGlite>) {
try {
// Debug: Check all notifications first
const allNotifications = await db.query<Notification>(
`SELECT * FROM notifications ORDER BY created_at DESC`
)
console.log('All notifications in PGlite:', allNotifications.rows?.length || 0, allNotifications.rows)
// Use PGlite's query method (not exec for SELECT queries)
const result = await db.query<Notification>(
`SELECT * FROM notifications
WHERE user_id = $1
ORDER BY created_at DESC`,
[userId]
)
console.log(`Notifications for user ${userId}:`, result.rows?.length || 0, result.rows)
if (mounted) {
setNotifications(result.rows)
// PGlite query returns { rows: [] } format
setNotifications(result.rows || [])
}
} catch (err) {
console.error('Failed to fetch notifications:', err)
// Log more details for debugging
console.error('Error details:', err)
}
}
@ -88,38 +235,13 @@ export function useNotifications(userId: string | null) {
syncHandleRef.current.unsubscribe()
syncHandleRef.current = null
}
if (liveQueryRef.current) {
liveQueryRef.current.unsubscribe()
liveQueryRef.current = null
}
}
}, [userId, initialized])
// Poll for updates (PGlite doesn't have live queries like the old electric-sql)
useEffect(() => {
if (!electric || !userId || !initialized) return
const fetchNotifications = async () => {
try {
const result = await electric.db.query<Notification>(
`SELECT * FROM notifications
WHERE user_id = $1
ORDER BY created_at DESC`,
[userId]
)
setNotifications(result.rows)
} catch (err) {
console.error('Failed to fetch notifications:', err)
}
}
// Poll every 2 seconds for updates
pollIntervalRef.current = setInterval(fetchNotifications, 2000)
return () => {
if (pollIntervalRef.current) {
clearInterval(pollIntervalRef.current)
pollIntervalRef.current = null
}
}
}, [electric, userId, initialized])
// Mark notification as read (local only - needs backend sync)
const markAsRead = useCallback(
async (notificationId: number) => {
@ -130,7 +252,7 @@ export function useNotifications(userId: string | null) {
try {
// Update locally in PGlite
await electric.db.exec(
await electric.db.query(
`UPDATE notifications SET read = true, updated_at = NOW() WHERE id = $1`,
[notificationId]
)

View file

@ -9,11 +9,12 @@
import { PGlite } from '@electric-sql/pglite'
import { electricSync } from '@electric-sql/pglite-sync'
import { live } from '@electric-sql/pglite/live'
// Types
export interface ElectricClient {
db: PGlite
syncShape: <T = Record<string, unknown>>(options: SyncShapeOptions) => Promise<SyncHandle<T>>
syncShape: (options: SyncShapeOptions) => Promise<SyncHandle>
}
export interface SyncShapeOptions {
@ -23,13 +24,13 @@ export interface SyncShapeOptions {
primaryKey?: string[]
}
export interface SyncHandle<T = Record<string, unknown>> {
export interface SyncHandle {
unsubscribe: () => void
isUpToDate: boolean
shape: {
handle?: string
offset?: string
}
readonly isUpToDate: boolean
// The stream property contains the ShapeStreamInterface from pglite-sync
stream?: unknown
// Promise that resolves when initial sync is complete
initialSyncPromise?: Promise<void>
}
// Singleton instance
@ -37,6 +38,10 @@ let electricClient: ElectricClient | null = null
let isInitializing = false
let initPromise: Promise<ElectricClient> | null = null
// Version for sync state - increment this to force fresh sync when Electric config changes
// Incremented to v4 to fix sync completion issues
const SYNC_VERSION = 4
// Get Electric URL from environment
function getElectricUrl(): string {
if (typeof window !== 'undefined') {
@ -60,11 +65,15 @@ export async function initElectric(): Promise<ElectricClient> {
isInitializing = true
initPromise = (async () => {
try {
// Create PGlite instance with Electric sync plugin
const db = await PGlite.create('idb://surfsense-notifications', {
// Create PGlite instance with Electric sync plugin and live queries
// Include version in database name to force fresh sync when Electric config changes
const db = await PGlite.create({
dataDir: `idb://surfsense-notifications-v${SYNC_VERSION}`,
relaxedDurability: true,
extensions: {
electric: electricSync(),
// Enable debug mode in electricSync to see detailed sync logs
electric: electricSync({ debug: true }),
live, // Enable live queries for real-time updates
},
})
@ -93,36 +102,185 @@ export async function initElectric(): Promise<ElectricClient> {
// Create the client wrapper
electricClient = {
db,
syncShape: async <T = Record<string, unknown>>(options: SyncShapeOptions): Promise<SyncHandle<T>> => {
syncShape: async (options: SyncShapeOptions): Promise<SyncHandle> => {
const { table, where, columns, primaryKey = ['id'] } = options
// Build params for the shape request
const params: Record<string, string> = { table }
if (where) params.where = where
if (columns) params.columns = columns.join(',')
// Build params for the shape request
// Electric SQL expects params as URL query parameters
const params: Record<string, string> = { table }
// Validate and fix WHERE clause to ensure string literals are properly quoted
let validatedWhere = where
if (where) {
// Check if where uses positional parameters
if (where.includes('$1')) {
// Extract the value from the where clause if it's embedded
// For now, we'll use the where clause as-is and let Electric handle it
params.where = where
validatedWhere = where
} else {
// Validate that string literals are properly quoted
// Count single quotes - should be even (pairs) for properly quoted strings
const singleQuoteCount = (where.match(/'/g) || []).length
if (singleQuoteCount % 2 !== 0) {
// Odd number of quotes means unterminated string literal
console.warn('Where clause has unmatched quotes, fixing:', where)
// Add closing quote at the end
validatedWhere = `${where}'`
params.where = validatedWhere
} else {
// Use the where clause directly (already formatted)
params.where = where
validatedWhere = where
}
}
}
if (columns) params.columns = columns.join(',')
// Use PGlite's electric sync plugin to sync the shape
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const shape = await (db as any).electric.syncShapeToTable({
console.log('Syncing shape with params:', params)
console.log('Electric URL:', `${electricUrl}/v1/shape`)
console.log('Where clause:', where, 'Validated:', validatedWhere)
try {
// Debug: Test Electric SQL connection directly first
// Use validatedWhere to ensure proper URL encoding
const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ''}`
console.log('Testing Electric SQL directly:', testUrl)
try {
const testResponse = await fetch(testUrl)
const testHeaders = {
handle: testResponse.headers.get('electric-handle'),
offset: testResponse.headers.get('electric-offset'),
upToDate: testResponse.headers.get('electric-up-to-date'),
}
console.log('Direct Electric SQL response headers:', testHeaders)
const testData = await testResponse.json()
console.log('Direct Electric SQL data count:', Array.isArray(testData) ? testData.length : 'not array', testData)
} catch (testErr) {
console.error('Direct Electric SQL test failed:', testErr)
}
// Use PGlite's electric sync plugin to sync the shape
// According to Electric SQL docs, the shape config uses params for table, where, columns
// Note: mapColumns is OPTIONAL per pglite-sync types.ts
// Create a promise that resolves when initial sync is complete
let resolveInitialSync: () => void
let rejectInitialSync: (error: Error) => void
const initialSyncPromise = new Promise<void>((resolve, reject) => {
resolveInitialSync = resolve
rejectInitialSync = reject
// Safety timeout - if sync doesn't complete in 30s, something is wrong
setTimeout(() => {
console.warn(`⚠️ Sync timeout for ${table} - sync did not complete in 30s`)
resolve() // Resolve anyway to not block, but log warning
}, 30000)
})
const shapeConfig = {
shape: {
url: `${electricUrl}/v1/shape`,
params,
params: {
table,
...(validatedWhere ? { where: validatedWhere } : {}),
...(columns ? { columns: columns.join(',') } : {}),
},
},
table,
primaryKey,
shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, '_') || 'all'}`, // Versioned key to force fresh sync when needed
onInitialSync: () => {
console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`)
resolveInitialSync()
},
onError: (error: Error) => {
console.error(`❌ Shape sync error for ${table}:`, error)
console.error('Error details:', JSON.stringify(error, Object.getOwnPropertyNames(error)))
rejectInitialSync(error)
},
}
console.log('syncShapeToTable config:', JSON.stringify(shapeConfig, null, 2))
// Type assertion to PGlite with electric extension
const pgWithElectric = db as PGlite & { electric: { syncShapeToTable: (config: typeof shapeConfig) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }> } }
const shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig)
if (!shape) {
throw new Error('syncShapeToTable returned undefined')
}
// Log the actual shape result structure
console.log('Shape sync result (initial):', {
hasUnsubscribe: typeof shape?.unsubscribe === 'function',
isUpToDate: shape?.isUpToDate,
hasStream: !!shape?.stream,
streamType: typeof shape?.stream,
})
// Debug the stream if available
if (shape?.stream) {
const stream = shape.stream as any
console.log('Shape stream details:', {
shapeHandle: stream?.shapeHandle,
lastOffset: stream?.lastOffset,
isUpToDate: stream?.isUpToDate,
error: stream?.error,
hasSubscribe: typeof stream?.subscribe === 'function',
hasUnsubscribe: typeof stream?.unsubscribe === 'function',
})
// Try to subscribe to the stream to see if it's receiving messages
if (typeof stream?.subscribe === 'function') {
console.log('Subscribing to shape stream for debugging...')
stream.subscribe((messages: unknown[]) => {
console.log('🔵 Shape stream received messages:', messages?.length || 0)
if (messages && messages.length > 0) {
console.log('First message:', JSON.stringify(messages[0], null, 2))
}
})
}
}
// Wait briefly to see if sync starts
await new Promise(resolve => setTimeout(resolve, 100))
console.log('Shape sync result (after 100ms):', {
isUpToDate: shape?.isUpToDate,
})
// Return the shape handle - isUpToDate is a getter that reflects current state
return {
unsubscribe: () => {
console.log('unsubscribing')
if (shape && typeof shape.unsubscribe === 'function') {
shape.unsubscribe()
}
},
isUpToDate: shape?.isUpToDate ?? false,
shape: {
handle: shape?.handle,
offset: shape?.offset,
// Use getter to always return current state
get isUpToDate() {
return shape?.isUpToDate ?? false
},
stream: shape?.stream,
initialSyncPromise, // Expose promise so callers can wait for sync
}
} catch (error) {
console.error('Failed to sync shape:', error)
// Check if Electric SQL server is reachable
try {
const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, {
method: 'GET',
})
console.log('Electric SQL server response:', response.status, response.statusText)
if (!response.ok) {
console.error('Electric SQL server error:', await response.text())
}
} catch (fetchError) {
console.error('Cannot reach Electric SQL server:', fetchError)
console.error('Make sure Electric SQL is running at:', electricUrl)
}
throw error
}
},
}

View file

@ -30,7 +30,7 @@
"@blocknote/react": "^0.45.0",
"@blocknote/server-util": "^0.45.0",
"@electric-sql/client": "^1.4.0",
"@electric-sql/pglite": "^0.2.17",
"@electric-sql/pglite": "^0.3.14",
"@electric-sql/pglite-sync": "^0.4.0",
"@electric-sql/react": "^1.0.26",
"@hookform/resolvers": "^5.2.2",

View file

@ -36,11 +36,11 @@ importers:
specifier: ^1.4.0
version: 1.4.0
'@electric-sql/pglite':
specifier: ^0.2.17
version: 0.2.17
specifier: ^0.3.14
version: 0.3.14
'@electric-sql/pglite-sync':
specifier: ^0.4.0
version: 0.4.0(@electric-sql/pglite@0.2.17)
version: 0.4.0(@electric-sql/pglite@0.3.14)
'@electric-sql/react':
specifier: ^1.0.26
version: 1.0.26(react@19.2.3)
@ -157,7 +157,7 @@ importers:
version: 17.2.3
drizzle-orm:
specifier: ^0.44.5
version: 0.44.7(@electric-sql/pglite@0.2.17)(@opentelemetry/api@1.9.0)(@prisma/client@4.8.1)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(pg@8.16.3)(postgres@3.4.7)
version: 0.44.7(@electric-sql/pglite@0.3.14)(@opentelemetry/api@1.9.0)(@prisma/client@4.8.1)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(pg@8.16.3)(postgres@3.4.7)
emblor:
specifier: ^1.4.8
version: 1.4.8(@types/react-dom@19.2.3(@types/react@19.2.7))(@types/react@19.2.7)(react-dom@19.2.3(react@19.2.3))(react@19.2.3)
@ -575,8 +575,8 @@ packages:
peerDependencies:
'@electric-sql/pglite': 0.3.14
'@electric-sql/pglite@0.2.17':
resolution: {integrity: sha512-qEpKRT2oUaWDH6tjRxLHjdzMqRUGYDnGZlKrnL4dJ77JVMcP2Hpo3NYnOSPKdZdeec57B6QPprCUFg0picx5Pw==}
'@electric-sql/pglite@0.3.14':
resolution: {integrity: sha512-3DB258dhqdsArOI1fIt7cb9RpUOgcDg5hXWVgVHAeqVQ/qxtFy605QKs4gx6mFq3jWsSPqDN8TgSEsqC3OfV9Q==}
'@electric-sql/react@1.0.26':
resolution: {integrity: sha512-cCKLQrtGNaAPBzdLZk97bK/Hue3fKkfL0/aA5HAPzoo7U07/TRzzs4EVRy7q+BV6AONEK+YXxxrzH9gEH8YVQA==}
@ -6760,13 +6760,13 @@ snapshots:
optionalDependencies:
'@rollup/rollup-darwin-arm64': 4.55.1
'@electric-sql/pglite-sync@0.4.0(@electric-sql/pglite@0.2.17)':
'@electric-sql/pglite-sync@0.4.0(@electric-sql/pglite@0.3.14)':
dependencies:
'@electric-sql/client': 1.4.0
'@electric-sql/experimental': 1.0.14(@electric-sql/client@1.4.0)
'@electric-sql/pglite': 0.2.17
'@electric-sql/pglite': 0.3.14
'@electric-sql/pglite@0.2.17': {}
'@electric-sql/pglite@0.3.14': {}
'@electric-sql/react@1.0.26(react@19.2.3)':
dependencies:
@ -9701,9 +9701,9 @@ snapshots:
transitivePeerDependencies:
- supports-color
drizzle-orm@0.44.7(@electric-sql/pglite@0.2.17)(@opentelemetry/api@1.9.0)(@prisma/client@4.8.1)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(pg@8.16.3)(postgres@3.4.7):
drizzle-orm@0.44.7(@electric-sql/pglite@0.3.14)(@opentelemetry/api@1.9.0)(@prisma/client@4.8.1)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(pg@8.16.3)(postgres@3.4.7):
optionalDependencies:
'@electric-sql/pglite': 0.2.17
'@electric-sql/pglite': 0.3.14
'@opentelemetry/api': 1.9.0
'@prisma/client': 4.8.1
'@types/pg': 8.16.0