feat: Integrate Electric SQL for real-time notifications and enhance PostgreSQL configuration

- Added Electric SQL service to docker-compose for real-time data synchronization.
- Introduced PostgreSQL configuration for logical replication and performance tuning.
- Created scripts for initializing Electric SQL user and electrifying tables.
- Implemented notification model and service in the backend.
- Developed ElectricProvider and useNotifications hook in the frontend for managing notifications.
- Updated environment variables and package dependencies for Electric SQL integration.
This commit is contained in:
Anish Sarkar 2026-01-12 12:47:00 +05:30
parent 383592ce63
commit 82c6dd0221
18 changed files with 1844 additions and 6 deletions

View file

@ -7,10 +7,12 @@ services:
- "${POSTGRES_PORT:-5432}:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
- ./scripts/docker/postgresql.conf:/etc/postgresql/postgresql.conf:ro
environment:
- POSTGRES_USER=${POSTGRES_USER:-postgres}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-postgres}
- POSTGRES_DB=${POSTGRES_DB:-surfsense}
command: postgres -c config_file=/etc/postgresql/postgresql.conf
pgadmin:
image: dpage/pgadmin4
@ -110,6 +112,24 @@ services:
# - redis
# - celery_worker
electric:
image: electricsql/electric:latest
ports:
- "${ELECTRIC_PORT:-5133}:5133"
environment:
- DATABASE_URL=postgresql://electric:electric_password@db:5432/${POSTGRES_DB:-surfsense}?sslmode=disable
- AUTH_MODE=insecure
- ELECTRIC_WRITE_TO_PG_MODE=direct
- ELECTRIC_READ_FROM_PG_MODE=direct
depends_on:
- db
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5133/api/health"]
interval: 10s
timeout: 5s
retries: 5
frontend:
build:
context: ./surfsense_web
@ -122,8 +142,12 @@ services:
- "${FRONTEND_PORT:-3000}:3000"
env_file:
- ./surfsense_web/.env
environment:
- NEXT_PUBLIC_ELECTRIC_URL=${NEXT_PUBLIC_ELECTRIC_URL:-http://localhost:5133}
- NEXT_PUBLIC_ELECTRIC_AUTH_MODE=insecure
depends_on:
- backend
- electric
volumes:
postgres_data:

View file

@ -0,0 +1,11 @@
-- Electrify tables for Electric SQL sync
-- This tells Electric SQL which tables to sync
-- Run this after running migrations
-- Electrify notifications table
ALTER TABLE notifications ENABLE ELECTRIC;
-- You can electrify other tables as needed:
-- ALTER TABLE documents ENABLE ELECTRIC;
-- ALTER TABLE logs ENABLE ELECTRIC;

View file

@ -0,0 +1,23 @@
-- Create Electric SQL replication user
-- This script is run during PostgreSQL initialization
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_user WHERE usename = 'electric') THEN
CREATE USER electric WITH REPLICATION PASSWORD 'electric_password';
END IF;
END
$$;
-- Grant necessary permissions
GRANT CONNECT ON DATABASE surfsense TO electric;
GRANT USAGE ON SCHEMA public TO electric;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO electric;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO electric;
-- Grant permissions on future tables
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO electric;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON SEQUENCES TO electric;
-- Note: Electric SQL will create its own publications automatically
-- We don't need to create publications here

View file

@ -23,8 +23,18 @@ fi
# Configure PostgreSQL
cat >> "$PGDATA/postgresql.conf" << EOF
listen_addresses = '*'
max_connections = 100
shared_buffers = 128MB
max_connections = 200
shared_buffers = 256MB
# Enable logical replication (required for Electric SQL)
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
# Performance settings
checkpoint_timeout = 10min
max_wal_size = 1GB
min_wal_size = 80MB
EOF
cat >> "$PGDATA/pg_hba.conf" << EOF
@ -45,6 +55,15 @@ CREATE USER $POSTGRES_USER WITH PASSWORD '$POSTGRES_PASSWORD' SUPERUSER;
CREATE DATABASE $POSTGRES_DB OWNER $POSTGRES_USER;
\c $POSTGRES_DB
CREATE EXTENSION IF NOT EXISTS vector;
-- Create Electric SQL replication user
CREATE USER electric WITH REPLICATION PASSWORD 'electric_password';
GRANT CONNECT ON DATABASE $POSTGRES_DB TO electric;
GRANT USAGE ON SCHEMA public TO electric;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO electric;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO electric;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO electric;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON SEQUENCES TO electric;
EOF
echo "PostgreSQL initialized successfully."

View file

@ -0,0 +1,20 @@
# PostgreSQL configuration for Electric SQL
# This file is mounted into the PostgreSQL container
listen_addresses = '*'
max_connections = 200
shared_buffers = 256MB
# Enable logical replication (required for Electric SQL)
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
# Performance settings
checkpoint_timeout = 10min
max_wal_size = 1GB
min_wal_size = 80MB
# Logging (optional, for debugging)
# log_statement = 'all'
# log_replication_commands = on

View file

@ -0,0 +1,51 @@
"""Add notifications table
Revision ID: 60
Revises: 59
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "60"
down_revision: str | None = "59"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Upgrade schema - add notifications table."""
# Create notifications table
op.execute(
"""
CREATE TABLE IF NOT EXISTS notifications (
id SERIAL PRIMARY KEY,
user_id UUID NOT NULL REFERENCES "user"(id) ON DELETE CASCADE,
search_space_id INTEGER REFERENCES searchspaces(id) ON DELETE CASCADE,
type VARCHAR(50) NOT NULL,
title VARCHAR(200) NOT NULL,
message TEXT NOT NULL,
read BOOLEAN NOT NULL DEFAULT FALSE,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ
);
"""
)
# Create indexes
op.create_index("ix_notifications_user_id", "notifications", ["user_id"])
op.create_index("ix_notifications_read", "notifications", ["read"])
op.create_index("ix_notifications_created_at", "notifications", ["created_at"])
op.create_index("ix_notifications_user_read", "notifications", ["user_id", "read"])
def downgrade() -> None:
"""Downgrade schema - remove notifications table."""
op.drop_index("ix_notifications_user_read", table_name="notifications")
op.drop_index("ix_notifications_created_at", table_name="notifications")
op.drop_index("ix_notifications_read", table_name="notifications")
op.drop_index("ix_notifications_user_id", table_name="notifications")
op.drop_table("notifications")

View file

@ -492,6 +492,12 @@ class SearchSpace(BaseModel, TimestampMixin):
order_by="Log.id",
cascade="all, delete-orphan",
)
notifications = relationship(
"Notification",
back_populates="search_space",
order_by="Notification.created_at.desc()",
cascade="all, delete-orphan",
)
search_source_connectors = relationship(
"SearchSourceConnector",
back_populates="search_space",
@ -629,6 +635,25 @@ class Log(BaseModel, TimestampMixin):
search_space = relationship("SearchSpace", back_populates="logs")
class Notification(BaseModel, TimestampMixin):
__tablename__ = "notifications"
user_id = Column(
UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False, index=True
)
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=True
)
type = Column(String(50), nullable=False) # 'document_processed', 'connector_indexed', 'user_mentioned', etc.
title = Column(String(200), nullable=False)
message = Column(Text, nullable=False)
read = Column(Boolean, nullable=False, default=False, server_default=text("false"), index=True)
notification_metadata = Column("metadata", JSONB, nullable=True, default={})
user = relationship("User", back_populates="notifications")
search_space = relationship("SearchSpace", back_populates="notifications")
class SearchSpaceRole(BaseModel, TimestampMixin):
"""
Custom roles that can be defined per search space.
@ -773,6 +798,12 @@ if config.AUTH_TYPE == "GOOGLE":
"OAuthAccount", lazy="joined"
)
search_spaces = relationship("SearchSpace", back_populates="user")
notifications = relationship(
"Notification",
back_populates="user",
order_by="Notification.created_at.desc()",
cascade="all, delete-orphan",
)
# RBAC relationships
search_space_memberships = relationship(
@ -799,6 +830,12 @@ else:
class User(SQLAlchemyBaseUserTableUUID, Base):
search_spaces = relationship("SearchSpace", back_populates="user")
notifications = relationship(
"Notification",
back_populates="user",
order_by="Notification.created_at.desc()",
cascade="all, delete-orphan",
)
# RBAC relationships
search_space_memberships = relationship(

View file

@ -0,0 +1,140 @@
"""Service for creating and managing notifications."""
import logging
from typing import Any
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Notification
logger = logging.getLogger(__name__)
class NotificationService:
"""Service for creating notifications that sync via Electric SQL."""
@staticmethod
async def create_notification(
session: AsyncSession,
user_id: UUID,
notification_type: str,
title: str,
message: str,
search_space_id: int | None = None,
notification_metadata: dict[str, Any] | None = None,
) -> Notification:
"""
Create a notification - Electric SQL will automatically sync it to frontend.
Args:
session: Database session
user_id: User to notify
notification_type: Type of notification (e.g., 'document_processed', 'connector_indexed')
title: Notification title
message: Notification message
search_space_id: Optional search space ID
notification_metadata: Optional metadata dictionary
Returns:
Notification: The created notification
"""
notification = Notification(
user_id=user_id,
search_space_id=search_space_id,
type=notification_type,
title=title,
message=message,
notification_metadata=notification_metadata or {},
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(f"Created notification {notification.id} for user {user_id}")
return notification
@staticmethod
async def create_document_processed_notification(
session: AsyncSession,
user_id: UUID,
document_id: int,
document_title: str,
status: str,
search_space_id: int,
) -> Notification:
"""
Create notification when document processing completes.
Args:
session: Database session
user_id: User to notify
document_id: ID of the processed document
document_title: Title of the document
status: Processing status ('SUCCESS', 'FAILED')
search_space_id: Search space ID
Returns:
Notification: The created notification
"""
status_lower = status.lower()
title = f"Document processed: {document_title}"
message = f'Your document "{document_title}" has been {status_lower}.'
return await NotificationService.create_notification(
session=session,
user_id=user_id,
notification_type="document_processed",
title=title,
message=message,
search_space_id=search_space_id,
notification_metadata={
"document_id": document_id,
"status": status,
},
)
@staticmethod
async def create_connector_indexed_notification(
session: AsyncSession,
user_id: UUID,
connector_name: str,
connector_type: str,
status: str,
search_space_id: int,
indexed_count: int | None = None,
) -> Notification:
"""
Create notification when connector indexing completes.
Args:
session: Database session
user_id: User to notify
connector_name: Name of the connector
connector_type: Type of connector
status: Indexing status ('SUCCESS', 'FAILED')
search_space_id: Search space ID
indexed_count: Number of items indexed (optional)
Returns:
Notification: The created notification
"""
status_lower = status.lower()
title = f"Connector indexed: {connector_name}"
message = f'Your connector "{connector_name}" has finished indexing ({status_lower}).'
if indexed_count is not None:
message += f" {indexed_count} items indexed."
return await NotificationService.create_notification(
session=session,
user_id=user_id,
notification_type="connector_indexed",
title=title,
message=message,
search_space_id=search_space_id,
notification_metadata={
"connector_name": connector_name,
"connector_type": connector_type,
"status": status,
"indexed_count": indexed_count,
},
)

View file

@ -1,5 +1,10 @@
NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000
NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL or GOOGLE
NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING
# Electric SQL
NEXT_PUBLIC_ELECTRIC_URL=http://localhost:5133
NEXT_PUBLIC_ELECTRIC_AUTH_MODE=insecure
# Contact Form Vars - OPTIONAL
DATABASE_URL=postgresql://postgres:[YOUR-PASSWORD]@db.sdsf.supabase.co:5432/postgres

View file

@ -2,6 +2,7 @@ import type { Metadata } from "next";
import "./globals.css";
import { RootProvider } from "fumadocs-ui/provider/next";
import { Roboto } from "next/font/google";
import { ElectricProvider } from "@/components/providers/ElectricProvider";
import { I18nProvider } from "@/components/providers/I18nProvider";
import { PostHogProvider } from "@/components/providers/PostHogProvider";
import { ThemeProvider } from "@/components/theme/theme-provider";
@ -102,7 +103,9 @@ export default function RootLayout({
defaultTheme="light"
>
<RootProvider>
<ReactQueryClientProvider>{children}</ReactQueryClientProvider>
<ReactQueryClientProvider>
<ElectricProvider>{children}</ElectricProvider>
</ReactQueryClientProvider>
<Toaster />
</RootProvider>
</ThemeProvider>

View file

@ -0,0 +1,43 @@
"use client"
import { useEffect, useState } from 'react'
import { initElectric } from '@/lib/electric/client'
interface ElectricProviderProps {
children: React.ReactNode
}
export function ElectricProvider({ children }: ElectricProviderProps) {
const [initialized, setInitialized] = useState(false)
const [error, setError] = useState<Error | null>(null)
useEffect(() => {
async function init() {
try {
await initElectric()
setInitialized(true)
setError(null)
} catch (err) {
console.error('Failed to initialize Electric SQL:', err)
setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL'))
// Don't block rendering if Electric SQL fails - app can still work
setInitialized(true)
}
}
init()
}, [])
// Show loading state only briefly, then render children
// Electric SQL will sync in the background
if (!initialized) {
return (
<div className="flex items-center justify-center min-h-screen">
<div className="text-muted-foreground">Initializing...</div>
</div>
)
}
return <>{children}</>
}

View file

@ -0,0 +1,17 @@
import { defineConfig } from '@electric-sql/cli'
export default defineConfig({
connection: {
host: process.env.ELECTRIC_HOST || 'localhost',
port: parseInt(process.env.ELECTRIC_PORT || '5133', 10),
database: process.env.POSTGRES_DB || 'surfsense',
user: process.env.ELECTRIC_USER || 'electric',
password: process.env.ELECTRIC_PASSWORD || 'electric_password',
},
outDir: './lib/electric/generated',
service: {
host: process.env.ELECTRIC_HOST || 'localhost',
port: parseInt(process.env.ELECTRIC_PORT || '5133', 10),
},
})

View file

@ -0,0 +1,110 @@
"use client"
import { useEffect, useState, useCallback } from 'react'
import { useLiveQuery } from 'electric-sql/react'
import { initElectric, getElectric, isElectricInitialized } from '@/lib/electric/client'
export interface Notification {
id: number
user_id: string
search_space_id: number | null
type: string
title: string
message: string
read: boolean
metadata: Record<string, any>
created_at: string
updated_at: string | null
}
export function useNotifications(userId: string | null) {
const [electric, setElectric] = useState<any>(null)
const [initialized, setInitialized] = useState(false)
const [error, setError] = useState<Error | null>(null)
// Initialize Electric SQL
useEffect(() => {
if (!userId || initialized) return
async function init() {
try {
const electricClient = await initElectric()
setElectric(electricClient)
setInitialized(true)
setError(null)
} catch (err) {
console.error('Failed to initialize Electric SQL:', err)
setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL'))
}
}
init()
}, [userId, initialized])
// Use live query to get notifications
const { results: notifications } = useLiveQuery(
electric?.db.notifications?.liveMany({
where: {
user_id: userId || '',
read: false,
},
orderBy: {
created_at: 'desc',
},
})
) ?? { results: [] }
// Mark notification as read
const markAsRead = useCallback(
async (notificationId: number) => {
if (!electric || !isElectricInitialized()) {
console.warn('Electric SQL not initialized')
return false
}
try {
await electric.db.notifications.update({
data: { read: true },
where: { id: notificationId },
})
return true
} catch (err) {
console.error('Failed to mark notification as read:', err)
return false
}
},
[electric]
)
// Mark all notifications as read
const markAllAsRead = useCallback(async () => {
if (!electric || !isElectricInitialized()) {
console.warn('Electric SQL not initialized')
return false
}
try {
const unread = (notifications || []).filter((n: Notification) => !n.read)
for (const notification of unread) {
await markAsRead(notification.id)
}
return true
} catch (err) {
console.error('Failed to mark all notifications as read:', err)
return false
}
}, [electric, notifications, markAsRead])
// Get unread count
const unreadCount = (notifications || []).filter((n: Notification) => !n.read).length
return {
notifications: (notifications || []) as Notification[],
unreadCount,
markAsRead,
markAllAsRead,
loading: !initialized,
error,
}
}

View file

@ -0,0 +1,21 @@
/**
* Get auth token for Electric SQL
* In production, this should get the token from your auth system
*/
export async function getElectricAuthToken(): Promise<string> {
// For insecure mode (development), return empty string
if (process.env.NEXT_PUBLIC_ELECTRIC_AUTH_MODE === 'insecure') {
return ''
}
// In production, get token from your auth system
// This should match your backend auth token
if (typeof window !== 'undefined') {
const token = localStorage.getItem('surfsense_bearer_token')
return token || ''
}
return ''
}

View file

@ -0,0 +1,86 @@
/**
* Electric SQL client setup
* This initializes the Electric SQL client with local PGlite database (PostgreSQL in browser)
*/
import { PGlite } from '@electric-sql/pglite'
import { electrify } from 'electric-sql/pglite'
import { getElectricAuthToken } from './auth'
// We'll generate the schema after running electric:generate
// For now, we'll use a placeholder type
type Electric = any
type Schema = any
let electric: Electric | null = null
let isInitializing = false
let initPromise: Promise<Electric> | null = null
export async function initElectric(): Promise<Electric> {
if (electric) {
return electric
}
if (isInitializing && initPromise) {
return initPromise
}
isInitializing = true
initPromise = (async () => {
try {
const config = {
auth: {
token: await getElectricAuthToken(),
},
url: process.env.NEXT_PUBLIC_ELECTRIC_URL || 'http://localhost:5133',
}
// Initialize PGlite database (PostgreSQL in browser)
// Use idb:// prefix for IndexedDB storage in browser
// relaxedDurability improves responsiveness by scheduling flush after query returns
const conn = new PGlite('idb://surfsense.db', {
relaxedDurability: true,
})
// Import schema (will be generated by electric:generate)
// For now, we'll use a dynamic import that will work after schema generation
let schema: Schema
try {
const schemaModule = await import('./generated/schema')
schema = schemaModule.schema
} catch (error) {
console.warn(
'Electric SQL schema not found. Run "pnpm electric:generate" to generate it.',
error
)
// Return a mock electric client for now
return null as any
}
// Electrify the PGlite database connection
electric = await electrify(conn, schema, config)
console.log('Electric SQL initialized successfully with PGlite')
return electric
} catch (error) {
console.error('Failed to initialize Electric SQL:', error)
throw error
} finally {
isInitializing = false
}
})()
return initPromise
}
export function getElectric(): Electric {
if (!electric) {
throw new Error('Electric not initialized. Call initElectric() first.')
}
return electric
}
export function isElectricInitialized(): boolean {
return electric !== null
}

View file

@ -0,0 +1,19 @@
/**
* Electric SQL configuration
* This file will be used by @electric-sql/cli to generate the schema
*/
export const electricConfig = {
connection: {
host: process.env.ELECTRIC_HOST || 'localhost',
port: parseInt(process.env.ELECTRIC_PORT || '5133', 10),
database: process.env.POSTGRES_DB || 'surfsense',
user: process.env.ELECTRIC_USER || 'electric',
password: process.env.ELECTRIC_PASSWORD || 'electric_password',
},
service: {
host: process.env.ELECTRIC_HOST || 'localhost',
port: parseInt(process.env.ELECTRIC_PORT || '5133', 10),
},
}

View file

@ -18,7 +18,9 @@
"db:migrate": "drizzle-kit migrate",
"db:push": "drizzle-kit push",
"db:studio": "drizzle-kit studio",
"format:fix": "npx @biomejs/biome check --fix"
"format:fix": "npx @biomejs/biome check --fix",
"electric:generate": "electric generate",
"electric:watch": "electric watch"
},
"dependencies": {
"@ai-sdk/react": "^1.2.12",
@ -29,6 +31,8 @@
"@blocknote/mantine": "^0.45.0",
"@blocknote/react": "^0.45.0",
"@blocknote/server-util": "^0.45.0",
"@electric-sql/client": "^1.4.0",
"@electric-sql/pglite": "^0.2.17",
"@hookform/resolvers": "^5.2.2",
"@number-flow/react": "^0.5.10",
"@posthog/react": "^1.5.2",
@ -67,6 +71,7 @@
"date-fns": "^4.1.0",
"dotenv": "^17.2.3",
"drizzle-orm": "^0.44.5",
"electric-sql": "^0.12.1",
"emblor": "^1.4.8",
"fumadocs-core": "^16.3.1",
"fumadocs-mdx": "^14.2.1",
@ -105,6 +110,7 @@
},
"devDependencies": {
"@biomejs/biome": "2.1.2",
"@electric-sql/cli": "0.11.4-canary.cb19c58",
"@eslint/eslintrc": "^3.3.1",
"@tailwindcss/postcss": "^4.1.11",
"@tailwindcss/typography": "^0.5.16",

File diff suppressed because it is too large Load diff