/** * Electric SQL client setup for ElectricSQL 1.x with PGlite * * This uses the new ElectricSQL 1.x architecture: * - PGlite: In-browser PostgreSQL database (local storage) * - @electric-sql/pglite-sync: Sync plugin to sync Electric shapes into PGlite * - @electric-sql/client: HTTP client for subscribing to shapes */ import { PGlite } from '@electric-sql/pglite' import { electricSync } from '@electric-sql/pglite-sync' import { live } from '@electric-sql/pglite/live' // Types export interface ElectricClient { db: PGlite syncShape: (options: SyncShapeOptions) => Promise } export interface SyncShapeOptions { table: string where?: string columns?: string[] primaryKey?: string[] } export interface SyncHandle { unsubscribe: () => void readonly isUpToDate: boolean // The stream property contains the ShapeStreamInterface from pglite-sync stream?: unknown // Promise that resolves when initial sync is complete initialSyncPromise?: Promise } // Singleton instance let electricClient: ElectricClient | null = null let isInitializing = false let initPromise: Promise | null = null // Version for sync state - increment this to force fresh sync when Electric config changes // Incremented to v4 to fix sync completion issues const SYNC_VERSION = 4 // Get Electric URL from environment function getElectricUrl(): string { if (typeof window !== 'undefined') { return process.env.NEXT_PUBLIC_ELECTRIC_URL || 'http://localhost:5133' } return 'http://localhost:5133' } /** * Initialize the Electric SQL client with PGlite and sync plugin */ export async function initElectric(): Promise { if (electricClient) { return electricClient } if (isInitializing && initPromise) { return initPromise } isInitializing = true initPromise = (async () => { try { // Create PGlite instance with Electric sync plugin and live queries // Include version in database name to force fresh sync when Electric config changes const db = await PGlite.create({ dataDir: `idb://surfsense-notifications-v${SYNC_VERSION}`, relaxedDurability: true, extensions: { // Enable debug mode in electricSync to see detailed sync logs electric: electricSync({ debug: true }), live, // Enable live queries for real-time updates }, }) // Create the notifications table schema in PGlite // This matches the backend schema await db.exec(` CREATE TABLE IF NOT EXISTS notifications ( id INTEGER PRIMARY KEY, user_id TEXT NOT NULL, search_space_id INTEGER, type TEXT NOT NULL, title TEXT NOT NULL, message TEXT NOT NULL, read BOOLEAN NOT NULL DEFAULT FALSE, metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ ); CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notifications(user_id); CREATE INDEX IF NOT EXISTS idx_notifications_read ON notifications(read); `) // Create the search_source_connectors table schema in PGlite // This matches the backend schema await db.exec(` CREATE TABLE IF NOT EXISTS search_source_connectors ( id INTEGER PRIMARY KEY, search_space_id INTEGER NOT NULL, user_id TEXT NOT NULL, connector_type TEXT NOT NULL, name TEXT NOT NULL, is_indexable BOOLEAN NOT NULL DEFAULT FALSE, last_indexed_at TIMESTAMPTZ, config JSONB DEFAULT '{}', periodic_indexing_enabled BOOLEAN NOT NULL DEFAULT FALSE, indexing_frequency_minutes INTEGER, next_scheduled_at TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_connectors_search_space_id ON search_source_connectors(search_space_id); CREATE INDEX IF NOT EXISTS idx_connectors_type ON search_source_connectors(connector_type); CREATE INDEX IF NOT EXISTS idx_connectors_user_id ON search_source_connectors(user_id); `) // Create the documents table schema in PGlite // Only sync minimal fields needed for type counts: id, document_type, search_space_id await db.exec(` CREATE TABLE IF NOT EXISTS documents ( id INTEGER PRIMARY KEY, search_space_id INTEGER NOT NULL, document_type TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents(search_space_id); CREATE INDEX IF NOT EXISTS idx_documents_type ON documents(document_type); CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type); `) const electricUrl = getElectricUrl() // Create the client wrapper electricClient = { db, syncShape: async (options: SyncShapeOptions): Promise => { const { table, where, columns, primaryKey = ['id'] } = options // Build params for the shape request // Electric SQL expects params as URL query parameters const params: Record = { table } // Validate and fix WHERE clause to ensure string literals are properly quoted let validatedWhere = where if (where) { // Check if where uses positional parameters if (where.includes('$1')) { // Extract the value from the where clause if it's embedded // For now, we'll use the where clause as-is and let Electric handle it params.where = where validatedWhere = where } else { // Validate that string literals are properly quoted // Count single quotes - should be even (pairs) for properly quoted strings const singleQuoteCount = (where.match(/'/g) || []).length if (singleQuoteCount % 2 !== 0) { // Odd number of quotes means unterminated string literal console.warn('Where clause has unmatched quotes, fixing:', where) // Add closing quote at the end validatedWhere = `${where}'` params.where = validatedWhere } else { // Use the where clause directly (already formatted) params.where = where validatedWhere = where } } } if (columns) params.columns = columns.join(',') console.log('Syncing shape with params:', params) console.log('Electric URL:', `${electricUrl}/v1/shape`) console.log('Where clause:', where, 'Validated:', validatedWhere) try { // Debug: Test Electric SQL connection directly first // Use validatedWhere to ensure proper URL encoding const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ''}` console.log('Testing Electric SQL directly:', testUrl) try { const testResponse = await fetch(testUrl) const testHeaders = { handle: testResponse.headers.get('electric-handle'), offset: testResponse.headers.get('electric-offset'), upToDate: testResponse.headers.get('electric-up-to-date'), } console.log('Direct Electric SQL response headers:', testHeaders) const testData = await testResponse.json() console.log('Direct Electric SQL data count:', Array.isArray(testData) ? testData.length : 'not array', testData) } catch (testErr) { console.error('Direct Electric SQL test failed:', testErr) } // Use PGlite's electric sync plugin to sync the shape // According to Electric SQL docs, the shape config uses params for table, where, columns // Note: mapColumns is OPTIONAL per pglite-sync types.ts // Create a promise that resolves when initial sync is complete // Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout // IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates let resolveInitialSync: () => void let rejectInitialSync: (error: Error) => void let syncResolved = false const initialSyncPromise = new Promise((resolve, reject) => { resolveInitialSync = () => { if (!syncResolved) { syncResolved = true // DON'T unsubscribe from stream - it needs to stay active for real-time updates resolve() } } rejectInitialSync = (error: Error) => { if (!syncResolved) { syncResolved = true // DON'T unsubscribe from stream even on error - let Electric handle it reject(error) } } // Shorter timeout (5 seconds) as fallback const timeoutId = setTimeout(() => { if (!syncResolved) { console.warn(`⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`) // Check isUpToDate one more time before resolving // This will be checked after shape is created setTimeout(() => { if (!syncResolved) { console.warn(`⚠️ Sync timeout for ${table} - resolving anyway after 5s`) resolveInitialSync() } }, 100) } }, 5000) // Store timeout ID for cleanup if needed // Note: timeout will be cleared if sync completes early }) const shapeConfig = { shape: { url: `${electricUrl}/v1/shape`, params: { table, ...(validatedWhere ? { where: validatedWhere } : {}), ...(columns ? { columns: columns.join(',') } : {}), }, }, table, primaryKey, shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, '_') || 'all'}`, // Versioned key to force fresh sync when needed onInitialSync: () => { console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`) resolveInitialSync() }, onError: (error: Error) => { console.error(`❌ Shape sync error for ${table}:`, error) console.error('Error details:', JSON.stringify(error, Object.getOwnPropertyNames(error))) rejectInitialSync(error) }, } console.log('syncShapeToTable config:', JSON.stringify(shapeConfig, null, 2)) // Type assertion to PGlite with electric extension const pgWithElectric = db as PGlite & { electric: { syncShapeToTable: (config: typeof shapeConfig) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }> } } const shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig) if (!shape) { throw new Error('syncShapeToTable returned undefined') } // Log the actual shape result structure console.log('Shape sync result (initial):', { hasUnsubscribe: typeof shape?.unsubscribe === 'function', isUpToDate: shape?.isUpToDate, hasStream: !!shape?.stream, streamType: typeof shape?.stream, }) // Recommended Approach Step 1: Check isUpToDate immediately if (shape.isUpToDate) { console.log(`✅ Sync already up-to-date for ${table} (resuming from previous state)`) resolveInitialSync() } else { // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message if (shape?.stream) { const stream = shape.stream as any console.log('Shape stream details:', { shapeHandle: stream?.shapeHandle, lastOffset: stream?.lastOffset, isUpToDate: stream?.isUpToDate, error: stream?.error, hasSubscribe: typeof stream?.subscribe === 'function', hasUnsubscribe: typeof stream?.unsubscribe === 'function', }) // Subscribe to the stream to watch for "up-to-date" control message // NOTE: We keep this subscription active - don't unsubscribe! // The stream is what Electric SQL uses for real-time updates if (typeof stream?.subscribe === 'function') { console.log('Subscribing to shape stream to watch for up-to-date message...') // Subscribe but don't store unsubscribe - we want it to stay active stream.subscribe((messages: unknown[]) => { // Continue receiving updates even after sync is resolved if (!syncResolved) { console.log('🔵 Shape stream received messages:', messages?.length || 0) } // Check if any message indicates sync is complete if (messages && messages.length > 0) { for (const message of messages) { const msg = message as any // Check for "up-to-date" control message if (msg?.headers?.control === 'up-to-date' || msg?.headers?.electric_up_to_date === 'true' || (typeof msg === 'object' && 'up-to-date' in msg)) { if (!syncResolved) { console.log(`✅ Received up-to-date message for ${table}`) resolveInitialSync() } // Continue listening for real-time updates - don't return! } } if (!syncResolved && messages.length > 0) { console.log('First message:', JSON.stringify(messages[0], null, 2)) } } // Also check stream's isUpToDate property after receiving messages if (!syncResolved && stream?.isUpToDate) { console.log(`✅ Stream isUpToDate is true for ${table}`) resolveInitialSync() } }) // Also check stream's isUpToDate property immediately if (stream?.isUpToDate) { console.log(`✅ Stream isUpToDate is true immediately for ${table}`) resolveInitialSync() } } // Also poll isUpToDate periodically as a backup (every 200ms) const pollInterval = setInterval(() => { if (syncResolved) { clearInterval(pollInterval) return } if (shape.isUpToDate || stream?.isUpToDate) { console.log(`✅ Sync completed (detected via polling) for ${table}`) clearInterval(pollInterval) resolveInitialSync() } }, 200) // Clean up polling when promise resolves initialSyncPromise.finally(() => { clearInterval(pollInterval) }) } else { console.warn(`⚠️ No stream available for ${table}, relying on callback and timeout`) } } // Return the shape handle - isUpToDate is a getter that reflects current state return { unsubscribe: () => { console.log('unsubscribing') if (shape && typeof shape.unsubscribe === 'function') { shape.unsubscribe() } }, // Use getter to always return current state get isUpToDate() { return shape?.isUpToDate ?? false }, stream: shape?.stream, initialSyncPromise, // Expose promise so callers can wait for sync } } catch (error) { console.error('Failed to sync shape:', error) // Check if Electric SQL server is reachable try { const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, { method: 'GET', }) console.log('Electric SQL server response:', response.status, response.statusText) if (!response.ok) { console.error('Electric SQL server error:', await response.text()) } } catch (fetchError) { console.error('Cannot reach Electric SQL server:', fetchError) console.error('Make sure Electric SQL is running at:', electricUrl) } throw error } }, } console.log('Electric SQL initialized successfully with PGlite') return electricClient } catch (error) { console.error('Failed to initialize Electric SQL:', error) throw error } finally { isInitializing = false } })() return initPromise } /** * Get the Electric client (throws if not initialized) */ export function getElectric(): ElectricClient { if (!electricClient) { throw new Error('Electric not initialized. Call initElectric() first.') } return electricClient } /** * Check if Electric is initialized */ export function isElectricInitialized(): boolean { return electricClient !== null } /** * Get the PGlite database instance */ export function getDb(): PGlite | null { return electricClient?.db ?? null }