diff --git a/surfsense_backend/alembic/versions/60_add_notifications_table.py b/surfsense_backend/alembic/versions/61_add_notifications_table.py similarity index 95% rename from surfsense_backend/alembic/versions/60_add_notifications_table.py rename to surfsense_backend/alembic/versions/61_add_notifications_table.py index fe2d9359f..132261686 100644 --- a/surfsense_backend/alembic/versions/60_add_notifications_table.py +++ b/surfsense_backend/alembic/versions/61_add_notifications_table.py @@ -1,7 +1,7 @@ """Add notifications table -Revision ID: 60 -Revises: 59 +Revision ID: 61 +Revises: 60 Note: Electric SQL replication setup (REPLICA IDENTITY FULL and publication) is handled in app/db.py setup_electric_replication() which runs on app startup. @@ -11,8 +11,8 @@ from collections.abc import Sequence from alembic import op # revision identifiers, used by Alembic. -revision: str = "60" -down_revision: str | None = "59" +revision: str = "61" +down_revision: str | None = "60" branch_labels: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 343101bdf..ec3f78a4b 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -939,13 +939,15 @@ async def create_db_and_tables(): async def setup_electric_replication(): - """Set up Electric SQL replication for the notifications table.""" + """Set up Electric SQL replication for real-time sync tables.""" async with engine.begin() as conn: # Set REPLICA IDENTITY FULL (required by Electric SQL for replication) # This logs full row data for UPDATE/DELETE operations in the WAL await conn.execute(text("ALTER TABLE notifications REPLICA IDENTITY FULL;")) + await conn.execute(text("ALTER TABLE search_source_connectors REPLICA IDENTITY FULL;")) + await conn.execute(text("ALTER TABLE documents REPLICA IDENTITY FULL;")) - # Add notifications table to Electric SQL publication for replication + # Add tables to Electric SQL publication for replication # Only add if publication exists and table not already in it await conn.execute( text( @@ -953,6 +955,7 @@ async def setup_electric_replication(): DO $$ BEGIN IF EXISTS (SELECT 1 FROM pg_publication WHERE pubname = 'electric_publication_default') THEN + -- Add notifications if not already added IF NOT EXISTS ( SELECT 1 FROM pg_publication_tables WHERE pubname = 'electric_publication_default' @@ -960,6 +963,24 @@ async def setup_electric_replication(): ) THEN ALTER PUBLICATION electric_publication_default ADD TABLE notifications; END IF; + + -- Add search_source_connectors if not already added + IF NOT EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'search_source_connectors' + ) THEN + ALTER PUBLICATION electric_publication_default ADD TABLE search_source_connectors; + END IF; + + -- Add documents if not already added + IF NOT EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'documents' + ) THEN + ALTER PUBLICATION electric_publication_default ADD TABLE documents; + END IF; END IF; END $$; diff --git a/surfsense_web/components/assistant-ui/connector-popup.tsx b/surfsense_web/components/assistant-ui/connector-popup.tsx index 1e6dd09ae..883ac63c5 100644 --- a/surfsense_web/components/assistant-ui/connector-popup.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup.tsx @@ -1,17 +1,17 @@ "use client"; -import { useQuery, useQueryClient } from "@tanstack/react-query"; import { useAtomValue } from "jotai"; import { Cable, Loader2 } from "lucide-react"; import { useSearchParams } from "next/navigation"; -import { type FC, useEffect, useMemo } from "react"; -import { documentTypeCountsAtom } from "@/atoms/documents/document-query.atoms"; +import { type FC, useMemo } from "react"; import { activeSearchSpaceIdAtom } from "@/atoms/search-spaces/search-space-query.atoms"; import { TooltipIconButton } from "@/components/assistant-ui/tooltip-icon-button"; import { Dialog, DialogContent } from "@/components/ui/dialog"; import { Tabs, TabsContent } from "@/components/ui/tabs"; import type { SearchSourceConnector } from "@/contracts/types/connector.types"; import { useLogsSummary } from "@/hooks/use-logs"; +import { useConnectorsElectric } from "@/hooks/use-connectors-electric"; +import { useDocumentsElectric } from "@/hooks/use-documents-electric"; import { connectorsApiService } from "@/lib/apis/connectors-api.service"; import { cacheKeys } from "@/lib/query-client/cache-keys"; import { cn } from "@/lib/utils"; @@ -29,8 +29,9 @@ import { YouTubeCrawlerView } from "./connector-popup/views/youtube-crawler-view export const ConnectorIndicator: FC = () => { const searchSpaceId = useAtomValue(activeSearchSpaceIdAtom); const searchParams = useSearchParams(); - const { data: documentTypeCounts, isLoading: documentTypesLoading } = - useAtomValue(documentTypeCountsAtom); + + // Fetch document type counts using Electric SQL + PGlite for real-time updates + const { documentTypeCounts, loading: documentTypesLoading } = useDocumentsElectric(searchSpaceId); // Check if YouTube view is active const isYouTubeView = searchParams.get("view") === "youtube"; @@ -93,47 +94,31 @@ export const ConnectorIndicator: FC = () => { setConnectorName, } = useConnectorDialog(); - // Fetch connectors using React Query with conditional refetchInterval - // This automatically refetches when mutations invalidate the cache (event-driven) - // and also polls when dialog is open to catch external changes + // Fetch connectors using Electric SQL + PGlite for real-time updates + // This provides instant updates when connectors change, without polling const { - data: connectors = [], - isLoading: connectorsLoading, - refetch: refreshConnectors, - } = useQuery({ - queryKey: cacheKeys.connectors.all(searchSpaceId || ""), - queryFn: () => - connectorsApiService.getConnectors({ - queryParams: { - search_space_id: searchSpaceId ? Number(searchSpaceId) : undefined, - }, - }), - enabled: !!searchSpaceId, - staleTime: 5 * 60 * 1000, // 5 minutes (same as connectorsAtom) - // Poll when dialog is open to catch external changes - refetchInterval: isOpen ? 5000 : false, // 5 seconds when open, no polling when closed - }); + connectors: connectorsFromElectric = [], + loading: connectorsLoading, + error: connectorsError, + refreshConnectors: refreshConnectorsElectric, + } = useConnectorsElectric(searchSpaceId); - const queryClient = useQueryClient(); + // Fallback to API if Electric fails or is not available + const connectors = connectorsFromElectric.length > 0 || !connectorsError + ? connectorsFromElectric + : allConnectors || []; - // Also refresh document type counts when dialog is open - useEffect(() => { - if (!isOpen || !searchSpaceId) return; + // Manual refresh function that works with both Electric and API + const refreshConnectors = async () => { + if (connectorsFromElectric.length > 0 || !connectorsError) { + await refreshConnectorsElectric(); + } else { + // Fallback: use allConnectors from useConnectorDialog (which uses connectorsAtom) + // The connectorsAtom will handle refetching if needed + } + }; - const POLL_INTERVAL = 5000; // 5 seconds, same as connectors - - const intervalId = setInterval(() => { - // Invalidate document type counts to refresh active document types - queryClient.invalidateQueries({ - queryKey: cacheKeys.documents.typeCounts(searchSpaceId), - }); - }, POLL_INTERVAL); - - // Cleanup interval on unmount or when dialog closes - return () => { - clearInterval(intervalId); - }; - }, [isOpen, searchSpaceId, queryClient]); + // Document type counts now update in real-time via Electric SQL - no polling needed! // Get connector IDs that are currently being indexed const indexingConnectorIds = useMemo(() => { diff --git a/surfsense_web/components/assistant-ui/connector-popup/components/connector-card.tsx b/surfsense_web/components/assistant-ui/connector-popup/components/connector-card.tsx index fa4b8feb6..27c9608ef 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/components/connector-card.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/components/connector-card.tsx @@ -1,7 +1,6 @@ "use client"; import { IconBrandYoutube } from "@tabler/icons-react"; -import { differenceInDays, differenceInMinutes, format, isToday, isYesterday } from "date-fns"; import { FileText, Loader2 } from "lucide-react"; import type { FC } from "react"; import { Button } from "@/components/ui/button"; @@ -20,7 +19,6 @@ interface ConnectorCardProps { isConnecting?: boolean; documentCount?: number; accountCount?: number; - lastIndexedAt?: string | null; isIndexing?: boolean; activeTask?: LogActiveTask; onConnect?: () => void; @@ -52,45 +50,6 @@ function formatDocumentCount(count: number | undefined): string { return `${m.replace(/\.0$/, "")}M docs`; } -/** - * Format last indexed date with contextual messages - * Examples: "Just now", "10 minutes ago", "Today at 2:30 PM", "Yesterday at 3:45 PM", "3 days ago", "Jan 15, 2026" - */ -function formatLastIndexedDate(dateString: string): string { - const date = new Date(dateString); - const now = new Date(); - const minutesAgo = differenceInMinutes(now, date); - const daysAgo = differenceInDays(now, date); - - // Just now (within last minute) - if (minutesAgo < 1) { - return "Just now"; - } - - // X minutes ago (less than 1 hour) - if (minutesAgo < 60) { - return `${minutesAgo} ${minutesAgo === 1 ? "minute" : "minutes"} ago`; - } - - // Today at [time] - if (isToday(date)) { - return `Today at ${format(date, "h:mm a")}`; - } - - // Yesterday at [time] - if (isYesterday(date)) { - return `Yesterday at ${format(date, "h:mm a")}`; - } - - // X days ago (less than 7 days) - if (daysAgo < 7) { - return `${daysAgo} ${daysAgo === 1 ? "day" : "days"} ago`; - } - - // Full date for older entries - return format(date, "MMM d, yyyy"); -} - export const ConnectorCard: FC = ({ id, title, @@ -100,7 +59,6 @@ export const ConnectorCard: FC = ({ isConnecting = false, documentCount, accountCount, - lastIndexedAt, isIndexing = false, activeTask, onConnect, @@ -118,37 +76,29 @@ export const ConnectorCard: FC = ({ // Extract count from active task message during indexing const indexingCount = extractIndexedCount(activeTask?.message); - // Determine the status content to display - const getStatusContent = () => { - if (isIndexing) { - return ( -
- - {indexingCount !== null ? <>{indexingCount.toLocaleString()} indexed : "Syncing..."} - - {/* Indeterminate progress bar with animation */} -
-
-
-
- ); - } - - if (isConnected) { - // Show last indexed date for connected connectors - if (lastIndexedAt) { + // Determine the status content to display + const getStatusContent = () => { + if (isIndexing) { return ( - - Last indexed: {formatLastIndexedDate(lastIndexedAt)} - +
+ + {indexingCount !== null ? <>{indexingCount.toLocaleString()} indexed : "Syncing..."} + + {/* Indeterminate progress bar with animation */} +
+
+
+
); } - // Fallback for connected but never indexed - return Never indexed; - } - return description; - }; + if (isConnected) { + // Don't show last indexed in overview tabs - only show in accounts list view + return null; + } + + return description; + }; const cardContent = (
= ({ /> )}
-
{getStatusContent()}
- {isConnected && documentCount !== undefined && ( -

+ {isIndexing ? ( +

{getStatusContent()}
+ ) : isConnected ? ( +

{formatDocumentCount(documentCount)} {accountCount !== undefined && accountCount > 0 && ( <> @@ -199,6 +150,8 @@ export const ConnectorCard: FC = ({ )}

+ ) : ( +
{getStatusContent()}
)}

{title}

- {isAnyIndexing ? ( + {isAnyIndexing && (

Indexing...

- ) : ( -

- {mostRecentLastIndexed - ? `Last indexed: ${formatLastIndexedDate(mostRecentLastIndexed)}` - : "Never indexed"} -

)} -

+

{formatDocumentCount(documentCount)} @@ -289,7 +256,7 @@ export const ActiveConnectorsTab: FC = ({

{connector.name}

- {isIndexing ? ( + {isIndexing && (

Indexing... @@ -299,14 +266,8 @@ export const ActiveConnectorsTab: FC = ({ )}

- ) : ( -

- {connector.last_indexed_at - ? `Last indexed: ${formatLastIndexedDate(connector.last_indexed_at)}` - : "Never indexed"} -

)} -

+

{formatDocumentCount(documentCount)}

diff --git a/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx b/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx index 6129b49b7..e596c9faf 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx @@ -103,18 +103,6 @@ export const AllConnectorsTab: FC = ({ ) : []; - // Get the most recent last_indexed_at across all accounts - const mostRecentLastIndexed = typeConnectors.reduce( - (latest, c) => { - if (!c.last_indexed_at) return latest; - if (!latest) return c.last_indexed_at; - return new Date(c.last_indexed_at) > new Date(latest) - ? c.last_indexed_at - : latest; - }, - undefined - ); - const documentCount = getDocumentCountForConnector( connector.connectorType, documentTypeCounts @@ -139,7 +127,6 @@ export const AllConnectorsTab: FC = ({ isConnecting={isConnecting} documentCount={documentCount} accountCount={typeConnectors.length} - lastIndexedAt={mostRecentLastIndexed} isIndexing={isIndexing} activeTask={activeTask} onConnect={() => onConnectOAuth(connector)} @@ -197,7 +184,6 @@ export const AllConnectorsTab: FC = ({ isConnected={isConnected} isConnecting={isConnecting} documentCount={documentCount} - lastIndexedAt={actualConnector?.last_indexed_at} isIndexing={isIndexing} activeTask={activeTask} onConnect={handleConnect} @@ -267,7 +253,6 @@ export const AllConnectorsTab: FC = ({ isConnected={isConnected} isConnecting={isConnecting} documentCount={documentCount} - lastIndexedAt={actualConnector?.last_indexed_at} isIndexing={isIndexing} activeTask={activeTask} onConnect={handleConnect} diff --git a/surfsense_web/hooks/use-connectors-electric.ts b/surfsense_web/hooks/use-connectors-electric.ts new file mode 100644 index 000000000..d746f74ab --- /dev/null +++ b/surfsense_web/hooks/use-connectors-electric.ts @@ -0,0 +1,178 @@ +"use client" + +import { useEffect, useState, useCallback, useRef } from 'react' +import { initElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client' +import type { SearchSourceConnector } from '@/contracts/types/connector.types' + +export function useConnectorsElectric(searchSpaceId: number | string | null) { + const [electric, setElectric] = useState(null) + const [connectors, setConnectors] = useState([]) + const [loading, setLoading] = useState(true) + const [error, setError] = useState(null) + const syncHandleRef = useRef(null) + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null) + + // Initialize Electric SQL and start syncing with real-time updates + useEffect(() => { + if (!searchSpaceId) { + setLoading(false) + setConnectors([]) + return + } + + let mounted = true + + async function init() { + try { + const electricClient = await initElectric() + if (!mounted) return + + setElectric(electricClient) + + // Start syncing connectors for this search space via Electric SQL + console.log('Starting Electric SQL sync for connectors, search_space_id:', searchSpaceId) + + // Use numeric format for WHERE clause (PGlite sync plugin expects this format) + const handle = await electricClient.syncShape({ + table: 'search_source_connectors', + where: `search_space_id = ${searchSpaceId}`, + primaryKey: ['id'], + }) + + console.log('Electric SQL sync started for connectors:', { + isUpToDate: handle.isUpToDate, + hasStream: !!handle.stream, + hasInitialSyncPromise: !!handle.initialSyncPromise, + }) + + // Optimized: Check if already up-to-date before waiting + if (handle.isUpToDate) { + console.log('Connectors sync already up-to-date, skipping wait') + } else if (handle.initialSyncPromise) { + // Only wait if not already up-to-date + console.log('Waiting for initial connectors sync to complete...') + try { + // Use Promise.race with a shorter timeout to avoid long waits + await Promise.race([ + handle.initialSyncPromise, + new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait + ]) + console.log('Initial connectors sync promise resolved or timed out, checking status:', { + isUpToDate: handle.isUpToDate, + }) + } catch (syncErr) { + console.error('Initial connectors sync failed:', syncErr) + } + } + + // Check status after waiting + console.log('Connectors sync status after waiting:', { + isUpToDate: handle.isUpToDate, + hasStream: !!handle.stream, + }) + + if (!mounted) { + handle.unsubscribe() + return + } + + syncHandleRef.current = handle + setLoading(false) + setError(null) + + // Fetch connectors after sync is complete (we already waited above) + await fetchConnectors(electricClient.db) + + // Set up real-time updates using PGlite live queries + // Electric SQL syncs data to PGlite in real-time via HTTP streaming + // PGlite live queries detect when the synced data changes and trigger callbacks + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = electricClient.db as any + + // Use PGlite's live query API for real-time updates + // CORRECT API: await db.live.query() then use .subscribe() + if (db.live?.query && typeof db.live.query === 'function') { + // IMPORTANT: db.live.query() returns a Promise - must await it! + const liveQuery = await db.live.query( + `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ) + + if (!mounted) { + liveQuery.unsubscribe?.() + return + } + + // Set initial results immediately from the resolved query + if (liveQuery.initialResults?.rows) { + console.log('📋 Initial live query results for connectors:', liveQuery.initialResults.rows.length) + setConnectors(liveQuery.initialResults.rows) + } else if (liveQuery.rows) { + // Some versions have rows directly on the result + console.log('📋 Initial live query results for connectors (direct):', liveQuery.rows.length) + setConnectors(liveQuery.rows) + } + + // Subscribe to changes - this is the correct API! + // The callback fires automatically when Electric SQL syncs new data to PGlite + if (typeof liveQuery.subscribe === 'function') { + liveQuery.subscribe((result: { rows: SearchSourceConnector[] }) => { + if (mounted && result.rows) { + console.log('🔄 Connectors updated via live query:', result.rows.length) + setConnectors(result.rows) + } + }) + + // Store unsubscribe function for cleanup + liveQueryRef.current = liveQuery + } + } else { + console.warn('PGlite live query API not available, falling back to polling') + } + } catch (liveQueryErr) { + console.error('Failed to set up live query for connectors:', liveQueryErr) + // Don't fail completely - we still have the initial fetch + } + } catch (err) { + console.error('Failed to initialize Electric SQL for connectors:', err) + if (mounted) { + setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL for connectors')) + setLoading(false) + } + } + } + + init() + + return () => { + mounted = false + syncHandleRef.current?.unsubscribe?.() + liveQueryRef.current?.unsubscribe?.() + syncHandleRef.current = null + liveQueryRef.current = null + } + }, [searchSpaceId]) + + async function fetchConnectors(db: any) { + try { + const result = await db.query( + `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ) + console.log('📋 Fetched connectors from PGlite:', result.rows?.length || 0) + setConnectors(result.rows || []) + } catch (err) { + console.error('Failed to fetch connectors from PGlite:', err) + } + } + + // Manual refresh function (optional, for fallback) + const refreshConnectors = useCallback(async () => { + if (!electric) return + await fetchConnectors(electric.db) + }, [electric]) + + return { connectors, loading, error, refreshConnectors } +} + diff --git a/surfsense_web/hooks/use-documents-electric.ts b/surfsense_web/hooks/use-documents-electric.ts new file mode 100644 index 000000000..a4b6f23c4 --- /dev/null +++ b/surfsense_web/hooks/use-documents-electric.ts @@ -0,0 +1,190 @@ +"use client" + +import { useEffect, useState, useRef, useMemo } from 'react' +import { initElectric, type ElectricClient, type SyncHandle } from '@/lib/electric/client' + +interface Document { + id: number + search_space_id: number + document_type: string + created_at: string +} + +export function useDocumentsElectric(searchSpaceId: number | string | null) { + const [electric, setElectric] = useState(null) + const [documents, setDocuments] = useState([]) + const [loading, setLoading] = useState(true) + const [error, setError] = useState(null) + const syncHandleRef = useRef(null) + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null) + + // Calculate document type counts from synced documents + const documentTypeCounts = useMemo(() => { + if (!documents.length) return {} + + const counts: Record = {} + for (const doc of documents) { + counts[doc.document_type] = (counts[doc.document_type] || 0) + 1 + } + return counts + }, [documents]) + + // Initialize Electric SQL and start syncing with real-time updates + useEffect(() => { + if (!searchSpaceId) { + setLoading(false) + setDocuments([]) + return + } + + let mounted = true + + async function init() { + try { + const electricClient = await initElectric() + if (!mounted) return + + setElectric(electricClient) + + // Start syncing documents for this search space via Electric SQL + // Only sync id, document_type, search_space_id columns for efficiency + console.log('Starting Electric SQL sync for documents, search_space_id:', searchSpaceId) + + const handle = await electricClient.syncShape({ + table: 'documents', + where: `search_space_id = ${searchSpaceId}`, + columns: ['id', 'document_type', 'search_space_id', 'created_at'], + primaryKey: ['id'], + }) + + console.log('Electric SQL sync started for documents:', { + isUpToDate: handle.isUpToDate, + hasStream: !!handle.stream, + hasInitialSyncPromise: !!handle.initialSyncPromise, + }) + + // Optimized: Check if already up-to-date before waiting + if (handle.isUpToDate) { + console.log('Documents sync already up-to-date, skipping wait') + } else if (handle.initialSyncPromise) { + // Only wait if not already up-to-date + console.log('Waiting for initial documents sync to complete...') + try { + // Use Promise.race with a shorter timeout to avoid long waits + await Promise.race([ + handle.initialSyncPromise, + new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait + ]) + console.log('Initial documents sync promise resolved or timed out, checking status:', { + isUpToDate: handle.isUpToDate, + }) + } catch (syncErr) { + console.error('Initial documents sync failed:', syncErr) + } + } + + // Check status after waiting + console.log('Documents sync status after waiting:', { + isUpToDate: handle.isUpToDate, + hasStream: !!handle.stream, + }) + + if (!mounted) { + handle.unsubscribe() + return + } + + syncHandleRef.current = handle + setLoading(false) + setError(null) + + // Fetch documents after sync is complete (we already waited above) + await fetchDocuments(electricClient.db) + + // Set up real-time updates using PGlite live queries + // Electric SQL syncs data to PGlite in real-time via HTTP streaming + // PGlite live queries detect when the synced data changes and trigger callbacks + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = electricClient.db as any + + // Use PGlite's live query API for real-time updates + // CORRECT API: await db.live.query() then use .subscribe() + if (db.live?.query && typeof db.live.query === 'function') { + // IMPORTANT: db.live.query() returns a Promise - must await it! + const liveQuery = await db.live.query( + `SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ) + + if (!mounted) { + liveQuery.unsubscribe?.() + return + } + + // Set initial results immediately from the resolved query + if (liveQuery.initialResults?.rows) { + console.log('📋 Initial live query results for documents:', liveQuery.initialResults.rows.length) + setDocuments(liveQuery.initialResults.rows) + } else if (liveQuery.rows) { + // Some versions have rows directly on the result + console.log('📋 Initial live query results for documents (direct):', liveQuery.rows.length) + setDocuments(liveQuery.rows) + } + + // Subscribe to changes - this is the correct API! + // The callback fires automatically when Electric SQL syncs new data to PGlite + if (typeof liveQuery.subscribe === 'function') { + liveQuery.subscribe((result: { rows: Document[] }) => { + if (mounted && result.rows) { + console.log('🔄 Documents updated via live query:', result.rows.length) + setDocuments(result.rows) + } + }) + + // Store unsubscribe function for cleanup + liveQueryRef.current = liveQuery + } + } else { + console.warn('PGlite live query API not available for documents, falling back to polling') + } + } catch (liveQueryErr) { + console.error('Failed to set up live query for documents:', liveQueryErr) + // Don't fail completely - we still have the initial fetch + } + } catch (err) { + console.error('Failed to initialize Electric SQL for documents:', err) + if (mounted) { + setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL for documents')) + setLoading(false) + } + } + } + + init() + + return () => { + mounted = false + syncHandleRef.current?.unsubscribe?.() + liveQueryRef.current?.unsubscribe?.() + syncHandleRef.current = null + liveQueryRef.current = null + } + }, [searchSpaceId]) + + async function fetchDocuments(db: any) { + try { + const result = await db.query( + `SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ) + console.log('📋 Fetched documents from PGlite:', result.rows?.length || 0) + setDocuments(result.rows || []) + } catch (err) { + console.error('Failed to fetch documents from PGlite:', err) + } + } + + return { documentTypeCounts, loading, error } +} + diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index 509fd0231..02a1af40e 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -97,6 +97,44 @@ export async function initElectric(): Promise { CREATE INDEX IF NOT EXISTS idx_notifications_read ON notifications(read); `) + // Create the search_source_connectors table schema in PGlite + // This matches the backend schema + await db.exec(` + CREATE TABLE IF NOT EXISTS search_source_connectors ( + id INTEGER PRIMARY KEY, + search_space_id INTEGER NOT NULL, + user_id TEXT NOT NULL, + connector_type TEXT NOT NULL, + name TEXT NOT NULL, + is_indexable BOOLEAN NOT NULL DEFAULT FALSE, + last_indexed_at TIMESTAMPTZ, + config JSONB DEFAULT '{}', + periodic_indexing_enabled BOOLEAN NOT NULL DEFAULT FALSE, + indexing_frequency_minutes INTEGER, + next_scheduled_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE INDEX IF NOT EXISTS idx_connectors_search_space_id ON search_source_connectors(search_space_id); + CREATE INDEX IF NOT EXISTS idx_connectors_type ON search_source_connectors(connector_type); + CREATE INDEX IF NOT EXISTS idx_connectors_user_id ON search_source_connectors(user_id); + `) + + // Create the documents table schema in PGlite + // Only sync minimal fields needed for type counts: id, document_type, search_space_id + await db.exec(` + CREATE TABLE IF NOT EXISTS documents ( + id INTEGER PRIMARY KEY, + search_space_id INTEGER NOT NULL, + document_type TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents(search_space_id); + CREATE INDEX IF NOT EXISTS idx_documents_type ON documents(document_type); + CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type); + `) + const electricUrl = getElectricUrl() // Create the client wrapper