diff --git a/surfsense_backend/alembic/versions/62_add_notifications_table.py b/surfsense_backend/alembic/versions/62_add_notifications_table.py index 5f738b9bf..5af37d2f8 100644 --- a/surfsense_backend/alembic/versions/62_add_notifications_table.py +++ b/surfsense_backend/alembic/versions/62_add_notifications_table.py @@ -6,6 +6,7 @@ Revises: 61 Note: Electric SQL replication setup (REPLICA IDENTITY FULL and publication) is handled in app/db.py setup_electric_replication() which runs on app startup. """ + from collections.abc import Sequence from alembic import op diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 44edaee55..7af55eaa3 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -711,15 +711,22 @@ class Notification(BaseModel, TimestampMixin): __tablename__ = "notifications" user_id = Column( - UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False, index=True + UUID(as_uuid=True), + ForeignKey("user.id", ondelete="CASCADE"), + nullable=False, + index=True, ) search_space_id = Column( Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=True ) - type = Column(String(50), nullable=False) # 'connector_indexing', 'document_processing', etc. + type = Column( + String(50), nullable=False + ) # 'connector_indexing', 'document_processing', etc. title = Column(String(200), nullable=False) message = Column(Text, nullable=False) - read = Column(Boolean, nullable=False, default=False, server_default=text("false"), index=True) + read = Column( + Boolean, nullable=False, default=False, server_default=text("false"), index=True + ) notification_metadata = Column("metadata", JSONB, nullable=True, default={}) user = relationship("User", back_populates="notifications") @@ -990,7 +997,9 @@ async def setup_electric_replication(): # Set REPLICA IDENTITY FULL (required by Electric SQL for replication) # This logs full row data for UPDATE/DELETE operations in the WAL await conn.execute(text("ALTER TABLE notifications REPLICA IDENTITY FULL;")) - await conn.execute(text("ALTER TABLE search_source_connectors REPLICA IDENTITY FULL;")) + await conn.execute( + text("ALTER TABLE search_source_connectors REPLICA IDENTITY FULL;") + ) await conn.execute(text("ALTER TABLE documents REPLICA IDENTITY FULL;")) # Add tables to Electric SQL publication for replication diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 9be9895eb..f63349916 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -986,21 +986,25 @@ async def _run_indexing_with_notifications( try: # Get connector info for notification connector_result = await session.execute( - select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id) + select(SearchSourceConnector).where( + SearchSourceConnector.id == connector_id + ) ) connector = connector_result.scalar_one_or_none() if connector: # Create notification when indexing starts - notification = await NotificationService.connector_indexing.notify_indexing_started( - session=session, - user_id=UUID(user_id), - connector_id=connector_id, - connector_name=connector.name, - connector_type=connector.connector_type.value, - search_space_id=search_space_id, - start_date=start_date, - end_date=end_date, + notification = ( + await NotificationService.connector_indexing.notify_indexing_started( + session=session, + user_id=UUID(user_id), + connector_id=connector_id, + connector_name=connector.name, + connector_type=connector.connector_type.value, + search_space_id=search_space_id, + start_date=start_date, + end_date=end_date, + ) ) # Update notification to fetching stage @@ -1640,6 +1644,7 @@ async def run_google_gmail_indexing( start_date: Start date for indexing end_date: End date for indexing """ + # Create a wrapper function that calls index_google_gmail_messages with max_messages async def gmail_indexing_wrapper( session: AsyncSession, @@ -1701,7 +1706,9 @@ async def run_google_drive_indexing( # Get connector info for notification connector_result = await session.execute( - select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id) + select(SearchSourceConnector).where( + SearchSourceConnector.id == connector_id + ) ) connector = connector_result.scalar_one_or_none() @@ -1813,7 +1820,7 @@ async def run_google_drive_indexing( f"Critical error in run_google_drive_indexing for connector {connector_id}: {e}", exc_info=True, ) - + # Update notification on exception if notification: try: diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 0988eb034..e410157c0 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -99,7 +99,9 @@ class BaseNotificationHandler: flag_modified(notification, "notification_metadata") await session.commit() await session.refresh(notification) - logger.info(f"Updated notification {notification.id} for operation {operation_id}") + logger.info( + f"Updated notification {notification.id} for operation {operation_id}" + ) return notification # Create new notification @@ -119,7 +121,9 @@ class BaseNotificationHandler: session.add(notification) await session.commit() await session.refresh(notification) - logger.info(f"Created notification {notification.id} for operation {operation_id}") + logger.info( + f"Created notification {notification.id} for operation {operation_id}" + ) return notification async def update_notification( @@ -153,9 +157,9 @@ class BaseNotificationHandler: if status is not None: notification.notification_metadata["status"] = status if status in ("completed", "failed"): - notification.notification_metadata["completed_at"] = ( - datetime.now(UTC).isoformat() - ) + notification.notification_metadata["completed_at"] = datetime.now( + UTC + ).isoformat() # Mark JSONB column as modified so SQLAlchemy detects the change flag_modified(notification, "notification_metadata") @@ -180,7 +184,10 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): super().__init__("connector_indexing") def _generate_operation_id( - self, connector_id: int, start_date: str | None = None, end_date: str | None = None + self, + connector_id: int, + start_date: str | None = None, + end_date: str | None = None, ) -> str: """ Generate a unique operation ID for a connector indexing operation. @@ -298,7 +305,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): "processing": "Preparing for search", "storing": "Almost done", } - + # Use stage-based message if stage provided, otherwise fallback if stage or stage_message: progress_msg = stage_message or stage_messages.get(stage, "Processing") @@ -341,7 +348,9 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): Returns: Updated notification """ - connector_name = notification.notification_metadata.get("connector_name", "Connector") + connector_name = notification.notification_metadata.get( + "connector_name", "Connector" + ) if error_message: title = f"Failed: {connector_name}" @@ -414,7 +423,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): "indexed_count": 0, "sync_stage": "connecting", } - + if folder_names: metadata["folder_names"] = folder_names if file_names: @@ -454,6 +463,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") # Create a short hash of filename to ensure uniqueness import hashlib + filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8] return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}" @@ -480,7 +490,9 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): Returns: Notification: The created notification """ - operation_id = self._generate_operation_id(document_type, document_name, search_space_id) + operation_id = self._generate_operation_id( + document_type, document_name, search_space_id + ) title = f"Processing: {document_name}" message = "Waiting in queue" @@ -489,7 +501,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): "document_name": document_name, "processing_stage": "queued", } - + if file_size is not None: metadata["file_size"] = file_size @@ -531,7 +543,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): "embedding": "Preparing for search", "storing": "Finalizing", } - + message = stage_message or stage_messages.get(stage, "Processing") metadata_updates = {"processing_stage": stage} @@ -568,7 +580,9 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): Returns: Updated notification """ - document_name = notification.notification_metadata.get("document_name", "Document") + document_name = notification.notification_metadata.get( + "document_name", "Document" + ) if error_message: title = f"Failed: {document_name}" @@ -583,7 +597,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): "processing_stage": "completed" if not error_message else "failed", "error_message": error_message, } - + if document_id is not None: metadata_updates["document_id"] = document_id # Store chunks_count in metadata for debugging, but don't show to user @@ -645,4 +659,3 @@ class NotificationService: await session.refresh(notification) logger.info(f"Created notification {notification.id} for user {user_id}") return notification - diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 669ea389f..d59ec95a2 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -92,12 +92,14 @@ async def _process_extension_document( page_title += "..." # Create notification for document processing - notification = await NotificationService.document_processing.notify_processing_started( - session=session, - user_id=UUID(user_id), - document_type="EXTENSION", - document_name=page_title, - search_space_id=search_space_id, + notification = ( + await NotificationService.document_processing.notify_processing_started( + session=session, + user_id=UUID(user_id), + document_type="EXTENSION", + document_name=page_title, + search_space_id=search_space_id, + ) ) log_entry = await task_logger.log_task_start( @@ -115,7 +117,10 @@ async def _process_extension_document( try: # Update notification: parsing stage await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Reading page content" + session, + notification, + stage="parsing", + stage_message="Reading page content", ) result = await add_extension_received_document( @@ -130,11 +135,13 @@ async def _process_extension_document( ) # Update notification on success - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - document_id=result.id, - chunks_count=None, + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + document_id=result.id, + chunks_count=None, + ) ) else: await task_logger.log_task_success( @@ -144,10 +151,12 @@ async def _process_extension_document( ) # Update notification for duplicate - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message="Page already saved (duplicate)", + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message="Page already saved (duplicate)", + ) ) except Exception as e: await task_logger.log_task_failure( @@ -198,12 +207,14 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str): video_name = url.split("v=")[-1][:11] if "v=" in url else url # Create notification for document processing - notification = await NotificationService.document_processing.notify_processing_started( - session=session, - user_id=UUID(user_id), - document_type="YOUTUBE_VIDEO", - document_name=f"YouTube: {video_name}", - search_space_id=search_space_id, + notification = ( + await NotificationService.document_processing.notify_processing_started( + session=session, + user_id=UUID(user_id), + document_type="YOUTUBE_VIDEO", + document_name=f"YouTube: {video_name}", + search_space_id=search_space_id, + ) ) log_entry = await task_logger.log_task_start( @@ -216,7 +227,10 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str): try: # Update notification: parsing (fetching transcript) await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Fetching video transcript" + session, + notification, + stage="parsing", + stage_message="Fetching video transcript", ) result = await add_youtube_video_document( @@ -235,11 +249,13 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str): ) # Update notification on success - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - document_id=result.id, - chunks_count=None, + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + document_id=result.id, + chunks_count=None, + ) ) else: await task_logger.log_task_success( @@ -249,10 +265,12 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str): ) # Update notification for duplicate - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message="Video already exists (duplicate)", + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message="Video already exists (duplicate)", + ) ) except Exception as e: await task_logger.log_task_failure( @@ -317,13 +335,15 @@ async def _process_file_upload( file_size = None # Create notification for document processing - notification = await NotificationService.document_processing.notify_processing_started( - session=session, - user_id=UUID(user_id), - document_type="FILE", - document_name=filename, - search_space_id=search_space_id, - file_size=file_size, + notification = ( + await NotificationService.document_processing.notify_processing_started( + session=session, + user_id=UUID(user_id), + document_type="FILE", + document_name=filename, + search_space_id=search_space_id, + file_size=file_size, + ) ) log_entry = await task_logger.log_task_start( @@ -352,18 +372,22 @@ async def _process_file_upload( # Update notification on success if result: - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - document_id=result.id, - chunks_count=None, + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + document_id=result.id, + chunks_count=None, + ) ) else: # Duplicate detected - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message="Document already exists (duplicate)", + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message="Document already exists (duplicate)", + ) ) except Exception as e: @@ -456,12 +480,14 @@ async def _process_circleback_meeting( # Create notification if user_id is available notification = None if user_id: - notification = await NotificationService.document_processing.notify_processing_started( - session=session, - user_id=UUID(user_id), - document_type="CIRCLEBACK", - document_name=f"Meeting: {meeting_name[:40]}", - search_space_id=search_space_id, + notification = ( + await NotificationService.document_processing.notify_processing_started( + session=session, + user_id=UUID(user_id), + document_type="CIRCLEBACK", + document_name=f"Meeting: {meeting_name[:40]}", + search_space_id=search_space_id, + ) ) log_entry = await task_logger.log_task_start( @@ -479,8 +505,13 @@ async def _process_circleback_meeting( try: # Update notification: parsing stage if notification: - await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Reading meeting notes" + await ( + NotificationService.document_processing.notify_processing_progress( + session, + notification, + stage="parsing", + stage_message="Reading meeting notes", + ) ) result = await add_circleback_meeting_document( @@ -535,10 +566,12 @@ async def _process_circleback_meeting( # Update notification on failure if notification: - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message=str(e)[:100], + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=str(e)[:100], + ) ) logger.error(f"Error processing Circleback meeting: {e!s}") diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index eecb1ac1a..2c501d9a3 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -476,15 +476,21 @@ async def process_file_in_background( log_entry: Log, connector: dict | None = None, # Optional: {"type": "GOOGLE_DRIVE_FILE", "metadata": {...}} - notification: Notification | None = None, # Optional notification for progress updates + notification: Notification + | None = None, # Optional notification for progress updates ) -> Document | None: try: # Check if the file is a markdown or text file if filename.lower().endswith((".md", ".markdown", ".txt")): # Update notification: parsing stage if notification: - await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Reading file" + await ( + NotificationService.document_processing.notify_processing_progress( + session, + notification, + stage="parsing", + stage_message="Reading file", + ) ) await task_logger.log_task_progress( @@ -508,8 +514,10 @@ async def process_file_in_background( # Update notification: chunking stage if notification: - await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="chunking" + await ( + NotificationService.document_processing.notify_processing_progress( + session, notification, stage="chunking" + ) ) await task_logger.log_task_progress( @@ -554,8 +562,13 @@ async def process_file_in_background( ): # Update notification: parsing stage (transcription) if notification: - await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Transcribing audio" + await ( + NotificationService.document_processing.notify_processing_progress( + session, + notification, + stage="parsing", + stage_message="Transcribing audio", + ) ) await task_logger.log_task_progress( @@ -643,8 +656,10 @@ async def process_file_in_background( # Update notification: chunking stage if notification: - await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="chunking" + await ( + NotificationService.document_processing.notify_processing_progress( + session, notification, stage="chunking" + ) ) # Clean up the temp file @@ -749,7 +764,10 @@ async def process_file_in_background( # Update notification: parsing stage if notification: await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Extracting content" + session, + notification, + stage="parsing", + stage_message="Extracting content", ) await task_logger.log_task_progress( @@ -859,7 +877,10 @@ async def process_file_in_background( # Update notification: parsing stage if notification: await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Extracting content" + session, + notification, + stage="parsing", + stage_message="Extracting content", ) await task_logger.log_task_progress( @@ -904,7 +925,10 @@ async def process_file_in_background( # Update notification: chunking stage if notification: await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="chunking", chunks_count=len(markdown_documents) + session, + notification, + stage="chunking", + chunks_count=len(markdown_documents), ) await task_logger.log_task_progress( @@ -1018,7 +1042,10 @@ async def process_file_in_background( # Update notification: parsing stage if notification: await NotificationService.document_processing.notify_processing_progress( - session, notification, stage="parsing", stage_message="Extracting content" + session, + notification, + stage="parsing", + stage_message="Extracting content", ) await task_logger.log_task_progress( diff --git a/surfsense_web/components/assistant-ui/connector-popup.tsx b/surfsense_web/components/assistant-ui/connector-popup.tsx index 6d9d2390b..cde32cabe 100644 --- a/surfsense_web/components/assistant-ui/connector-popup.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup.tsx @@ -27,7 +27,7 @@ import { YouTubeCrawlerView } from "./connector-popup/views/youtube-crawler-view export const ConnectorIndicator: FC = () => { const searchSpaceId = useAtomValue(activeSearchSpaceIdAtom); const searchParams = useSearchParams(); - + // Fetch document type counts using Electric SQL + PGlite for real-time updates const { documentTypeCounts, loading: documentTypesLoading } = useDocumentsElectric(searchSpaceId); @@ -96,9 +96,10 @@ export const ConnectorIndicator: FC = () => { } = useConnectorsElectric(searchSpaceId); // Fallback to API if Electric fails or is not available - const connectors = connectorsFromElectric.length > 0 || !connectorsError - ? connectorsFromElectric - : allConnectors || []; + const connectors = + connectorsFromElectric.length > 0 || !connectorsError + ? connectorsFromElectric + : allConnectors || []; // Manual refresh function that works with both Electric and API const refreshConnectors = async () => { diff --git a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-indexing-connectors.ts b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-indexing-connectors.ts index abc350431..2ac8d340a 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-indexing-connectors.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-indexing-connectors.ts @@ -5,17 +5,17 @@ import type { SearchSourceConnector } from "@/contracts/types/connector.types"; /** * Hook to track which connectors are currently indexing using local state. - * + * * This provides a better UX than polling by: * 1. Setting indexing state immediately when user triggers indexing (optimistic) * 2. Clearing indexing state when Electric SQL detects last_indexed_at changed - * + * * The actual `last_indexed_at` value comes from Electric SQL/PGlite, not local state. */ export function useIndexingConnectors(connectors: SearchSourceConnector[]) { // Set of connector IDs that are currently indexing const [indexingConnectorIds, setIndexingConnectorIds] = useState>(new Set()); - + // Track previous last_indexed_at values to detect changes const previousLastIndexedAtRef = useRef>(new Map()); @@ -79,4 +79,3 @@ export function useIndexingConnectors(connectors: SearchSourceConnector[]) { isIndexing, }; } - diff --git a/surfsense_web/components/assistant-ui/connector-popup/tabs/active-connectors-tab.tsx b/surfsense_web/components/assistant-ui/connector-popup/tabs/active-connectors-tab.tsx index ed4d377fb..4e4de39a4 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/tabs/active-connectors-tab.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/tabs/active-connectors-tab.tsx @@ -63,7 +63,6 @@ export const ActiveConnectorsTab: FC = ({ return `${m.replace(/\.0$/, "")}M docs`; }; - // Document types that should be shown as standalone cards (not from connectors) const standaloneDocumentTypes = ["EXTENSION", "FILE", "NOTE", "YOUTUBE_VIDEO", "CRAWLED_URL"]; diff --git a/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx b/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx index 721b66f01..b78b50beb 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx @@ -49,7 +49,6 @@ export const AllConnectorsTab: FC = ({ onManage, onViewAccountsList, }) => { - // Filter connectors based on search const filteredOAuth = OAUTH_CONNECTORS.filter( (c) => diff --git a/surfsense_web/components/assistant-ui/document-upload-popup.tsx b/surfsense_web/components/assistant-ui/document-upload-popup.tsx index 453c6abde..29f633ebf 100644 --- a/surfsense_web/components/assistant-ui/document-upload-popup.tsx +++ b/surfsense_web/components/assistant-ui/document-upload-popup.tsx @@ -119,10 +119,7 @@ const DocumentUploadPopupContent: FC<{
- +
{/* Bottom fade shadow */} diff --git a/surfsense_web/components/assistant-ui/tooltip-icon-button.tsx b/surfsense_web/components/assistant-ui/tooltip-icon-button.tsx index b7a3ae7fe..154240cb4 100644 --- a/surfsense_web/components/assistant-ui/tooltip-icon-button.tsx +++ b/surfsense_web/components/assistant-ui/tooltip-icon-button.tsx @@ -27,11 +27,7 @@ export const TooltipIconButton = forwardRef{tooltip} - - {tooltip} - + {tooltip} ); } diff --git a/surfsense_web/components/notifications/NotificationButton.tsx b/surfsense_web/components/notifications/NotificationButton.tsx index ae6534394..a4785af44 100644 --- a/surfsense_web/components/notifications/NotificationButton.tsx +++ b/surfsense_web/components/notifications/NotificationButton.tsx @@ -13,7 +13,8 @@ import { cn } from "@/lib/utils"; export function NotificationButton() { const { data: user } = useAtomValue(currentUserAtom); const userId = user?.id ? String(user.id) : null; - const { notifications, unreadCount, loading, markAsRead, markAllAsRead } = useNotifications(userId); + const { notifications, unreadCount, loading, markAsRead, markAllAsRead } = + useNotifications(userId); return ( @@ -50,4 +51,3 @@ export function NotificationButton() { ); } - diff --git a/surfsense_web/components/notifications/NotificationPopup.tsx b/surfsense_web/components/notifications/NotificationPopup.tsx index 19823fdeb..f194aa394 100644 --- a/surfsense_web/components/notifications/NotificationPopup.tsx +++ b/surfsense_web/components/notifications/NotificationPopup.tsx @@ -41,7 +41,7 @@ export function NotificationPopup({ const getStatusIcon = (notification: Notification) => { const status = notification.metadata?.status as string | undefined; - + switch (status) { case "in_progress": return ; @@ -62,12 +62,7 @@ export function NotificationPopup({

Notifications

{unreadCount > 0 && ( - @@ -98,9 +93,7 @@ export function NotificationPopup({ )} >
-
- {getStatusIcon(notification)} -
+
{getStatusIcon(notification)}

(null) + const [initialized, setInitialized] = useState(false); + const [error, setError] = useState(null); useEffect(() => { // Skip if already initialized if (isElectricInitialized()) { - setInitialized(true) - return + setInitialized(true); + return; } - let mounted = true + let mounted = true; async function init() { try { - await initElectric() + await initElectric(); if (mounted) { - setInitialized(true) - setError(null) + setInitialized(true); + setError(null); } } catch (err) { - console.error('Failed to initialize Electric SQL:', err) + console.error("Failed to initialize Electric SQL:", err); if (mounted) { - setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL')) + setError(err instanceof Error ? err : new Error("Failed to initialize Electric SQL")); // Don't block rendering if Electric SQL fails - app can still work - setInitialized(true) + setInitialized(true); } } } - init() + init(); return () => { - mounted = false - } - }, []) + mounted = false; + }; + }, []); // Show loading state only briefly, then render children // Electric SQL will sync in the background @@ -57,13 +57,13 @@ export function ElectricProvider({ children }: ElectricProviderProps) {

Initializing...
- ) + ); } // If there's an error, still render children but log the error if (error) { - console.warn('Electric SQL initialization failed, notifications may not sync:', error.message) + console.warn("Electric SQL initialization failed, notifications may not sync:", error.message); } - return <>{children} + return <>{children}; } diff --git a/surfsense_web/contracts/types/notification.types.ts b/surfsense_web/contracts/types/notification.types.ts index cac49b09c..6a11e81b4 100644 --- a/surfsense_web/contracts/types/notification.types.ts +++ b/surfsense_web/contracts/types/notification.types.ts @@ -5,19 +5,12 @@ import { documentTypeEnum } from "./document.types"; /** * Notification type enum - matches backend notification types */ -export const notificationTypeEnum = z.enum([ - "connector_indexing", - "document_processing", -]); +export const notificationTypeEnum = z.enum(["connector_indexing", "document_processing"]); /** * Notification status enum - used in metadata */ -export const notificationStatusEnum = z.enum([ - "in_progress", - "completed", - "failed", -]); +export const notificationStatusEnum = z.enum(["in_progress", "completed", "failed"]); /** * Document processing stage enum @@ -125,4 +118,3 @@ export type NotificationMetadata = z.infer; export type Notification = z.infer; export type ConnectorIndexingNotification = z.infer; export type DocumentProcessingNotification = z.infer; - diff --git a/surfsense_web/hooks/use-connectors-electric.ts b/surfsense_web/hooks/use-connectors-electric.ts index 50c398d48..d750cfdf3 100644 --- a/surfsense_web/hooks/use-connectors-electric.ts +++ b/surfsense_web/hooks/use-connectors-electric.ts @@ -1,16 +1,21 @@ -"use client" +"use client"; -import { useEffect, useState, useCallback, useRef } from 'react' -import { initElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client' -import type { SearchSourceConnector } from '@/contracts/types/connector.types' +import { useEffect, useState, useCallback, useRef } from "react"; +import { + initElectric, + isElectricInitialized, + type ElectricClient, + type SyncHandle, +} from "@/lib/electric/client"; +import type { SearchSourceConnector } from "@/contracts/types/connector.types"; export function useConnectorsElectric(searchSpaceId: number | string | null) { - const [electric, setElectric] = useState(null) - const [connectors, setConnectors] = useState([]) - const [loading, setLoading] = useState(true) - const [error, setError] = useState(null) - const syncHandleRef = useRef(null) - const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null) + const [electric, setElectric] = useState(null); + const [connectors, setConnectors] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const syncHandleRef = useRef(null); + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); // Transform connector data from Electric SQL/PGlite to match expected format // Converts Date objects to ISO strings as expected by Zod schema @@ -18,184 +23,193 @@ export function useConnectorsElectric(searchSpaceId: number | string | null) { return { ...connector, last_indexed_at: connector.last_indexed_at - ? typeof connector.last_indexed_at === 'string' + ? typeof connector.last_indexed_at === "string" ? connector.last_indexed_at : new Date(connector.last_indexed_at).toISOString() : null, next_scheduled_at: connector.next_scheduled_at - ? typeof connector.next_scheduled_at === 'string' + ? typeof connector.next_scheduled_at === "string" ? connector.next_scheduled_at : new Date(connector.next_scheduled_at).toISOString() : null, created_at: connector.created_at - ? typeof connector.created_at === 'string' + ? typeof connector.created_at === "string" ? connector.created_at : new Date(connector.created_at).toISOString() : new Date().toISOString(), // fallback - } + }; } // Initialize Electric SQL and start syncing with real-time updates useEffect(() => { if (!searchSpaceId) { - setLoading(false) - setConnectors([]) - return + setLoading(false); + setConnectors([]); + return; } - let mounted = true + let mounted = true; async function init() { try { - const electricClient = await initElectric() - if (!mounted) return + const electricClient = await initElectric(); + if (!mounted) return; - setElectric(electricClient) + setElectric(electricClient); // Start syncing connectors for this search space via Electric SQL - console.log('Starting Electric SQL sync for connectors, search_space_id:', searchSpaceId) - + console.log("Starting Electric SQL sync for connectors, search_space_id:", searchSpaceId); + // Use numeric format for WHERE clause (PGlite sync plugin expects this format) const handle = await electricClient.syncShape({ - table: 'search_source_connectors', + table: "search_source_connectors", where: `search_space_id = ${searchSpaceId}`, - primaryKey: ['id'], - }) - - console.log('Electric SQL sync started for connectors:', { + primaryKey: ["id"], + }); + + console.log("Electric SQL sync started for connectors:", { isUpToDate: handle.isUpToDate, hasStream: !!handle.stream, hasInitialSyncPromise: !!handle.initialSyncPromise, - }) - + }); + // Optimized: Check if already up-to-date before waiting if (handle.isUpToDate) { - console.log('Connectors sync already up-to-date, skipping wait') + console.log("Connectors sync already up-to-date, skipping wait"); } else if (handle.initialSyncPromise) { // Only wait if not already up-to-date - console.log('Waiting for initial connectors sync to complete...') + console.log("Waiting for initial connectors sync to complete..."); try { // Use Promise.race with a shorter timeout to avoid long waits await Promise.race([ handle.initialSyncPromise, - new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait - ]) - console.log('Initial connectors sync promise resolved or timed out, checking status:', { + new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait + ]); + console.log("Initial connectors sync promise resolved or timed out, checking status:", { isUpToDate: handle.isUpToDate, - }) + }); } catch (syncErr) { - console.error('Initial connectors sync failed:', syncErr) + console.error("Initial connectors sync failed:", syncErr); } } - + // Check status after waiting - console.log('Connectors sync status after waiting:', { + console.log("Connectors sync status after waiting:", { isUpToDate: handle.isUpToDate, hasStream: !!handle.stream, - }) + }); if (!mounted) { - handle.unsubscribe() - return + handle.unsubscribe(); + return; } - syncHandleRef.current = handle - setLoading(false) - setError(null) + syncHandleRef.current = handle; + setLoading(false); + setError(null); // Fetch connectors after sync is complete (we already waited above) - await fetchConnectors(electricClient.db) + await fetchConnectors(electricClient.db); // Set up real-time updates using PGlite live queries // Electric SQL syncs data to PGlite in real-time via HTTP streaming // PGlite live queries detect when the synced data changes and trigger callbacks try { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const db = electricClient.db as any - + const db = electricClient.db as any; + // Use PGlite's live query API for real-time updates // CORRECT API: await db.live.query() then use .subscribe() - if (db.live?.query && typeof db.live.query === 'function') { + if (db.live?.query && typeof db.live.query === "function") { // IMPORTANT: db.live.query() returns a Promise - must await it! const liveQuery = await db.live.query( `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, [searchSpaceId] - ) - + ); + if (!mounted) { - liveQuery.unsubscribe?.() - return + liveQuery.unsubscribe?.(); + return; } - + // Set initial results immediately from the resolved query if (liveQuery.initialResults?.rows) { - console.log('📋 Initial live query results for connectors:', liveQuery.initialResults.rows.length) - setConnectors(liveQuery.initialResults.rows.map(transformConnector)) + console.log( + "📋 Initial live query results for connectors:", + liveQuery.initialResults.rows.length + ); + setConnectors(liveQuery.initialResults.rows.map(transformConnector)); } else if (liveQuery.rows) { // Some versions have rows directly on the result - console.log('📋 Initial live query results for connectors (direct):', liveQuery.rows.length) - setConnectors(liveQuery.rows.map(transformConnector)) + console.log( + "📋 Initial live query results for connectors (direct):", + liveQuery.rows.length + ); + setConnectors(liveQuery.rows.map(transformConnector)); } - + // Subscribe to changes - this is the correct API! // The callback fires automatically when Electric SQL syncs new data to PGlite - if (typeof liveQuery.subscribe === 'function') { + if (typeof liveQuery.subscribe === "function") { liveQuery.subscribe((result: { rows: any[] }) => { if (mounted && result.rows) { - console.log('🔄 Connectors updated via live query:', result.rows.length) - setConnectors(result.rows.map(transformConnector)) + console.log("🔄 Connectors updated via live query:", result.rows.length); + setConnectors(result.rows.map(transformConnector)); } - }) - + }); + // Store unsubscribe function for cleanup - liveQueryRef.current = liveQuery + liveQueryRef.current = liveQuery; } } else { - console.warn('PGlite live query API not available, falling back to polling') + console.warn("PGlite live query API not available, falling back to polling"); } } catch (liveQueryErr) { - console.error('Failed to set up live query for connectors:', liveQueryErr) + console.error("Failed to set up live query for connectors:", liveQueryErr); // Don't fail completely - we still have the initial fetch } } catch (err) { - console.error('Failed to initialize Electric SQL for connectors:', err) + console.error("Failed to initialize Electric SQL for connectors:", err); if (mounted) { - setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL for connectors')) - setLoading(false) + setError( + err instanceof Error + ? err + : new Error("Failed to initialize Electric SQL for connectors") + ); + setLoading(false); } } } - init() + init(); return () => { - mounted = false - syncHandleRef.current?.unsubscribe?.() - liveQueryRef.current?.unsubscribe?.() - syncHandleRef.current = null - liveQueryRef.current = null - } - }, [searchSpaceId]) + mounted = false; + syncHandleRef.current?.unsubscribe?.(); + liveQueryRef.current?.unsubscribe?.(); + syncHandleRef.current = null; + liveQueryRef.current = null; + }; + }, [searchSpaceId]); async function fetchConnectors(db: any) { try { const result = await db.query( `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, [searchSpaceId] - ) - console.log('📋 Fetched connectors from PGlite:', result.rows?.length || 0) - setConnectors((result.rows || []).map(transformConnector)) + ); + console.log("📋 Fetched connectors from PGlite:", result.rows?.length || 0); + setConnectors((result.rows || []).map(transformConnector)); } catch (err) { - console.error('Failed to fetch connectors from PGlite:', err) + console.error("Failed to fetch connectors from PGlite:", err); } } // Manual refresh function (optional, for fallback) const refreshConnectors = useCallback(async () => { - if (!electric) return - await fetchConnectors(electric.db) - }, [electric]) + if (!electric) return; + await fetchConnectors(electric.db); + }, [electric]); - return { connectors, loading, error, refreshConnectors } + return { connectors, loading, error, refreshConnectors }; } - diff --git a/surfsense_web/hooks/use-documents-electric.ts b/surfsense_web/hooks/use-documents-electric.ts index a4b6f23c4..985b8c6c6 100644 --- a/surfsense_web/hooks/use-documents-electric.ts +++ b/surfsense_web/hooks/use-documents-electric.ts @@ -1,190 +1,201 @@ -"use client" +"use client"; -import { useEffect, useState, useRef, useMemo } from 'react' -import { initElectric, type ElectricClient, type SyncHandle } from '@/lib/electric/client' +import { useEffect, useState, useRef, useMemo } from "react"; +import { initElectric, type ElectricClient, type SyncHandle } from "@/lib/electric/client"; interface Document { - id: number - search_space_id: number - document_type: string - created_at: string + id: number; + search_space_id: number; + document_type: string; + created_at: string; } export function useDocumentsElectric(searchSpaceId: number | string | null) { - const [electric, setElectric] = useState(null) - const [documents, setDocuments] = useState([]) - const [loading, setLoading] = useState(true) - const [error, setError] = useState(null) - const syncHandleRef = useRef(null) - const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null) + const [electric, setElectric] = useState(null); + const [documents, setDocuments] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const syncHandleRef = useRef(null); + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); // Calculate document type counts from synced documents const documentTypeCounts = useMemo(() => { - if (!documents.length) return {} - - const counts: Record = {} + if (!documents.length) return {}; + + const counts: Record = {}; for (const doc of documents) { - counts[doc.document_type] = (counts[doc.document_type] || 0) + 1 + counts[doc.document_type] = (counts[doc.document_type] || 0) + 1; } - return counts - }, [documents]) + return counts; + }, [documents]); // Initialize Electric SQL and start syncing with real-time updates useEffect(() => { if (!searchSpaceId) { - setLoading(false) - setDocuments([]) - return + setLoading(false); + setDocuments([]); + return; } - let mounted = true + let mounted = true; async function init() { try { - const electricClient = await initElectric() - if (!mounted) return + const electricClient = await initElectric(); + if (!mounted) return; - setElectric(electricClient) + setElectric(electricClient); // Start syncing documents for this search space via Electric SQL // Only sync id, document_type, search_space_id columns for efficiency - console.log('Starting Electric SQL sync for documents, search_space_id:', searchSpaceId) - + console.log("Starting Electric SQL sync for documents, search_space_id:", searchSpaceId); + const handle = await electricClient.syncShape({ - table: 'documents', + table: "documents", where: `search_space_id = ${searchSpaceId}`, - columns: ['id', 'document_type', 'search_space_id', 'created_at'], - primaryKey: ['id'], - }) - - console.log('Electric SQL sync started for documents:', { + columns: ["id", "document_type", "search_space_id", "created_at"], + primaryKey: ["id"], + }); + + console.log("Electric SQL sync started for documents:", { isUpToDate: handle.isUpToDate, hasStream: !!handle.stream, hasInitialSyncPromise: !!handle.initialSyncPromise, - }) - + }); + // Optimized: Check if already up-to-date before waiting if (handle.isUpToDate) { - console.log('Documents sync already up-to-date, skipping wait') + console.log("Documents sync already up-to-date, skipping wait"); } else if (handle.initialSyncPromise) { // Only wait if not already up-to-date - console.log('Waiting for initial documents sync to complete...') + console.log("Waiting for initial documents sync to complete..."); try { // Use Promise.race with a shorter timeout to avoid long waits await Promise.race([ handle.initialSyncPromise, - new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait - ]) - console.log('Initial documents sync promise resolved or timed out, checking status:', { + new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait + ]); + console.log("Initial documents sync promise resolved or timed out, checking status:", { isUpToDate: handle.isUpToDate, - }) + }); } catch (syncErr) { - console.error('Initial documents sync failed:', syncErr) + console.error("Initial documents sync failed:", syncErr); } } - + // Check status after waiting - console.log('Documents sync status after waiting:', { + console.log("Documents sync status after waiting:", { isUpToDate: handle.isUpToDate, hasStream: !!handle.stream, - }) + }); if (!mounted) { - handle.unsubscribe() - return + handle.unsubscribe(); + return; } - syncHandleRef.current = handle - setLoading(false) - setError(null) + syncHandleRef.current = handle; + setLoading(false); + setError(null); // Fetch documents after sync is complete (we already waited above) - await fetchDocuments(electricClient.db) + await fetchDocuments(electricClient.db); // Set up real-time updates using PGlite live queries // Electric SQL syncs data to PGlite in real-time via HTTP streaming // PGlite live queries detect when the synced data changes and trigger callbacks try { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const db = electricClient.db as any - + const db = electricClient.db as any; + // Use PGlite's live query API for real-time updates // CORRECT API: await db.live.query() then use .subscribe() - if (db.live?.query && typeof db.live.query === 'function') { + if (db.live?.query && typeof db.live.query === "function") { // IMPORTANT: db.live.query() returns a Promise - must await it! const liveQuery = await db.live.query( `SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`, [searchSpaceId] - ) - + ); + if (!mounted) { - liveQuery.unsubscribe?.() - return + liveQuery.unsubscribe?.(); + return; } - + // Set initial results immediately from the resolved query if (liveQuery.initialResults?.rows) { - console.log('📋 Initial live query results for documents:', liveQuery.initialResults.rows.length) - setDocuments(liveQuery.initialResults.rows) + console.log( + "📋 Initial live query results for documents:", + liveQuery.initialResults.rows.length + ); + setDocuments(liveQuery.initialResults.rows); } else if (liveQuery.rows) { // Some versions have rows directly on the result - console.log('📋 Initial live query results for documents (direct):', liveQuery.rows.length) - setDocuments(liveQuery.rows) + console.log( + "📋 Initial live query results for documents (direct):", + liveQuery.rows.length + ); + setDocuments(liveQuery.rows); } - + // Subscribe to changes - this is the correct API! // The callback fires automatically when Electric SQL syncs new data to PGlite - if (typeof liveQuery.subscribe === 'function') { + if (typeof liveQuery.subscribe === "function") { liveQuery.subscribe((result: { rows: Document[] }) => { if (mounted && result.rows) { - console.log('🔄 Documents updated via live query:', result.rows.length) - setDocuments(result.rows) + console.log("🔄 Documents updated via live query:", result.rows.length); + setDocuments(result.rows); } - }) - + }); + // Store unsubscribe function for cleanup - liveQueryRef.current = liveQuery + liveQueryRef.current = liveQuery; } } else { - console.warn('PGlite live query API not available for documents, falling back to polling') + console.warn( + "PGlite live query API not available for documents, falling back to polling" + ); } } catch (liveQueryErr) { - console.error('Failed to set up live query for documents:', liveQueryErr) + console.error("Failed to set up live query for documents:", liveQueryErr); // Don't fail completely - we still have the initial fetch } } catch (err) { - console.error('Failed to initialize Electric SQL for documents:', err) + console.error("Failed to initialize Electric SQL for documents:", err); if (mounted) { - setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL for documents')) - setLoading(false) + setError( + err instanceof Error + ? err + : new Error("Failed to initialize Electric SQL for documents") + ); + setLoading(false); } } } - init() + init(); return () => { - mounted = false - syncHandleRef.current?.unsubscribe?.() - liveQueryRef.current?.unsubscribe?.() - syncHandleRef.current = null - liveQueryRef.current = null - } - }, [searchSpaceId]) + mounted = false; + syncHandleRef.current?.unsubscribe?.(); + liveQueryRef.current?.unsubscribe?.(); + syncHandleRef.current = null; + liveQueryRef.current = null; + }; + }, [searchSpaceId]); async function fetchDocuments(db: any) { try { const result = await db.query( `SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`, [searchSpaceId] - ) - console.log('📋 Fetched documents from PGlite:', result.rows?.length || 0) - setDocuments(result.rows || []) + ); + console.log("📋 Fetched documents from PGlite:", result.rows?.length || 0); + setDocuments(result.rows || []); } catch (err) { - console.error('Failed to fetch documents from PGlite:', err) + console.error("Failed to fetch documents from PGlite:", err); } } - return { documentTypeCounts, loading, error } + return { documentTypeCounts, loading, error }; } - diff --git a/surfsense_web/hooks/use-notifications.ts b/surfsense_web/hooks/use-notifications.ts index d077bd1d8..7e95b32ef 100644 --- a/surfsense_web/hooks/use-notifications.ts +++ b/surfsense_web/hooks/use-notifications.ts @@ -1,211 +1,226 @@ -"use client" +"use client"; -import { useEffect, useState, useCallback, useRef } from 'react' -import { initElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client' -import type { Notification } from '@/contracts/types/notification.types' +import { useEffect, useState, useCallback, useRef } from "react"; +import { + initElectric, + isElectricInitialized, + type ElectricClient, + type SyncHandle, +} from "@/lib/electric/client"; +import type { Notification } from "@/contracts/types/notification.types"; -export type { Notification } from '@/contracts/types/notification.types' +export type { Notification } from "@/contracts/types/notification.types"; export function useNotifications(userId: string | null) { - const [electric, setElectric] = useState(null) - const [notifications, setNotifications] = useState([]) - const [loading, setLoading] = useState(true) - const [error, setError] = useState(null) - const syncHandleRef = useRef(null) - const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null) + const [electric, setElectric] = useState(null); + const [notifications, setNotifications] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const syncHandleRef = useRef(null); + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); // Use ref instead of state to track initialization - prevents cleanup from running when set - const initializedRef = useRef(false) + const initializedRef = useRef(false); // Initialize Electric SQL and start syncing with real-time updates useEffect(() => { // Use ref to prevent re-initialization without triggering cleanup - if (!userId || initializedRef.current) return - initializedRef.current = true + if (!userId || initializedRef.current) return; + initializedRef.current = true; - let mounted = true + let mounted = true; async function init() { try { - const electricClient = await initElectric() - if (!mounted) return + const electricClient = await initElectric(); + if (!mounted) return; - setElectric(electricClient) + setElectric(electricClient); // Start syncing notifications for this user via Electric SQL // Note: user_id is stored as TEXT in PGlite (UUID from backend is converted) - console.log('Starting Electric SQL sync for user:', userId) - + console.log("Starting Electric SQL sync for user:", userId); + // Use string format for WHERE clause (PGlite sync plugin expects this format) // The user_id is a UUID string, so we need to quote it properly const handle = await electricClient.syncShape({ - table: 'notifications', + table: "notifications", where: `user_id = '${userId}'`, - primaryKey: ['id'], - }) - - console.log('Electric SQL sync started:', { + primaryKey: ["id"], + }); + + console.log("Electric SQL sync started:", { isUpToDate: handle.isUpToDate, hasStream: !!handle.stream, hasInitialSyncPromise: !!handle.initialSyncPromise, - }) - + }); + // Optimized: Check if already up-to-date before waiting if (handle.isUpToDate) { - console.log('Sync already up-to-date, skipping wait') + console.log("Sync already up-to-date, skipping wait"); } else if (handle.initialSyncPromise) { // Only wait if not already up-to-date - console.log('Waiting for initial sync to complete...') + console.log("Waiting for initial sync to complete..."); try { // Use Promise.race with a shorter timeout to avoid long waits await Promise.race([ handle.initialSyncPromise, - new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait - ]) - console.log('Initial sync promise resolved or timed out, checking status:', { + new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait + ]); + console.log("Initial sync promise resolved or timed out, checking status:", { isUpToDate: handle.isUpToDate, - }) + }); } catch (syncErr) { - console.error('Initial sync failed:', syncErr) + console.error("Initial sync failed:", syncErr); } } - + // Check status after waiting - console.log('Sync status after waiting:', { + console.log("Sync status after waiting:", { isUpToDate: handle.isUpToDate, hasStream: !!handle.stream, - }) + }); if (!mounted) { - handle.unsubscribe() - return + handle.unsubscribe(); + return; } - syncHandleRef.current = handle - setLoading(false) - setError(null) + syncHandleRef.current = handle; + setLoading(false); + setError(null); // Fetch notifications after sync is complete (we already waited above) - await fetchNotifications(electricClient.db) + await fetchNotifications(electricClient.db); // Set up real-time updates using PGlite live queries // Electric SQL syncs data to PGlite in real-time via HTTP streaming // PGlite live queries detect when the synced data changes and trigger callbacks try { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const db = electricClient.db as any - + const db = electricClient.db as any; + // Use PGlite's live query API for real-time updates // CORRECT API: await db.live.query() then use .subscribe() - if (db.live?.query && typeof db.live.query === 'function') { + if (db.live?.query && typeof db.live.query === "function") { // IMPORTANT: db.live.query() returns a Promise - must await it! const liveQuery = await db.live.query( `SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`, [userId] - ) - + ); + if (!mounted) { - liveQuery.unsubscribe?.() - return + liveQuery.unsubscribe?.(); + return; } - + // Set initial results immediately from the resolved query if (liveQuery.initialResults?.rows) { - console.log('📋 Initial live query results:', liveQuery.initialResults.rows.length) - setNotifications(liveQuery.initialResults.rows) + console.log("📋 Initial live query results:", liveQuery.initialResults.rows.length); + setNotifications(liveQuery.initialResults.rows); } else if (liveQuery.rows) { // Some versions have rows directly on the result - console.log('📋 Initial live query results (direct):', liveQuery.rows.length) - setNotifications(liveQuery.rows) + console.log("📋 Initial live query results (direct):", liveQuery.rows.length); + setNotifications(liveQuery.rows); } - + // Subscribe to changes - this is the correct API! // The callback fires automatically when Electric SQL syncs new data to PGlite - if (typeof liveQuery.subscribe === 'function') { + if (typeof liveQuery.subscribe === "function") { liveQuery.subscribe((result: { rows: Notification[] }) => { - console.log('🔔 Live query update received:', result.rows?.length || 0, 'notifications') + console.log( + "🔔 Live query update received:", + result.rows?.length || 0, + "notifications" + ); if (mounted && result.rows) { - setNotifications(result.rows) + setNotifications(result.rows); } - }) - console.log('✅ Real-time notifications enabled via PGlite live queries') + }); + console.log("✅ Real-time notifications enabled via PGlite live queries"); } else { - console.warn('⚠️ Live query subscribe method not available') + console.warn("⚠️ Live query subscribe method not available"); } - + // Store for cleanup - if (typeof liveQuery.unsubscribe === 'function') { - liveQueryRef.current = liveQuery + if (typeof liveQuery.unsubscribe === "function") { + liveQueryRef.current = liveQuery; } } else { - console.error('❌ PGlite live queries not available - db.live.query is not a function') - console.log('db.live:', db.live) + console.error("❌ PGlite live queries not available - db.live.query is not a function"); + console.log("db.live:", db.live); } } catch (liveErr) { - console.error('❌ Failed to set up real-time updates:', liveErr) + console.error("❌ Failed to set up real-time updates:", liveErr); } } catch (err) { - if (!mounted) return - console.error('Failed to initialize Electric SQL:', err) - setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL')) + if (!mounted) return; + console.error("Failed to initialize Electric SQL:", err); + setError(err instanceof Error ? err : new Error("Failed to initialize Electric SQL")); // Still mark as loaded so the UI doesn't block - setLoading(false) + setLoading(false); } } - async function fetchNotifications(db: InstanceType) { + async function fetchNotifications( + db: InstanceType + ) { try { // Debug: Check all notifications first const allNotifications = await db.query( `SELECT * FROM notifications ORDER BY created_at DESC` - ) - console.log('All notifications in PGlite:', allNotifications.rows?.length || 0, allNotifications.rows) - + ); + console.log( + "All notifications in PGlite:", + allNotifications.rows?.length || 0, + allNotifications.rows + ); + // Use PGlite's query method (not exec for SELECT queries) const result = await db.query( `SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`, [userId] - ) - console.log(`Notifications for user ${userId}:`, result.rows?.length || 0, result.rows) - + ); + console.log(`Notifications for user ${userId}:`, result.rows?.length || 0, result.rows); + if (mounted) { // PGlite query returns { rows: [] } format - setNotifications(result.rows || []) + setNotifications(result.rows || []); } } catch (err) { - console.error('Failed to fetch notifications:', err) + console.error("Failed to fetch notifications:", err); // Log more details for debugging - console.error('Error details:', err) + console.error("Error details:", err); } } - init() + init(); return () => { - mounted = false + mounted = false; // Reset initialization state so we can reinitialize with a new userId - initializedRef.current = false - setLoading(true) + initializedRef.current = false; + setLoading(true); if (syncHandleRef.current) { - syncHandleRef.current.unsubscribe() - syncHandleRef.current = null + syncHandleRef.current.unsubscribe(); + syncHandleRef.current = null; } if (liveQueryRef.current) { - liveQueryRef.current.unsubscribe() - liveQueryRef.current = null + liveQueryRef.current.unsubscribe(); + liveQueryRef.current = null; } - } - // Only depend on userId - using ref for initialization tracking to prevent cleanup issues - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [userId]) + }; + // Only depend on userId - using ref for initialization tracking to prevent cleanup issues + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [userId]); // Mark notification as read (local only - needs backend sync) const markAsRead = useCallback( async (notificationId: number) => { if (!electric || !isElectricInitialized()) { - console.warn('Electric SQL not initialized') - return false + console.warn("Electric SQL not initialized"); + return false; } try { @@ -213,46 +228,46 @@ export function useNotifications(userId: string | null) { await electric.db.query( `UPDATE notifications SET read = true, updated_at = NOW() WHERE id = $1`, [notificationId] - ) + ); // Update local state - setNotifications(prev => - prev.map(n => n.id === notificationId ? { ...n, read: true } : n) - ) + setNotifications((prev) => + prev.map((n) => (n.id === notificationId ? { ...n, read: true } : n)) + ); // TODO: Also send to backend to persist the change // This could be done via a REST API call - return true + return true; } catch (err) { - console.error('Failed to mark notification as read:', err) - return false + console.error("Failed to mark notification as read:", err); + return false; } }, [electric] - ) + ); // Mark all notifications as read const markAllAsRead = useCallback(async () => { if (!electric || !isElectricInitialized()) { - console.warn('Electric SQL not initialized') - return false + console.warn("Electric SQL not initialized"); + return false; } try { - const unread = notifications.filter(n => !n.read) + const unread = notifications.filter((n) => !n.read); for (const notification of unread) { - await markAsRead(notification.id) + await markAsRead(notification.id); } - return true + return true; } catch (err) { - console.error('Failed to mark all notifications as read:', err) - return false + console.error("Failed to mark all notifications as read:", err); + return false; } - }, [electric, notifications, markAsRead]) + }, [electric, notifications, markAsRead]); // Get unread count - const unreadCount = notifications.filter(n => !n.read).length + const unreadCount = notifications.filter((n) => !n.read).length; return { notifications, @@ -261,5 +276,5 @@ export function useNotifications(userId: string | null) { markAllAsRead, loading, error, - } + }; } diff --git a/surfsense_web/lib/electric/auth.ts b/surfsense_web/lib/electric/auth.ts index eb2ffba3f..2b65ba091 100644 --- a/surfsense_web/lib/electric/auth.ts +++ b/surfsense_web/lib/electric/auth.ts @@ -5,17 +5,16 @@ export async function getElectricAuthToken(): Promise { // For insecure mode (development), return empty string - if (process.env.NEXT_PUBLIC_ELECTRIC_AUTH_MODE === 'insecure') { - return '' + if (process.env.NEXT_PUBLIC_ELECTRIC_AUTH_MODE === "insecure") { + return ""; } // In production, get token from your auth system // This should match your backend auth token - if (typeof window !== 'undefined') { - const token = localStorage.getItem('surfsense_bearer_token') - return token || '' + if (typeof window !== "undefined") { + const token = localStorage.getItem("surfsense_bearer_token"); + return token || ""; } - return '' + return ""; } - diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index 02a1af40e..4b888f12f 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -1,53 +1,53 @@ /** * Electric SQL client setup for ElectricSQL 1.x with PGlite - * + * * This uses the new ElectricSQL 1.x architecture: * - PGlite: In-browser PostgreSQL database (local storage) * - @electric-sql/pglite-sync: Sync plugin to sync Electric shapes into PGlite * - @electric-sql/client: HTTP client for subscribing to shapes */ -import { PGlite } from '@electric-sql/pglite' -import { electricSync } from '@electric-sql/pglite-sync' -import { live } from '@electric-sql/pglite/live' +import { PGlite } from "@electric-sql/pglite"; +import { electricSync } from "@electric-sql/pglite-sync"; +import { live } from "@electric-sql/pglite/live"; // Types export interface ElectricClient { - db: PGlite - syncShape: (options: SyncShapeOptions) => Promise + db: PGlite; + syncShape: (options: SyncShapeOptions) => Promise; } export interface SyncShapeOptions { - table: string - where?: string - columns?: string[] - primaryKey?: string[] + table: string; + where?: string; + columns?: string[]; + primaryKey?: string[]; } export interface SyncHandle { - unsubscribe: () => void - readonly isUpToDate: boolean + unsubscribe: () => void; + readonly isUpToDate: boolean; // The stream property contains the ShapeStreamInterface from pglite-sync - stream?: unknown + stream?: unknown; // Promise that resolves when initial sync is complete - initialSyncPromise?: Promise + initialSyncPromise?: Promise; } // Singleton instance -let electricClient: ElectricClient | null = null -let isInitializing = false -let initPromise: Promise | null = null +let electricClient: ElectricClient | null = null; +let isInitializing = false; +let initPromise: Promise | null = null; // Version for sync state - increment this to force fresh sync when Electric config changes // Incremented to v4 to fix sync completion issues -const SYNC_VERSION = 4 +const SYNC_VERSION = 4; // Get Electric URL from environment function getElectricUrl(): string { - if (typeof window !== 'undefined') { - return process.env.NEXT_PUBLIC_ELECTRIC_URL || 'http://localhost:5133' + if (typeof window !== "undefined") { + return process.env.NEXT_PUBLIC_ELECTRIC_URL || "http://localhost:5133"; } - return 'http://localhost:5133' + return "http://localhost:5133"; } /** @@ -55,14 +55,14 @@ function getElectricUrl(): string { */ export async function initElectric(): Promise { if (electricClient) { - return electricClient + return electricClient; } if (isInitializing && initPromise) { - return initPromise + return initPromise; } - isInitializing = true + isInitializing = true; initPromise = (async () => { try { // Create PGlite instance with Electric sync plugin and live queries @@ -75,7 +75,7 @@ export async function initElectric(): Promise { electric: electricSync({ debug: true }), live, // Enable live queries for real-time updates }, - }) + }); // Create the notifications table schema in PGlite // This matches the backend schema @@ -95,7 +95,7 @@ export async function initElectric(): Promise { CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notifications(user_id); CREATE INDEX IF NOT EXISTS idx_notifications_read ON notifications(read); - `) + `); // Create the search_source_connectors table schema in PGlite // This matches the backend schema @@ -118,7 +118,7 @@ export async function initElectric(): Promise { CREATE INDEX IF NOT EXISTS idx_connectors_search_space_id ON search_source_connectors(search_space_id); CREATE INDEX IF NOT EXISTS idx_connectors_type ON search_source_connectors(connector_type); CREATE INDEX IF NOT EXISTS idx_connectors_user_id ON search_source_connectors(user_id); - `) + `); // Create the documents table schema in PGlite // Only sync minimal fields needed for type counts: id, document_type, search_space_id @@ -133,292 +133,309 @@ export async function initElectric(): Promise { CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents(search_space_id); CREATE INDEX IF NOT EXISTS idx_documents_type ON documents(document_type); CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type); - `) + `); - const electricUrl = getElectricUrl() + const electricUrl = getElectricUrl(); // Create the client wrapper electricClient = { db, syncShape: async (options: SyncShapeOptions): Promise => { - const { table, where, columns, primaryKey = ['id'] } = options + const { table, where, columns, primaryKey = ["id"] } = options; - // Build params for the shape request - // Electric SQL expects params as URL query parameters - const params: Record = { table } - - // Validate and fix WHERE clause to ensure string literals are properly quoted - let validatedWhere = where - if (where) { - // Check if where uses positional parameters - if (where.includes('$1')) { - // Extract the value from the where clause if it's embedded - // For now, we'll use the where clause as-is and let Electric handle it - params.where = where - validatedWhere = where - } else { - // Validate that string literals are properly quoted - // Count single quotes - should be even (pairs) for properly quoted strings - const singleQuoteCount = (where.match(/'/g) || []).length - - if (singleQuoteCount % 2 !== 0) { - // Odd number of quotes means unterminated string literal - console.warn('Where clause has unmatched quotes, fixing:', where) - // Add closing quote at the end - validatedWhere = `${where}'` - params.where = validatedWhere + // Build params for the shape request + // Electric SQL expects params as URL query parameters + const params: Record = { table }; + + // Validate and fix WHERE clause to ensure string literals are properly quoted + let validatedWhere = where; + if (where) { + // Check if where uses positional parameters + if (where.includes("$1")) { + // Extract the value from the where clause if it's embedded + // For now, we'll use the where clause as-is and let Electric handle it + params.where = where; + validatedWhere = where; } else { - // Use the where clause directly (already formatted) - params.where = where - validatedWhere = where + // Validate that string literals are properly quoted + // Count single quotes - should be even (pairs) for properly quoted strings + const singleQuoteCount = (where.match(/'/g) || []).length; + + if (singleQuoteCount % 2 !== 0) { + // Odd number of quotes means unterminated string literal + console.warn("Where clause has unmatched quotes, fixing:", where); + // Add closing quote at the end + validatedWhere = `${where}'`; + params.where = validatedWhere; + } else { + // Use the where clause directly (already formatted) + params.where = where; + validatedWhere = where; + } } } - } - - if (columns) params.columns = columns.join(',') - console.log('Syncing shape with params:', params) - console.log('Electric URL:', `${electricUrl}/v1/shape`) - console.log('Where clause:', where, 'Validated:', validatedWhere) + if (columns) params.columns = columns.join(","); + + console.log("Syncing shape with params:", params); + console.log("Electric URL:", `${electricUrl}/v1/shape`); + console.log("Where clause:", where, "Validated:", validatedWhere); try { // Debug: Test Electric SQL connection directly first // Use validatedWhere to ensure proper URL encoding - const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ''}` - console.log('Testing Electric SQL directly:', testUrl) + const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`; + console.log("Testing Electric SQL directly:", testUrl); try { - const testResponse = await fetch(testUrl) + const testResponse = await fetch(testUrl); const testHeaders = { - handle: testResponse.headers.get('electric-handle'), - offset: testResponse.headers.get('electric-offset'), - upToDate: testResponse.headers.get('electric-up-to-date'), - } - console.log('Direct Electric SQL response headers:', testHeaders) - const testData = await testResponse.json() - console.log('Direct Electric SQL data count:', Array.isArray(testData) ? testData.length : 'not array', testData) + handle: testResponse.headers.get("electric-handle"), + offset: testResponse.headers.get("electric-offset"), + upToDate: testResponse.headers.get("electric-up-to-date"), + }; + console.log("Direct Electric SQL response headers:", testHeaders); + const testData = await testResponse.json(); + console.log( + "Direct Electric SQL data count:", + Array.isArray(testData) ? testData.length : "not array", + testData + ); } catch (testErr) { - console.error('Direct Electric SQL test failed:', testErr) + console.error("Direct Electric SQL test failed:", testErr); } // Use PGlite's electric sync plugin to sync the shape - // According to Electric SQL docs, the shape config uses params for table, where, columns - // Note: mapColumns is OPTIONAL per pglite-sync types.ts - - // Create a promise that resolves when initial sync is complete - // Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout - // IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates - let resolveInitialSync: () => void - let rejectInitialSync: (error: Error) => void - let syncResolved = false - - const initialSyncPromise = new Promise((resolve, reject) => { - resolveInitialSync = () => { - if (!syncResolved) { - syncResolved = true - // DON'T unsubscribe from stream - it needs to stay active for real-time updates - resolve() - } - } - rejectInitialSync = (error: Error) => { - if (!syncResolved) { - syncResolved = true - // DON'T unsubscribe from stream even on error - let Electric handle it - reject(error) - } - } - - // Shorter timeout (5 seconds) as fallback - const timeoutId = setTimeout(() => { - if (!syncResolved) { - console.warn(`⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`) - // Check isUpToDate one more time before resolving - // This will be checked after shape is created - setTimeout(() => { - if (!syncResolved) { - console.warn(`⚠️ Sync timeout for ${table} - resolving anyway after 5s`) - resolveInitialSync() - } - }, 100) - } - }, 5000) - - // Store timeout ID for cleanup if needed - // Note: timeout will be cleared if sync completes early - }) - - const shapeConfig = { - shape: { - url: `${electricUrl}/v1/shape`, - params: { - table, - ...(validatedWhere ? { where: validatedWhere } : {}), - ...(columns ? { columns: columns.join(',') } : {}), + // According to Electric SQL docs, the shape config uses params for table, where, columns + // Note: mapColumns is OPTIONAL per pglite-sync types.ts + + // Create a promise that resolves when initial sync is complete + // Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout + // IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates + let resolveInitialSync: () => void; + let rejectInitialSync: (error: Error) => void; + let syncResolved = false; + + const initialSyncPromise = new Promise((resolve, reject) => { + resolveInitialSync = () => { + if (!syncResolved) { + syncResolved = true; + // DON'T unsubscribe from stream - it needs to stay active for real-time updates + resolve(); + } + }; + rejectInitialSync = (error: Error) => { + if (!syncResolved) { + syncResolved = true; + // DON'T unsubscribe from stream even on error - let Electric handle it + reject(error); + } + }; + + // Shorter timeout (5 seconds) as fallback + const timeoutId = setTimeout(() => { + if (!syncResolved) { + console.warn( + `⚠️ Sync timeout for ${table} - checking isUpToDate one more time...` + ); + // Check isUpToDate one more time before resolving + // This will be checked after shape is created + setTimeout(() => { + if (!syncResolved) { + console.warn(`⚠️ Sync timeout for ${table} - resolving anyway after 5s`); + resolveInitialSync(); + } + }, 100); + } + }, 5000); + + // Store timeout ID for cleanup if needed + // Note: timeout will be cleared if sync completes early + }); + + const shapeConfig = { + shape: { + url: `${electricUrl}/v1/shape`, + params: { + table, + ...(validatedWhere ? { where: validatedWhere } : {}), + ...(columns ? { columns: columns.join(",") } : {}), + }, }, - }, - table, - primaryKey, - shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, '_') || 'all'}`, // Versioned key to force fresh sync when needed - onInitialSync: () => { - console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`) - resolveInitialSync() - }, - onError: (error: Error) => { - console.error(`❌ Shape sync error for ${table}:`, error) - console.error('Error details:', JSON.stringify(error, Object.getOwnPropertyNames(error))) - rejectInitialSync(error) - }, - } - - console.log('syncShapeToTable config:', JSON.stringify(shapeConfig, null, 2)) - - // Type assertion to PGlite with electric extension - const pgWithElectric = db as PGlite & { electric: { syncShapeToTable: (config: typeof shapeConfig) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }> } } - const shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig) + table, + primaryKey, + shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // Versioned key to force fresh sync when needed + onInitialSync: () => { + console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`); + resolveInitialSync(); + }, + onError: (error: Error) => { + console.error(`❌ Shape sync error for ${table}:`, error); + console.error( + "Error details:", + JSON.stringify(error, Object.getOwnPropertyNames(error)) + ); + rejectInitialSync(error); + }, + }; - if (!shape) { - throw new Error('syncShapeToTable returned undefined') - } + console.log("syncShapeToTable config:", JSON.stringify(shapeConfig, null, 2)); - // Log the actual shape result structure - console.log('Shape sync result (initial):', { - hasUnsubscribe: typeof shape?.unsubscribe === 'function', - isUpToDate: shape?.isUpToDate, - hasStream: !!shape?.stream, - streamType: typeof shape?.stream, - }) - - // Recommended Approach Step 1: Check isUpToDate immediately - if (shape.isUpToDate) { - console.log(`✅ Sync already up-to-date for ${table} (resuming from previous state)`) - resolveInitialSync() - } else { - // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message - if (shape?.stream) { - const stream = shape.stream as any - console.log('Shape stream details:', { - shapeHandle: stream?.shapeHandle, - lastOffset: stream?.lastOffset, - isUpToDate: stream?.isUpToDate, - error: stream?.error, - hasSubscribe: typeof stream?.subscribe === 'function', - hasUnsubscribe: typeof stream?.unsubscribe === 'function', - }) - - // Subscribe to the stream to watch for "up-to-date" control message - // NOTE: We keep this subscription active - don't unsubscribe! - // The stream is what Electric SQL uses for real-time updates - if (typeof stream?.subscribe === 'function') { - console.log('Subscribing to shape stream to watch for up-to-date message...') - // Subscribe but don't store unsubscribe - we want it to stay active - stream.subscribe((messages: unknown[]) => { - // Continue receiving updates even after sync is resolved - if (!syncResolved) { - console.log('🔵 Shape stream received messages:', messages?.length || 0) - } - - // Check if any message indicates sync is complete - if (messages && messages.length > 0) { - for (const message of messages) { - const msg = message as any - // Check for "up-to-date" control message - if (msg?.headers?.control === 'up-to-date' || - msg?.headers?.electric_up_to_date === 'true' || - (typeof msg === 'object' && 'up-to-date' in msg)) { - if (!syncResolved) { - console.log(`✅ Received up-to-date message for ${table}`) - resolveInitialSync() + // Type assertion to PGlite with electric extension + const pgWithElectric = db as PGlite & { + electric: { + syncShapeToTable: ( + config: typeof shapeConfig + ) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }>; + }; + }; + const shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig); + + if (!shape) { + throw new Error("syncShapeToTable returned undefined"); + } + + // Log the actual shape result structure + console.log("Shape sync result (initial):", { + hasUnsubscribe: typeof shape?.unsubscribe === "function", + isUpToDate: shape?.isUpToDate, + hasStream: !!shape?.stream, + streamType: typeof shape?.stream, + }); + + // Recommended Approach Step 1: Check isUpToDate immediately + if (shape.isUpToDate) { + console.log(`✅ Sync already up-to-date for ${table} (resuming from previous state)`); + resolveInitialSync(); + } else { + // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message + if (shape?.stream) { + const stream = shape.stream as any; + console.log("Shape stream details:", { + shapeHandle: stream?.shapeHandle, + lastOffset: stream?.lastOffset, + isUpToDate: stream?.isUpToDate, + error: stream?.error, + hasSubscribe: typeof stream?.subscribe === "function", + hasUnsubscribe: typeof stream?.unsubscribe === "function", + }); + + // Subscribe to the stream to watch for "up-to-date" control message + // NOTE: We keep this subscription active - don't unsubscribe! + // The stream is what Electric SQL uses for real-time updates + if (typeof stream?.subscribe === "function") { + console.log("Subscribing to shape stream to watch for up-to-date message..."); + // Subscribe but don't store unsubscribe - we want it to stay active + stream.subscribe((messages: unknown[]) => { + // Continue receiving updates even after sync is resolved + if (!syncResolved) { + console.log("🔵 Shape stream received messages:", messages?.length || 0); + } + + // Check if any message indicates sync is complete + if (messages && messages.length > 0) { + for (const message of messages) { + const msg = message as any; + // Check for "up-to-date" control message + if ( + msg?.headers?.control === "up-to-date" || + msg?.headers?.electric_up_to_date === "true" || + (typeof msg === "object" && "up-to-date" in msg) + ) { + if (!syncResolved) { + console.log(`✅ Received up-to-date message for ${table}`); + resolveInitialSync(); + } + // Continue listening for real-time updates - don't return! } - // Continue listening for real-time updates - don't return! + } + if (!syncResolved && messages.length > 0) { + console.log("First message:", JSON.stringify(messages[0], null, 2)); } } - if (!syncResolved && messages.length > 0) { - console.log('First message:', JSON.stringify(messages[0], null, 2)) - } - } - - // Also check stream's isUpToDate property after receiving messages - if (!syncResolved && stream?.isUpToDate) { - console.log(`✅ Stream isUpToDate is true for ${table}`) - resolveInitialSync() - } - }) - - // Also check stream's isUpToDate property immediately - if (stream?.isUpToDate) { - console.log(`✅ Stream isUpToDate is true immediately for ${table}`) - resolveInitialSync() - } - } - - // Also poll isUpToDate periodically as a backup (every 200ms) - const pollInterval = setInterval(() => { - if (syncResolved) { - clearInterval(pollInterval) - return - } - - if (shape.isUpToDate || stream?.isUpToDate) { - console.log(`✅ Sync completed (detected via polling) for ${table}`) - clearInterval(pollInterval) - resolveInitialSync() - } - }, 200) - - // Clean up polling when promise resolves - initialSyncPromise.finally(() => { - clearInterval(pollInterval) - }) - } else { - console.warn(`⚠️ No stream available for ${table}, relying on callback and timeout`) - } - } - // Return the shape handle - isUpToDate is a getter that reflects current state - return { - unsubscribe: () => { - console.log('unsubscribing') - if (shape && typeof shape.unsubscribe === 'function') { - shape.unsubscribe() + // Also check stream's isUpToDate property after receiving messages + if (!syncResolved && stream?.isUpToDate) { + console.log(`✅ Stream isUpToDate is true for ${table}`); + resolveInitialSync(); + } + }); + + // Also check stream's isUpToDate property immediately + if (stream?.isUpToDate) { + console.log(`✅ Stream isUpToDate is true immediately for ${table}`); + resolveInitialSync(); + } + } + + // Also poll isUpToDate periodically as a backup (every 200ms) + const pollInterval = setInterval(() => { + if (syncResolved) { + clearInterval(pollInterval); + return; + } + + if (shape.isUpToDate || stream?.isUpToDate) { + console.log(`✅ Sync completed (detected via polling) for ${table}`); + clearInterval(pollInterval); + resolveInitialSync(); + } + }, 200); + + // Clean up polling when promise resolves + initialSyncPromise.finally(() => { + clearInterval(pollInterval); + }); + } else { + console.warn(`⚠️ No stream available for ${table}, relying on callback and timeout`); } - }, - // Use getter to always return current state - get isUpToDate() { - return shape?.isUpToDate ?? false - }, - stream: shape?.stream, - initialSyncPromise, // Expose promise so callers can wait for sync - } + } + + // Return the shape handle - isUpToDate is a getter that reflects current state + return { + unsubscribe: () => { + console.log("unsubscribing"); + if (shape && typeof shape.unsubscribe === "function") { + shape.unsubscribe(); + } + }, + // Use getter to always return current state + get isUpToDate() { + return shape?.isUpToDate ?? false; + }, + stream: shape?.stream, + initialSyncPromise, // Expose promise so callers can wait for sync + }; } catch (error) { - console.error('Failed to sync shape:', error) + console.error("Failed to sync shape:", error); // Check if Electric SQL server is reachable try { const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, { - method: 'GET', - }) - console.log('Electric SQL server response:', response.status, response.statusText) + method: "GET", + }); + console.log("Electric SQL server response:", response.status, response.statusText); if (!response.ok) { - console.error('Electric SQL server error:', await response.text()) + console.error("Electric SQL server error:", await response.text()); } } catch (fetchError) { - console.error('Cannot reach Electric SQL server:', fetchError) - console.error('Make sure Electric SQL is running at:', electricUrl) + console.error("Cannot reach Electric SQL server:", fetchError); + console.error("Make sure Electric SQL is running at:", electricUrl); } - throw error + throw error; } }, - } + }; - console.log('Electric SQL initialized successfully with PGlite') - return electricClient + console.log("Electric SQL initialized successfully with PGlite"); + return electricClient; } catch (error) { - console.error('Failed to initialize Electric SQL:', error) - throw error + console.error("Failed to initialize Electric SQL:", error); + throw error; } finally { - isInitializing = false + isInitializing = false; } - })() + })(); - return initPromise + return initPromise; } /** @@ -426,21 +443,21 @@ export async function initElectric(): Promise { */ export function getElectric(): ElectricClient { if (!electricClient) { - throw new Error('Electric not initialized. Call initElectric() first.') + throw new Error("Electric not initialized. Call initElectric() first."); } - return electricClient + return electricClient; } /** * Check if Electric is initialized */ export function isElectricInitialized(): boolean { - return electricClient !== null + return electricClient !== null; } /** * Get the PGlite database instance */ export function getDb(): PGlite | null { - return electricClient?.db ?? null + return electricClient?.db ?? null; }