SurfSense/surfsense_web/lib/electric/client.ts
Anish Sarkar e38e6d90e0 feat: Add notifications table and integrate Electric SQL for real-time updates
- Introduced a new notifications table in the database schema to manage user notifications.
- Implemented Electric SQL replication setup for the notifications table, ensuring real-time synchronization.
- Updated existing database functions to support real-time updates for connectors and documents using Electric SQL.
- Refactored UI components to utilize new hooks for fetching connectors and documents, enhancing performance and user experience.
2026-01-13 16:54:06 +05:30

446 lines
16 KiB
TypeScript

/**
* Electric SQL client setup for ElectricSQL 1.x with PGlite
*
* This uses the new ElectricSQL 1.x architecture:
* - PGlite: In-browser PostgreSQL database (local storage)
* - @electric-sql/pglite-sync: Sync plugin to sync Electric shapes into PGlite
* - @electric-sql/client: HTTP client for subscribing to shapes
*/
import { PGlite } from '@electric-sql/pglite'
import { electricSync } from '@electric-sql/pglite-sync'
import { live } from '@electric-sql/pglite/live'
// Types
/**
 * Client object produced by `initElectric()`: the local PGlite database
 * together with a helper that syncs an Electric shape into one of its tables.
 */
export interface ElectricClient {
// Local PGlite database instance that synced rows are written into
db: PGlite
// Starts syncing the shape described by `options` and resolves with a handle for it
syncShape: (options: SyncShapeOptions) => Promise<SyncHandle>
}
/**
 * Describes which rows and columns of a table `syncShape` should sync.
 */
export interface SyncShapeOptions {
// Table name; used both as the shape's `table` request parameter and as the local target table
table: string
// Optional filter sent as the shape's `where` request parameter
where?: string
// Optional column subset; joined with ',' and sent as the `columns` request parameter
columns?: string[]
// Primary key column(s) of the local table; `syncShape` falls back to ['id'] when omitted
primaryKey?: string[]
}
/**
 * Handle for one shape subscription, as returned by `syncShape`.
 */
export interface SyncHandle {
// Stops the underlying shape subscription
unsubscribe: () => void
// Whether the shape currently reports itself as up to date
readonly isUpToDate: boolean
// The stream property contains the ShapeStreamInterface from pglite-sync
stream?: unknown
// Promise that resolves when initial sync is complete (or when the sync
// timeout fallback fires) and rejects if the shape reports an error first
initialSyncPromise?: Promise<void>
}
// Singleton client; stays null until initElectric() has completed successfully
let electricClient: ElectricClient | null = null
// True while an initElectric() call is still in flight
let isInitializing = false
// Promise of the in-flight initialization, handed to concurrent initElectric() callers
let initPromise: Promise<ElectricClient> | null = null
// Version for sync state - increment this to force fresh sync when Electric config changes.
// The value is embedded in the PGlite database name and in every shape key.
// Incremented to v4 to fix sync completion issues
const SYNC_VERSION = 4
/**
 * Resolve the base URL of the Electric SQL service.
 *
 * When `window` is defined, a non-empty NEXT_PUBLIC_ELECTRIC_URL environment
 * variable wins; in every other case the local default is returned.
 *
 * @returns The Electric base URL without a trailing path.
 */
function getElectricUrl(): string {
  const fallbackUrl = 'http://localhost:5133'
  const hasWindow = typeof window !== 'undefined'
  if (!hasWindow) {
    return fallbackUrl
  }
  const configuredUrl = process.env.NEXT_PUBLIC_ELECTRIC_URL
  return configuredUrl ? configuredUrl : fallbackUrl
}
/** Delay after which a still-pending initial sync starts its fallback resolution. */
const INITIAL_SYNC_TIMEOUT_MS = 5000
/** Grace period between the timeout warning and the forced resolution. */
const INITIAL_SYNC_GRACE_MS = 100
/** How often `isUpToDate` is polled as a backup completion signal. */
const INITIAL_SYNC_POLL_MS = 200

/** The parts of the `syncShapeToTable` result that this module reads. */
interface ShapeSubscription {
  unsubscribe: () => void
  isUpToDate: boolean
  stream: unknown
}

/** Structural view of the stream object; every member is optional because the real type is not imported here. */
interface ShapeStreamLike {
  shapeHandle?: unknown
  lastOffset?: unknown
  isUpToDate?: boolean
  error?: unknown
  subscribe?: (callback: (messages: unknown[]) => void) => unknown
  unsubscribe?: unknown
}

/** Settle-once wrapper around the promise exposed as `SyncHandle.initialSyncPromise`. */
interface InitialSyncTracker {
  readonly promise: Promise<void>
  readonly settled: boolean
  resolve: () => void
  reject: (error: Error) => void
  /** Cancels the fallback timers without settling the promise. */
  dispose: () => void
}

/**
 * Return the where clause to send to Electric.
 * A clause without `$1` that contains an odd number of single quotes gets a
 * closing quote appended (with a warning); anything else is passed through.
 */
function normalizeWhereClause(where: string | undefined): string | undefined {
  if (!where || where.includes('$1')) {
    return where
  }
  const singleQuoteCount = (where.match(/'/g) || []).length
  if (singleQuoteCount % 2 !== 0) {
    // Odd number of quotes means unterminated string literal
    console.warn('Where clause has unmatched quotes, fixing:', where)
    return `${where}'`
  }
  return where
}

/** Tell whether a stream message is the "up-to-date" control message. */
function isUpToDateMessage(message: unknown): boolean {
  // `typeof null` is 'object', so null must be excluded before using `in`
  if (typeof message !== 'object' || message === null) {
    return false
  }
  const record = message as Record<string, unknown>
  const headers = record.headers
  if (typeof headers === 'object' && headers !== null) {
    const headerRecord = headers as Record<string, unknown>
    if (headerRecord.control === 'up-to-date' || headerRecord.electric_up_to_date === 'true') {
      return true
    }
  }
  return 'up-to-date' in record
}

/**
 * Create the promise that signals completion of the initial sync for `table`.
 * It settles at most once; if nothing settles it, it is resolved anyway after
 * the timeout plus grace period so callers never wait forever.
 */
function createInitialSyncTracker(table: string): InitialSyncTracker {
  let settled = false
  let timeoutId: ReturnType<typeof setTimeout> | undefined
  let graceId: ReturnType<typeof setTimeout> | undefined
  let resolvePromise!: () => void
  let rejectPromise!: (error: Error) => void

  const promise = new Promise<void>((resolve, reject) => {
    resolvePromise = () => resolve()
    rejectPromise = (error: Error) => reject(error)
  })
  // Mark the promise as handled: a sync error that no caller awaits must not
  // surface as an unhandled rejection. Callers that await it still see the error.
  promise.catch(() => {
    /* already logged by the shape's onError handler */
  })

  const clearTimers = (): void => {
    clearTimeout(timeoutId)
    clearTimeout(graceId)
  }

  const settle = (complete: () => void): void => {
    if (settled) {
      return
    }
    settled = true
    // The fallback is no longer needed once the promise has an outcome
    clearTimers()
    complete()
  }

  timeoutId = setTimeout(() => {
    if (settled) {
      return
    }
    console.warn(`⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`)
    // Give the polling / stream callbacks one last chance before forcing resolution
    graceId = setTimeout(() => {
      if (!settled) {
        console.warn(`⚠️ Sync timeout for ${table} - resolving anyway after 5s`)
        settle(resolvePromise)
      }
    }, INITIAL_SYNC_GRACE_MS)
  }, INITIAL_SYNC_TIMEOUT_MS)

  return {
    promise,
    get settled() {
      return settled
    },
    resolve: () => settle(resolvePromise),
    reject: (error: Error) => settle(() => rejectPromise(error)),
    dispose: () => {
      settled = true
      clearTimers()
    },
  }
}

/** Create the local tables and indexes that synced shapes are written into. */
async function createLocalSchema(db: PGlite): Promise<void> {
  // Create the notifications table schema in PGlite
  // This matches the backend schema
  await db.exec(`
    CREATE TABLE IF NOT EXISTS notifications (
      id INTEGER PRIMARY KEY,
      user_id TEXT NOT NULL,
      search_space_id INTEGER,
      type TEXT NOT NULL,
      title TEXT NOT NULL,
      message TEXT NOT NULL,
      read BOOLEAN NOT NULL DEFAULT FALSE,
      metadata JSONB DEFAULT '{}',
      created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      updated_at TIMESTAMPTZ
    );
    CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notifications(user_id);
    CREATE INDEX IF NOT EXISTS idx_notifications_read ON notifications(read);
  `)
  // Create the search_source_connectors table schema in PGlite
  // This matches the backend schema
  await db.exec(`
    CREATE TABLE IF NOT EXISTS search_source_connectors (
      id INTEGER PRIMARY KEY,
      search_space_id INTEGER NOT NULL,
      user_id TEXT NOT NULL,
      connector_type TEXT NOT NULL,
      name TEXT NOT NULL,
      is_indexable BOOLEAN NOT NULL DEFAULT FALSE,
      last_indexed_at TIMESTAMPTZ,
      config JSONB DEFAULT '{}',
      periodic_indexing_enabled BOOLEAN NOT NULL DEFAULT FALSE,
      indexing_frequency_minutes INTEGER,
      next_scheduled_at TIMESTAMPTZ,
      created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    CREATE INDEX IF NOT EXISTS idx_connectors_search_space_id ON search_source_connectors(search_space_id);
    CREATE INDEX IF NOT EXISTS idx_connectors_type ON search_source_connectors(connector_type);
    CREATE INDEX IF NOT EXISTS idx_connectors_user_id ON search_source_connectors(user_id);
  `)
  // Create the documents table schema in PGlite
  // Only sync minimal fields needed for type counts: id, document_type, search_space_id
  await db.exec(`
    CREATE TABLE IF NOT EXISTS documents (
      id INTEGER PRIMARY KEY,
      search_space_id INTEGER NOT NULL,
      document_type TEXT NOT NULL,
      created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents(search_space_id);
    CREATE INDEX IF NOT EXISTS idx_documents_type ON documents(document_type);
    CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type);
  `)
}

/**
 * Debug aid: request the shape directly over HTTP and log the response.
 * Failures are logged and swallowed; this never affects the real sync.
 */
async function probeShapeEndpoint(
  electricUrl: string,
  table: string,
  where: string | undefined
): Promise<void> {
  const whereQuery = where ? `&where=${encodeURIComponent(where)}` : ''
  const testUrl = `${electricUrl}/v1/shape?table=${encodeURIComponent(table)}&offset=-1${whereQuery}`
  console.log('Testing Electric SQL directly:', testUrl)
  try {
    const testResponse = await fetch(testUrl)
    const testHeaders = {
      handle: testResponse.headers.get('electric-handle'),
      offset: testResponse.headers.get('electric-offset'),
      upToDate: testResponse.headers.get('electric-up-to-date'),
    }
    console.log('Direct Electric SQL response headers:', testHeaders)
    const testData: unknown = await testResponse.json()
    console.log(
      'Direct Electric SQL data count:',
      Array.isArray(testData) ? testData.length : 'not array',
      testData
    )
  } catch (testErr) {
    console.error('Direct Electric SQL test failed:', testErr)
  }
}

/** After a failed sync, log whether the Electric server can be reached at all. */
async function logServerReachability(electricUrl: string, table: string): Promise<void> {
  try {
    const response = await fetch(
      `${electricUrl}/v1/shape?table=${encodeURIComponent(table)}&offset=-1`,
      { method: 'GET' }
    )
    console.log('Electric SQL server response:', response.status, response.statusText)
    if (!response.ok) {
      console.error('Electric SQL server error:', await response.text())
    }
  } catch (fetchError) {
    console.error('Cannot reach Electric SQL server:', fetchError)
    console.error('Make sure Electric SQL is running at:', electricUrl)
  }
}

/**
 * Watch a shape that is not yet up to date and resolve `tracker` once it is.
 * Three signals are used: "up-to-date" stream messages, the stream's own
 * `isUpToDate` flag, and periodic polling. The stream subscription is left
 * active on purpose, since it also carries the real-time updates.
 */
function watchForInitialSync(
  shape: ShapeSubscription,
  table: string,
  tracker: InitialSyncTracker
): void {
  if (!shape.stream) {
    console.warn(`⚠️ No stream available for ${table}, relying on callback and timeout`)
    return
  }
  const stream = shape.stream as ShapeStreamLike
  console.log('Shape stream details:', {
    shapeHandle: stream.shapeHandle,
    lastOffset: stream.lastOffset,
    isUpToDate: stream.isUpToDate,
    error: stream.error,
    hasSubscribe: typeof stream.subscribe === 'function',
    hasUnsubscribe: typeof stream.unsubscribe === 'function',
  })

  if (typeof stream.subscribe === 'function') {
    console.log('Subscribing to shape stream to watch for up-to-date message...')
    // The unsubscribe function is deliberately not kept: the subscription must stay active
    stream.subscribe((messages: unknown[]) => {
      if (!tracker.settled) {
        console.log('🔵 Shape stream received messages:', messages?.length || 0)
      }
      if (messages && messages.length > 0) {
        for (const message of messages) {
          if (isUpToDateMessage(message) && !tracker.settled) {
            console.log(`✅ Received up-to-date message for ${table}`)
            tracker.resolve()
          }
        }
        if (!tracker.settled) {
          console.log('First message:', JSON.stringify(messages[0], null, 2))
        }
      }
      // Also check stream's isUpToDate property after receiving messages
      if (!tracker.settled && stream.isUpToDate) {
        console.log(`✅ Stream isUpToDate is true for ${table}`)
        tracker.resolve()
      }
    })
    // Also check stream's isUpToDate property immediately
    if (stream.isUpToDate) {
      console.log(`✅ Stream isUpToDate is true immediately for ${table}`)
      tracker.resolve()
    }
  }

  // Poll isUpToDate periodically as a backup
  const pollInterval = setInterval(() => {
    if (tracker.settled) {
      clearInterval(pollInterval)
      return
    }
    if (shape.isUpToDate || stream.isUpToDate) {
      console.log(`✅ Sync completed (detected via polling) for ${table}`)
      clearInterval(pollInterval)
      tracker.resolve()
    }
  }, INITIAL_SYNC_POLL_MS)
  const stopPolling = (): void => clearInterval(pollInterval)
  // then(f, f) instead of finally(f): finally() would hand back a promise that
  // re-rejects with the sync error and nothing would be listening to it
  void tracker.promise.then(stopPolling, stopPolling)
}

/** Build the `syncShape` implementation bound to one database and Electric URL. */
function createSyncShape(db: PGlite, electricUrl: string): ElectricClient['syncShape'] {
  return async (options: SyncShapeOptions): Promise<SyncHandle> => {
    const { table, where, columns, primaryKey = ['id'] } = options
    const validatedWhere = normalizeWhereClause(where)

    // Collected only so the effective request parameters can be logged
    const params: Record<string, string> = { table }
    if (validatedWhere) params.where = validatedWhere
    if (columns) params.columns = columns.join(',')
    console.log('Syncing shape with params:', params)
    console.log('Electric URL:', `${electricUrl}/v1/shape`)
    console.log('Where clause:', where, 'Validated:', validatedWhere)

    let initialSync: InitialSyncTracker | undefined
    try {
      // The probe downloads the whole shape snapshot just to log it, so it is
      // kept out of production builds where it would delay every sync
      if (process.env.NODE_ENV !== 'production') {
        await probeShapeEndpoint(electricUrl, table, validatedWhere)
      }

      const tracker = createInitialSyncTracker(table)
      initialSync = tracker

      const shapeConfig = {
        shape: {
          url: `${electricUrl}/v1/shape`,
          params: {
            table,
            ...(validatedWhere ? { where: validatedWhere } : {}),
            ...(columns ? { columns: columns.join(',') } : {}),
          },
        },
        table,
        primaryKey,
        // Versioned key to force fresh sync when needed
        shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, '_') || 'all'}`,
        onInitialSync: () => {
          console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`)
          tracker.resolve()
        },
        onError: (error: Error) => {
          console.error(`❌ Shape sync error for ${table}:`, error)
          console.error('Error details:', JSON.stringify(error, Object.getOwnPropertyNames(error)))
          tracker.reject(error)
        },
      }
      console.log('syncShapeToTable config:', JSON.stringify(shapeConfig, null, 2))

      // The electric extension is registered at PGlite.create() time but is not part of the PGlite type
      const pgWithElectric = db as PGlite & {
        electric: { syncShapeToTable: (config: typeof shapeConfig) => Promise<ShapeSubscription> }
      }
      const shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig)
      if (!shape) {
        throw new Error('syncShapeToTable returned undefined')
      }
      console.log('Shape sync result (initial):', {
        hasUnsubscribe: typeof shape.unsubscribe === 'function',
        isUpToDate: shape.isUpToDate,
        hasStream: !!shape.stream,
        streamType: typeof shape.stream,
      })

      if (shape.isUpToDate) {
        console.log(`✅ Sync already up-to-date for ${table} (resuming from previous state)`)
        tracker.resolve()
      } else {
        watchForInitialSync(shape, table, tracker)
      }

      return {
        unsubscribe: () => {
          console.log('unsubscribing')
          if (typeof shape.unsubscribe === 'function') {
            shape.unsubscribe()
          }
        },
        // Getter so callers always read the current state
        get isUpToDate() {
          return shape.isUpToDate ?? false
        },
        stream: shape.stream,
        initialSyncPromise: tracker.promise,
      }
    } catch (error) {
      // No handle is returned on failure, so nobody can await the tracker: stop its timers
      initialSync?.dispose()
      console.error('Failed to sync shape:', error)
      await logServerReachability(electricUrl, table)
      throw error
    }
  }
}

/**
 * Initialize the Electric SQL client with PGlite and sync plugin.
 *
 * The client is a singleton: once created it is returned directly, and calls
 * made while initialization is in flight share the same promise. After a
 * failed attempt the next call starts a new initialization.
 *
 * @returns The shared client (local database plus `syncShape` helper).
 * @throws Whatever PGlite creation or schema setup throws; the error is logged first.
 */
export async function initElectric(): Promise<ElectricClient> {
  if (electricClient) {
    return electricClient
  }
  if (isInitializing && initPromise) {
    return initPromise
  }
  isInitializing = true
  initPromise = (async () => {
    try {
      // Include version in database name to force fresh sync when Electric config changes
      const db = await PGlite.create({
        dataDir: `idb://surfsense-notifications-v${SYNC_VERSION}`,
        relaxedDurability: true,
        extensions: {
          // Enable debug mode in electricSync to see detailed sync logs
          electric: electricSync({ debug: true }),
          live, // Enable live queries for real-time updates
        },
      })
      await createLocalSchema(db)

      const client: ElectricClient = {
        db,
        syncShape: createSyncShape(db, getElectricUrl()),
      }
      electricClient = client
      console.log('Electric SQL initialized successfully with PGlite')
      return client
    } catch (error) {
      console.error('Failed to initialize Electric SQL:', error)
      throw error
    } finally {
      isInitializing = false
    }
  })()
  return initPromise
}
/**
 * Return the already-initialized Electric client.
 *
 * @returns The shared client instance.
 * @throws Error when `initElectric()` has not completed yet.
 */
export function getElectric(): ElectricClient {
  const client = electricClient
  if (client) {
    return client
  }
  throw new Error('Electric not initialized. Call initElectric() first.')
}
/**
 * Report whether the shared Electric client has been created.
 *
 * @returns `true` once the client exists, `false` before that.
 */
export function isElectricInitialized(): boolean {
  if (electricClient === null) {
    return false
  }
  return true
}
/**
 * Return the PGlite database held by the shared client.
 *
 * @returns The database instance, or `null` while no client exists.
 */
export function getDb(): PGlite | null {
  const client = electricClient
  if (client === null || client === undefined) {
    return null
  }
  const database = client.db
  return database === undefined || database === null ? null : database
}