feat: Enhance Electric SQL integration and update notification handling

- Added initialization script for Electric SQL user in Docker setup.
- Updated Electric SQL client to support new PGlite architecture and sync functionality.
- Improved notification fetching and syncing logic in useNotifications hook.
- Refactored ElectricProvider to handle initialization state and errors more gracefully.
- Removed deprecated electric.config.ts file and adjusted package dependencies accordingly.
This commit is contained in:
Anish Sarkar 2026-01-12 14:53:18 +05:30
parent 82c6dd0221
commit f441c7b0ce
10 changed files with 376 additions and 1046 deletions

View file

@ -8,6 +8,7 @@ services:
volumes:
- postgres_data:/var/lib/postgresql/data
- ./scripts/docker/postgresql.conf:/etc/postgresql/postgresql.conf:ro
- ./scripts/docker/init-electric-user.sql:/docker-entrypoint-initdb.d/init-electric-user.sql:ro
environment:
- POSTGRES_USER=${POSTGRES_USER:-postgres}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-postgres}
@ -115,17 +116,16 @@ services:
electric:
image: electricsql/electric:latest
ports:
- "${ELECTRIC_PORT:-5133}:5133"
- "${ELECTRIC_PORT:-5133}:3000"
environment:
- DATABASE_URL=postgresql://electric:electric_password@db:5432/${POSTGRES_DB:-surfsense}?sslmode=disable
- AUTH_MODE=insecure
- ELECTRIC_INSECURE=true
- ELECTRIC_WRITE_TO_PG_MODE=direct
- ELECTRIC_READ_FROM_PG_MODE=direct
depends_on:
- db
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5133/api/health"]
test: ["CMD", "curl", "-f", "http://localhost:3000/v1/health"]
interval: 10s
timeout: 5s
retries: 5

View file

@ -19,5 +19,12 @@ GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO electric;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO electric;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON SEQUENCES TO electric;
-- Note: Electric SQL will create its own publications automatically
-- We don't need to create publications here
-- Create the publication that Electric SQL expects
-- Electric will add tables to this publication when shapes are subscribed
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_publication WHERE pubname = 'electric_publication_default') THEN
CREATE PUBLICATION electric_publication_default;
END IF;
END
$$;

View file

@ -40,6 +40,13 @@ def upgrade() -> None:
op.create_index("ix_notifications_created_at", "notifications", ["created_at"])
op.create_index("ix_notifications_user_read", "notifications", ["user_id", "read"])
# Set REPLICA IDENTITY FULL (required by Electric SQL for replication)
# This allows Electric SQL to track all column values for updates/deletes
op.execute("ALTER TABLE notifications REPLICA IDENTITY FULL;")
# Note: ElectricSQL 1.x dynamically adds tables to the publication when
# clients subscribe to shapes. No need to manually create publications.
def downgrade() -> None:
"""Downgrade schema - remove notifications table."""
@ -48,4 +55,3 @@ def downgrade() -> None:
op.drop_index("ix_notifications_read", table_name="notifications")
op.drop_index("ix_notifications_user_id", table_name="notifications")
op.drop_table("notifications")

View file

@ -1,31 +1,53 @@
"use client"
import { useEffect, useState } from 'react'
import { initElectric } from '@/lib/electric/client'
import { initElectric, isElectricInitialized } from '@/lib/electric/client'
interface ElectricProviderProps {
children: React.ReactNode
}
/**
* ElectricProvider initializes the Electric SQL client with PGlite
*
* This provider ensures Electric is initialized before rendering children,
* but doesn't block if initialization fails (app can still work without real-time sync)
*/
export function ElectricProvider({ children }: ElectricProviderProps) {
const [initialized, setInitialized] = useState(false)
const [error, setError] = useState<Error | null>(null)
useEffect(() => {
// Skip if already initialized
if (isElectricInitialized()) {
setInitialized(true)
return
}
let mounted = true
async function init() {
try {
await initElectric()
setInitialized(true)
setError(null)
if (mounted) {
setInitialized(true)
setError(null)
}
} catch (err) {
console.error('Failed to initialize Electric SQL:', err)
setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL'))
// Don't block rendering if Electric SQL fails - app can still work
setInitialized(true)
if (mounted) {
setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL'))
// Don't block rendering if Electric SQL fails - app can still work
setInitialized(true)
}
}
}
init()
return () => {
mounted = false
}
}, [])
// Show loading state only briefly, then render children
@ -38,6 +60,10 @@ export function ElectricProvider({ children }: ElectricProviderProps) {
)
}
// If there's an error, still render children but log the error
if (error) {
console.warn('Electric SQL initialization failed, notifications may not sync:', error.message)
}
return <>{children}</>
}

View file

@ -1,17 +0,0 @@
import { defineConfig } from '@electric-sql/cli'
export default defineConfig({
connection: {
host: process.env.ELECTRIC_HOST || 'localhost',
port: parseInt(process.env.ELECTRIC_PORT || '5133', 10),
database: process.env.POSTGRES_DB || 'surfsense',
user: process.env.ELECTRIC_USER || 'electric',
password: process.env.ELECTRIC_PASSWORD || 'electric_password',
},
outDir: './lib/electric/generated',
service: {
host: process.env.ELECTRIC_HOST || 'localhost',
port: parseInt(process.env.ELECTRIC_PORT || '5133', 10),
},
})

View file

@ -1,8 +1,7 @@
"use client"
import { useEffect, useState, useCallback } from 'react'
import { useLiveQuery } from 'electric-sql/react'
import { initElectric, getElectric, isElectricInitialized } from '@/lib/electric/client'
import { useEffect, useState, useCallback, useRef } from 'react'
import { initElectric, getElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client'
export interface Notification {
id: number
@ -12,49 +11,116 @@ export interface Notification {
title: string
message: string
read: boolean
metadata: Record<string, any>
metadata: Record<string, unknown>
created_at: string
updated_at: string | null
}
export function useNotifications(userId: string | null) {
const [electric, setElectric] = useState<any>(null)
const [electric, setElectric] = useState<ElectricClient | null>(null)
const [notifications, setNotifications] = useState<Notification[]>([])
const [initialized, setInitialized] = useState(false)
const [error, setError] = useState<Error | null>(null)
const syncHandleRef = useRef<SyncHandle | null>(null)
const pollIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null)
// Initialize Electric SQL
// Initialize Electric SQL and start syncing
useEffect(() => {
if (!userId || initialized) return
let mounted = true
async function init() {
try {
const electricClient = await initElectric()
if (!mounted) return
setElectric(electricClient)
// Start syncing notifications for this user
const handle = await electricClient.syncShape<Notification>({
table: 'notifications',
where: `user_id = '${userId}'`,
primaryKey: ['id'],
})
if (!mounted) {
handle.unsubscribe()
return
}
syncHandleRef.current = handle
setInitialized(true)
setError(null)
// Initial fetch
await fetchNotifications(electricClient.db)
} catch (err) {
if (!mounted) return
console.error('Failed to initialize Electric SQL:', err)
setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL'))
// Still mark as initialized so the UI doesn't block
setInitialized(true)
}
}
async function fetchNotifications(db: InstanceType<typeof import('@electric-sql/pglite').PGlite>) {
try {
const result = await db.query<Notification>(
`SELECT * FROM notifications
WHERE user_id = $1
ORDER BY created_at DESC`,
[userId]
)
if (mounted) {
setNotifications(result.rows)
}
} catch (err) {
console.error('Failed to fetch notifications:', err)
}
}
init()
return () => {
mounted = false
if (syncHandleRef.current) {
syncHandleRef.current.unsubscribe()
syncHandleRef.current = null
}
}
}, [userId, initialized])
// Use live query to get notifications
const { results: notifications } = useLiveQuery(
electric?.db.notifications?.liveMany({
where: {
user_id: userId || '',
read: false,
},
orderBy: {
created_at: 'desc',
},
})
) ?? { results: [] }
// Poll for updates (PGlite doesn't have live queries like the old electric-sql)
useEffect(() => {
if (!electric || !userId || !initialized) return
// Mark notification as read
const fetchNotifications = async () => {
try {
const result = await electric.db.query<Notification>(
`SELECT * FROM notifications
WHERE user_id = $1
ORDER BY created_at DESC`,
[userId]
)
setNotifications(result.rows)
} catch (err) {
console.error('Failed to fetch notifications:', err)
}
}
// Poll every 2 seconds for updates
pollIntervalRef.current = setInterval(fetchNotifications, 2000)
return () => {
if (pollIntervalRef.current) {
clearInterval(pollIntervalRef.current)
pollIntervalRef.current = null
}
}
}, [electric, userId, initialized])
// Mark notification as read (local only - needs backend sync)
const markAsRead = useCallback(
async (notificationId: number) => {
if (!electric || !isElectricInitialized()) {
@ -63,10 +129,20 @@ export function useNotifications(userId: string | null) {
}
try {
await electric.db.notifications.update({
data: { read: true },
where: { id: notificationId },
})
// Update locally in PGlite
await electric.db.exec(
`UPDATE notifications SET read = true, updated_at = NOW() WHERE id = $1`,
[notificationId]
)
// Update local state
setNotifications(prev =>
prev.map(n => n.id === notificationId ? { ...n, read: true } : n)
)
// TODO: Also send to backend to persist the change
// This could be done via a REST API call
return true
} catch (err) {
console.error('Failed to mark notification as read:', err)
@ -84,7 +160,7 @@ export function useNotifications(userId: string | null) {
}
try {
const unread = (notifications || []).filter((n: Notification) => !n.read)
const unread = notifications.filter(n => !n.read)
for (const notification of unread) {
await markAsRead(notification.id)
}
@ -96,10 +172,10 @@ export function useNotifications(userId: string | null) {
}, [electric, notifications, markAsRead])
// Get unread count
const unreadCount = (notifications || []).filter((n: Notification) => !n.read).length
const unreadCount = notifications.filter(n => !n.read).length
return {
notifications: (notifications || []) as Notification[],
notifications,
unreadCount,
markAsRead,
markAllAsRead,
@ -107,4 +183,3 @@ export function useNotifications(userId: string | null) {
error,
}
}

View file

@ -1,24 +1,56 @@
/**
* Electric SQL client setup
* This initializes the Electric SQL client with local PGlite database (PostgreSQL in browser)
* Electric SQL client setup for ElectricSQL 1.x with PGlite
*
* This uses the new ElectricSQL 1.x architecture:
* - PGlite: In-browser PostgreSQL database (local storage)
* - @electric-sql/pglite-sync: Sync plugin to sync Electric shapes into PGlite
* - @electric-sql/client: HTTP client for subscribing to shapes
*/
import { PGlite } from '@electric-sql/pglite'
import { electrify } from 'electric-sql/pglite'
import { getElectricAuthToken } from './auth'
import { electricSync } from '@electric-sql/pglite-sync'
// We'll generate the schema after running electric:generate
// For now, we'll use a placeholder type
type Electric = any
type Schema = any
// Types
export interface ElectricClient {
db: PGlite
syncShape: <T = Record<string, unknown>>(options: SyncShapeOptions) => Promise<SyncHandle<T>>
}
let electric: Electric | null = null
export interface SyncShapeOptions {
table: string
where?: string
columns?: string[]
primaryKey?: string[]
}
export interface SyncHandle<T = Record<string, unknown>> {
unsubscribe: () => void
isUpToDate: boolean
shape: {
handle?: string
offset?: string
}
}
// Singleton instance
let electricClient: ElectricClient | null = null
let isInitializing = false
let initPromise: Promise<Electric> | null = null
let initPromise: Promise<ElectricClient> | null = null
export async function initElectric(): Promise<Electric> {
if (electric) {
return electric
// Get Electric URL from environment
function getElectricUrl(): string {
if (typeof window !== 'undefined') {
return process.env.NEXT_PUBLIC_ELECTRIC_URL || 'http://localhost:5133'
}
return 'http://localhost:5133'
}
/**
* Initialize the Electric SQL client with PGlite and sync plugin
*/
export async function initElectric(): Promise<ElectricClient> {
if (electricClient) {
return electricClient
}
if (isInitializing && initPromise) {
@ -28,40 +60,75 @@ export async function initElectric(): Promise<Electric> {
isInitializing = true
initPromise = (async () => {
try {
const config = {
auth: {
token: await getElectricAuthToken(),
},
url: process.env.NEXT_PUBLIC_ELECTRIC_URL || 'http://localhost:5133',
}
// Initialize PGlite database (PostgreSQL in browser)
// Use idb:// prefix for IndexedDB storage in browser
// relaxedDurability improves responsiveness by scheduling flush after query returns
const conn = new PGlite('idb://surfsense.db', {
// Create PGlite instance with Electric sync plugin
const db = await PGlite.create('idb://surfsense-notifications', {
relaxedDurability: true,
extensions: {
electric: electricSync(),
},
})
// Import schema (will be generated by electric:generate)
// For now, we'll use a dynamic import that will work after schema generation
let schema: Schema
try {
const schemaModule = await import('./generated/schema')
schema = schemaModule.schema
} catch (error) {
console.warn(
'Electric SQL schema not found. Run "pnpm electric:generate" to generate it.',
error
)
// Return a mock electric client for now
return null as any
// Create the notifications table schema in PGlite
// This matches the backend schema
await db.exec(`
CREATE TABLE IF NOT EXISTS notifications (
id INTEGER PRIMARY KEY,
user_id TEXT NOT NULL,
search_space_id INTEGER,
type TEXT NOT NULL,
title TEXT NOT NULL,
message TEXT NOT NULL,
read BOOLEAN NOT NULL DEFAULT FALSE,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notifications(user_id);
CREATE INDEX IF NOT EXISTS idx_notifications_read ON notifications(read);
`)
const electricUrl = getElectricUrl()
// Create the client wrapper
electricClient = {
db,
syncShape: async <T = Record<string, unknown>>(options: SyncShapeOptions): Promise<SyncHandle<T>> => {
const { table, where, columns, primaryKey = ['id'] } = options
// Build params for the shape request
const params: Record<string, string> = { table }
if (where) params.where = where
if (columns) params.columns = columns.join(',')
// Use PGlite's electric sync plugin to sync the shape
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const shape = await (db as any).electric.syncShapeToTable({
shape: {
url: `${electricUrl}/v1/shape`,
params,
},
table,
primaryKey,
})
return {
unsubscribe: () => {
if (shape && typeof shape.unsubscribe === 'function') {
shape.unsubscribe()
}
},
isUpToDate: shape?.isUpToDate ?? false,
shape: {
handle: shape?.handle,
offset: shape?.offset,
},
}
},
}
// Electrify the PGlite database connection
electric = await electrify(conn, schema, config)
console.log('Electric SQL initialized successfully with PGlite')
return electric
return electricClient
} catch (error) {
console.error('Failed to initialize Electric SQL:', error)
throw error
@ -73,14 +140,26 @@ export async function initElectric(): Promise<Electric> {
return initPromise
}
export function getElectric(): Electric {
if (!electric) {
/**
* Get the Electric client (throws if not initialized)
*/
export function getElectric(): ElectricClient {
if (!electricClient) {
throw new Error('Electric not initialized. Call initElectric() first.')
}
return electric
return electricClient
}
/**
* Check if Electric is initialized
*/
export function isElectricInitialized(): boolean {
return electric !== null
return electricClient !== null
}
/**
* Get the PGlite database instance
*/
export function getDb(): PGlite | null {
return electricClient?.db ?? null
}

View file

@ -1,19 +0,0 @@
/**
* Electric SQL configuration
* This file will be used by @electric-sql/cli to generate the schema
*/
export const electricConfig = {
connection: {
host: process.env.ELECTRIC_HOST || 'localhost',
port: parseInt(process.env.ELECTRIC_PORT || '5133', 10),
database: process.env.POSTGRES_DB || 'surfsense',
user: process.env.ELECTRIC_USER || 'electric',
password: process.env.ELECTRIC_PASSWORD || 'electric_password',
},
service: {
host: process.env.ELECTRIC_HOST || 'localhost',
port: parseInt(process.env.ELECTRIC_PORT || '5133', 10),
},
}

View file

@ -18,9 +18,7 @@
"db:migrate": "drizzle-kit migrate",
"db:push": "drizzle-kit push",
"db:studio": "drizzle-kit studio",
"format:fix": "npx @biomejs/biome check --fix",
"electric:generate": "electric generate",
"electric:watch": "electric watch"
"format:fix": "npx @biomejs/biome check --fix"
},
"dependencies": {
"@ai-sdk/react": "^1.2.12",
@ -33,6 +31,8 @@
"@blocknote/server-util": "^0.45.0",
"@electric-sql/client": "^1.4.0",
"@electric-sql/pglite": "^0.2.17",
"@electric-sql/pglite-sync": "^0.4.0",
"@electric-sql/react": "^1.0.26",
"@hookform/resolvers": "^5.2.2",
"@number-flow/react": "^0.5.10",
"@posthog/react": "^1.5.2",
@ -71,7 +71,6 @@
"date-fns": "^4.1.0",
"dotenv": "^17.2.3",
"drizzle-orm": "^0.44.5",
"electric-sql": "^0.12.1",
"emblor": "^1.4.8",
"fumadocs-core": "^16.3.1",
"fumadocs-mdx": "^14.2.1",
@ -110,7 +109,6 @@
},
"devDependencies": {
"@biomejs/biome": "2.1.2",
"@electric-sql/cli": "0.11.4-canary.cb19c58",
"@eslint/eslintrc": "^3.3.1",
"@tailwindcss/postcss": "^4.1.11",
"@tailwindcss/typography": "^0.5.16",

File diff suppressed because it is too large Load diff