diff --git a/surfsense_backend/app/agents/new_chat/tools/mcp_client.py b/surfsense_backend/app/agents/new_chat/tools/mcp_client.py index 56f85b361..9ec4b5fbf 100644 --- a/surfsense_backend/app/agents/new_chat/tools/mcp_client.py +++ b/surfsense_backend/app/agents/new_chat/tools/mcp_client.py @@ -77,7 +77,7 @@ class MCPClient: # Initialize the connection await session.initialize() self.session = session - + if attempt > 0: logger.info( "Connected to MCP server on attempt %d: %s %s", @@ -267,30 +267,40 @@ async def test_mcp_http_connection( """ try: - logger.info("Testing HTTP MCP connection to: %s (transport: %s)", url, transport) - + logger.info( + "Testing HTTP MCP connection to: %s (transport: %s)", url, transport + ) + # Use streamable HTTP client for all HTTP-based transports - async with streamablehttp_client(url, headers=headers or {}) as (read, write, _): - async with ClientSession(read, write) as session: - await session.initialize() - - # List available tools - response = await session.list_tools() - tools = [] - for tool in response.tools: - tools.append({ + async with ( + streamablehttp_client(url, headers=headers or {}) as (read, write, _), + ClientSession(read, write) as session, + ): + await session.initialize() + + # List available tools + response = await session.list_tools() + tools = [] + for tool in response.tools: + tools.append( + { "name": tool.name, "description": tool.description or "", - "input_schema": tool.inputSchema if hasattr(tool, "inputSchema") else {}, - }) - - logger.info("HTTP MCP connection successful. Found %d tools.", len(tools)) - return { - "status": "success", - "message": f"Connected successfully. Found {len(tools)} tools.", - "tools": tools, - } - + "input_schema": tool.inputSchema + if hasattr(tool, "inputSchema") + else {}, + } + ) + + logger.info( + "HTTP MCP connection successful. Found %d tools.", len(tools) + ) + return { + "status": "success", + "message": f"Connected successfully. Found {len(tools)} tools.", + "tools": tools, + } + except Exception as e: logger.error("Failed to connect to HTTP MCP server: %s", e, exc_info=True) return { diff --git a/surfsense_backend/app/agents/new_chat/tools/mcp_tool.py b/surfsense_backend/app/agents/new_chat/tools/mcp_tool.py index 4cb85f4cc..5ccd2e749 100644 --- a/surfsense_backend/app/agents/new_chat/tools/mcp_tool.py +++ b/surfsense_backend/app/agents/new_chat/tools/mcp_tool.py @@ -160,27 +160,31 @@ async def _create_mcp_tool_from_definition_http( logger.info(f"MCP HTTP tool '{tool_name}' called with params: {kwargs}") try: - async with streamablehttp_client(url, headers=headers) as (read, write, _): - async with ClientSession(read, write) as session: - await session.initialize() - - # Call the tool - response = await session.call_tool(tool_name, arguments=kwargs) - - # Extract content from response - result = [] - for content in response.content: - if hasattr(content, "text"): - result.append(content.text) - elif hasattr(content, "data"): - result.append(str(content.data)) - else: - result.append(str(content)) - - result_str = "\n".join(result) if result else "" - logger.info(f"MCP HTTP tool '{tool_name}' succeeded: {result_str[:200]}") - return result_str - + async with ( + streamablehttp_client(url, headers=headers) as (read, write, _), + ClientSession(read, write) as session, + ): + await session.initialize() + + # Call the tool + response = await session.call_tool(tool_name, arguments=kwargs) + + # Extract content from response + result = [] + for content in response.content: + if hasattr(content, "text"): + result.append(content.text) + elif hasattr(content, "data"): + result.append(str(content.data)) + else: + result.append(str(content)) + + result_str = "\n".join(result) if result else "" + logger.info( + f"MCP HTTP tool '{tool_name}' succeeded: {result_str[:200]}" + ) + return result_str + except Exception as e: error_msg = f"MCP HTTP tool '{tool_name}' execution failed: {e!s}" logger.exception(error_msg) @@ -192,7 +196,11 @@ async def _create_mcp_tool_from_definition_http( description=tool_description, coroutine=mcp_http_tool_call, args_schema=input_model, - metadata={"mcp_input_schema": input_schema, "mcp_transport": "http", "mcp_url": url}, + metadata={ + "mcp_input_schema": input_schema, + "mcp_transport": "http", + "mcp_url": url, + }, ) logger.info(f"Created MCP tool (HTTP): '{tool_name}'") @@ -205,17 +213,17 @@ async def _load_stdio_mcp_tools( server_config: dict[str, Any], ) -> list[StructuredTool]: """Load tools from a stdio-based MCP server. - + Args: connector_id: Connector ID for logging connector_name: Connector name for logging server_config: Server configuration with command, args, env - + Returns: List of tools from the MCP server """ tools: list[StructuredTool] = [] - + # Validate required command field command = server_config.get("command") if not command or not isinstance(command, str): @@ -262,7 +270,7 @@ async def _load_stdio_mcp_tools( f"Failed to create tool '{tool_def.get('name')}' " f"from connector {connector_id}: {e!s}" ) - + return tools @@ -272,17 +280,17 @@ async def _load_http_mcp_tools( server_config: dict[str, Any], ) -> list[StructuredTool]: """Load tools from an HTTP-based MCP server. - + Args: connector_id: Connector ID for logging connector_name: Connector name for logging server_config: Server configuration with url, headers - + Returns: List of tools from the MCP server """ tools: list[StructuredTool] = [] - + # Validate required url field url = server_config.get("url") if not url or not isinstance(url, str): @@ -301,41 +309,49 @@ async def _load_http_mcp_tools( # Connect and discover tools via HTTP try: - async with streamablehttp_client(url, headers=headers) as (read, write, _): - async with ClientSession(read, write) as session: - await session.initialize() - - # List available tools - response = await session.list_tools() - tool_definitions = [] - for tool in response.tools: - tool_definitions.append({ + async with ( + streamablehttp_client(url, headers=headers) as (read, write, _), + ClientSession(read, write) as session, + ): + await session.initialize() + + # List available tools + response = await session.list_tools() + tool_definitions = [] + for tool in response.tools: + tool_definitions.append( + { "name": tool.name, "description": tool.description or "", - "input_schema": tool.inputSchema if hasattr(tool, "inputSchema") else {}, - }) - - logger.info( - f"Discovered {len(tool_definitions)} tools from HTTP MCP server " - f"'{url}' (connector {connector_id})" + "input_schema": tool.inputSchema + if hasattr(tool, "inputSchema") + else {}, + } ) + logger.info( + f"Discovered {len(tool_definitions)} tools from HTTP MCP server " + f"'{url}' (connector {connector_id})" + ) + # Create LangChain tools from definitions for tool_def in tool_definitions: try: - tool = await _create_mcp_tool_from_definition_http(tool_def, url, headers) + tool = await _create_mcp_tool_from_definition_http( + tool_def, url, headers + ) tools.append(tool) except Exception as e: logger.exception( f"Failed to create HTTP tool '{tool_def.get('name')}' " f"from connector {connector_id}: {e!s}" ) - + except Exception as e: logger.exception( f"Failed to connect to HTTP MCP server at '{url}' (connector {connector_id}): {e!s}" ) - + return tools @@ -372,7 +388,7 @@ async def load_mcp_tools( # Early validation: Extract and validate connector config config = connector.config or {} server_config = config.get("server_config", {}) - + # Validate server_config exists and is a dict if not server_config or not isinstance(server_config, dict): logger.warning( @@ -382,7 +398,7 @@ async def load_mcp_tools( # Determine transport type transport = server_config.get("transport", "stdio") - + if transport in ("streamable-http", "http", "sse"): # HTTP-based MCP server connector_tools = await _load_http_mcp_tools( @@ -393,9 +409,9 @@ async def load_mcp_tools( connector_tools = await _load_stdio_mcp_tools( connector.id, connector.name, server_config ) - + tools.extend(connector_tools) - + except Exception as e: logger.exception( f"Failed to load tools from MCP connector {connector.id}: {e!s}" diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index dedef0ea9..f6319653f 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -2403,25 +2403,29 @@ async def test_mcp_server_connection( ) transport = server_config.get("transport", "stdio") - + # HTTP transport (streamable-http, http, sse) if transport in ("streamable-http", "http", "sse"): url = server_config.get("url") headers = server_config.get("headers", {}) - + if not url: - raise HTTPException(status_code=400, detail="Server URL is required for HTTP transport") - + raise HTTPException( + status_code=400, detail="Server URL is required for HTTP transport" + ) + result = await test_mcp_http_connection(url, headers, transport) return result - + # stdio transport (default) command = server_config.get("command") args = server_config.get("args", []) env = server_config.get("env", {}) if not command: - raise HTTPException(status_code=400, detail="Server command is required for stdio transport") + raise HTTPException( + status_code=400, detail="Server command is required for stdio transport" + ) # Test the connection result = await test_mcp_connection(command, args, env) diff --git a/surfsense_backend/app/schemas/search_source_connector.py b/surfsense_backend/app/schemas/search_source_connector.py index ddffdc969..b8ff3e649 100644 --- a/surfsense_backend/app/schemas/search_source_connector.py +++ b/surfsense_backend/app/schemas/search_source_connector.py @@ -84,7 +84,7 @@ class SearchSourceConnectorRead(SearchSourceConnectorBase, IDModel, TimestampMod class MCPServerConfig(BaseModel): """Configuration for an MCP server connection. - + Supports two transport types: - stdio: Local process (command, args, env) - streamable-http/http/sse: Remote HTTP server (url, headers) @@ -94,13 +94,13 @@ class MCPServerConfig(BaseModel): command: str | None = None # e.g., "uvx", "node", "python" args: list[str] = [] # e.g., ["mcp-server-git", "--repository", "/path"] env: dict[str, str] = {} # Environment variables for the server process - + # HTTP transport fields url: str | None = None # e.g., "https://mcp-server.com/mcp" headers: dict[str, str] = {} # HTTP headers for authentication - + transport: str = "stdio" # "stdio" | "streamable-http" | "http" | "sse" - + def is_http_transport(self) -> bool: """Check if this config uses HTTP transport.""" return self.transport in ("streamable-http", "http", "sse") diff --git a/surfsense_web/components/assistant-ui/assistant-message.tsx b/surfsense_web/components/assistant-ui/assistant-message.tsx index 106596403..681dc315a 100644 --- a/surfsense_web/components/assistant-ui/assistant-message.tsx +++ b/surfsense_web/components/assistant-ui/assistant-message.tsx @@ -204,7 +204,9 @@ export const AssistantMessage: FC = () => { > {hasComments ? ( - {commentCount} {commentCount === 1 ? "comment" : "comments"} + + {commentCount} {commentCount === 1 ? "comment" : "comments"} + ) : ( Add comment )} diff --git a/surfsense_web/components/assistant-ui/connector-popup/components/periodic-sync-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/components/periodic-sync-config.tsx index 1d52f0182..b7c68734a 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/components/periodic-sync-config.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/components/periodic-sync-config.tsx @@ -1,7 +1,7 @@ "use client"; -import type { FC } from "react"; import { AlertCircle } from "lucide-react"; +import type { FC } from "react"; import { Label } from "@/components/ui/label"; import { Select, diff --git a/surfsense_web/components/assistant-ui/connector-popup/connect-forms/components/mcp-connect-form.tsx b/surfsense_web/components/assistant-ui/connector-popup/connect-forms/components/mcp-connect-form.tsx index c1a1af5a1..9ece079f3 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connect-forms/components/mcp-connect-form.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connect-forms/components/mcp-connect-form.tsx @@ -7,13 +7,13 @@ import { Button } from "@/components/ui/button"; import { Label } from "@/components/ui/label"; import { Textarea } from "@/components/ui/textarea"; import { EnumConnectorName } from "@/contracts/enums/connector"; -import type { ConnectFormProps } from ".."; import { extractServerName, + type MCPConnectionTestResult, parseMCPConfig, testMCPConnection, - type MCPConnectionTestResult, } from "../../utils/mcp-config-validator"; +import type { ConnectFormProps } from ".."; export const MCPConnectForm: FC = ({ onSubmit, isSubmitting }) => { const isSubmittingRef = useRef(false); diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/google-drive-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/google-drive-config.tsx index d0bc96872..17f4a49a5 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/google-drive-config.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/google-drive-config.tsx @@ -1,6 +1,6 @@ "use client"; -import { File, FileText, FileSpreadsheet, FolderClosed, Image, Presentation } from "lucide-react"; +import { File, FileSpreadsheet, FileText, FolderClosed, Image, Presentation } from "lucide-react"; import type { FC } from "react"; import { useEffect, useState } from "react"; import { GoogleDriveFolderTree } from "@/components/connectors/google-drive-folder-tree"; diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/mcp-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/mcp-config.tsx index 1de6300f0..ac450677e 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/mcp-config.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/mcp-config.tsx @@ -10,12 +10,12 @@ import { Label } from "@/components/ui/label"; import { Textarea } from "@/components/ui/textarea"; import { EnumConnectorName } from "@/contracts/enums/connector"; import type { MCPServerConfig } from "@/contracts/types/mcp.types"; -import type { ConnectorConfigProps } from "../index"; import { + type MCPConnectionTestResult, parseMCPConfig, testMCPConnection, - type MCPConnectionTestResult, } from "../../utils/mcp-config-validator"; +import type { ConnectorConfigProps } from "../index"; interface MCPConfigProps extends ConnectorConfigProps { onNameChange?: (name: string) => void; diff --git a/surfsense_web/components/assistant-ui/inline-mention-editor.tsx b/surfsense_web/components/assistant-ui/inline-mention-editor.tsx index 570440f6a..ae8fe9b8d 100644 --- a/surfsense_web/components/assistant-ui/inline-mention-editor.tsx +++ b/surfsense_web/components/assistant-ui/inline-mention-editor.tsx @@ -11,8 +11,8 @@ import { useState, } from "react"; import ReactDOMServer from "react-dom/server"; -import type { Document } from "@/contracts/types/document.types"; import { getConnectorIcon } from "@/contracts/enums/connectorIcons"; +import type { Document } from "@/contracts/types/document.types"; import { cn } from "@/lib/utils"; export interface MentionedDocument { diff --git a/surfsense_web/components/chat-comments/comment-panel/comment-panel.tsx b/surfsense_web/components/chat-comments/comment-panel/comment-panel.tsx index 2ec960614..12f336d85 100644 --- a/surfsense_web/components/chat-comments/comment-panel/comment-panel.tsx +++ b/surfsense_web/components/chat-comments/comment-panel/comment-panel.tsx @@ -36,10 +36,12 @@ export function CommentPanel({ if (isLoading) { return ( -
+
Loading comments... @@ -57,10 +59,7 @@ export function CommentPanel({ return (
{hasThreads && ( @@ -92,11 +91,7 @@ export function CommentPanel({
)} -
+
{isComposerOpen ? ( {/* Drag handle indicator - only for bottom sheet */} @@ -37,10 +30,7 @@ export function CommentSheet({
)} - + Comments @@ -52,11 +42,7 @@ export function CommentSheet({
- +
diff --git a/surfsense_web/components/connectors/google-drive-folder-tree.tsx b/surfsense_web/components/connectors/google-drive-folder-tree.tsx index 894564167..30df2d788 100644 --- a/surfsense_web/components/connectors/google-drive-folder-tree.tsx +++ b/surfsense_web/components/connectors/google-drive-folder-tree.tsx @@ -4,6 +4,7 @@ import { ChevronDown, ChevronRight, File, + FileSpreadsheet, FileText, FolderClosed, FolderOpen, @@ -11,7 +12,6 @@ import { Image, Loader2, Presentation, - FileSpreadsheet, } from "lucide-react"; import { useState } from "react"; import { Checkbox } from "@/components/ui/checkbox"; diff --git a/surfsense_web/components/layout/ui/header/Header.tsx b/surfsense_web/components/layout/ui/header/Header.tsx index 182934d27..90869e4b0 100644 --- a/surfsense_web/components/layout/ui/header/Header.tsx +++ b/surfsense_web/components/layout/ui/header/Header.tsx @@ -1,9 +1,9 @@ "use client"; import { Moon, Sun } from "lucide-react"; +import { NotificationButton } from "@/components/notifications/NotificationButton"; import { Button } from "@/components/ui/button"; import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip"; -import { NotificationButton } from "@/components/notifications/NotificationButton"; interface HeaderProps { breadcrumb?: React.ReactNode; diff --git a/surfsense_web/components/notifications/NotificationButton.tsx b/surfsense_web/components/notifications/NotificationButton.tsx index e9f5db2dc..acecc06af 100644 --- a/surfsense_web/components/notifications/NotificationButton.tsx +++ b/surfsense_web/components/notifications/NotificationButton.tsx @@ -1,16 +1,16 @@ "use client"; -import { useState } from "react"; +import { useAtomValue } from "jotai"; import { Bell } from "lucide-react"; +import { useParams } from "next/navigation"; +import { useState } from "react"; +import { currentUserAtom } from "@/atoms/user/user-query.atoms"; import { Button } from "@/components/ui/button"; import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover"; import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip"; import { useNotifications } from "@/hooks/use-notifications"; -import { useAtomValue } from "jotai"; -import { currentUserAtom } from "@/atoms/user/user-query.atoms"; -import { NotificationPopup } from "./NotificationPopup"; import { cn } from "@/lib/utils"; -import { useParams } from "next/navigation"; +import { NotificationPopup } from "./NotificationPopup"; export function NotificationButton() { const [open, setOpen] = useState(false); diff --git a/surfsense_web/components/notifications/NotificationPopup.tsx b/surfsense_web/components/notifications/NotificationPopup.tsx index 9196ceaa4..50deadf03 100644 --- a/surfsense_web/components/notifications/NotificationPopup.tsx +++ b/surfsense_web/components/notifications/NotificationPopup.tsx @@ -1,14 +1,14 @@ "use client"; -import { Bell, CheckCheck, Loader2, AlertCircle, CheckCircle2 } from "lucide-react"; +import { formatDistanceToNow } from "date-fns"; +import { AlertCircle, Bell, CheckCheck, CheckCircle2, Loader2 } from "lucide-react"; import { useRouter } from "next/navigation"; +import { convertRenderedToDisplay } from "@/components/chat-comments/comment-item/comment-item"; import { Button } from "@/components/ui/button"; import { ScrollArea } from "@/components/ui/scroll-area"; import { Separator } from "@/components/ui/separator"; import type { Notification } from "@/hooks/use-notifications"; -import { formatDistanceToNow } from "date-fns"; import { cn } from "@/lib/utils"; -import { convertRenderedToDisplay } from "@/components/chat-comments/comment-item/comment-item"; interface NotificationPopupProps { notifications: Notification[]; diff --git a/surfsense_web/components/providers/ElectricProvider.tsx b/surfsense_web/components/providers/ElectricProvider.tsx index af3046a64..e31885973 100644 --- a/surfsense_web/components/providers/ElectricProvider.tsx +++ b/surfsense_web/components/providers/ElectricProvider.tsx @@ -1,13 +1,13 @@ "use client"; -import { useEffect, useState, useRef } from "react"; import { useAtomValue } from "jotai"; +import { useEffect, useRef, useState } from "react"; import { currentUserAtom } from "@/atoms/user/user-query.atoms"; import { - initElectric, cleanupElectric, - isElectricInitialized, type ElectricClient, + initElectric, + isElectricInitialized, } from "@/lib/electric/client"; import { ElectricContext } from "@/lib/electric/context"; diff --git a/surfsense_web/hooks/use-connectors-electric.ts b/surfsense_web/hooks/use-connectors-electric.ts index 94d5062c9..08ef0621d 100644 --- a/surfsense_web/hooks/use-connectors-electric.ts +++ b/surfsense_web/hooks/use-connectors-electric.ts @@ -1,9 +1,9 @@ "use client"; -import { useEffect, useState, useCallback, useRef } from "react"; -import { useElectricClient } from "@/lib/electric/context"; -import type { SyncHandle } from "@/lib/electric/client"; +import { useCallback, useEffect, useRef, useState } from "react"; import type { SearchSourceConnector } from "@/contracts/types/connector.types"; +import type { SyncHandle } from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; /** * Hook for managing connectors with Electric SQL real-time sync diff --git a/surfsense_web/hooks/use-documents-electric.ts b/surfsense_web/hooks/use-documents-electric.ts index 74d9e91e7..43809499e 100644 --- a/surfsense_web/hooks/use-documents-electric.ts +++ b/surfsense_web/hooks/use-documents-electric.ts @@ -1,8 +1,8 @@ "use client"; -import { useEffect, useState, useRef, useMemo } from "react"; -import { useElectricClient } from "@/lib/electric/context"; +import { useEffect, useMemo, useRef, useState } from "react"; import type { SyncHandle } from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; interface Document { id: number; diff --git a/surfsense_web/hooks/use-notifications.ts b/surfsense_web/hooks/use-notifications.ts index 7a3b49861..fbdf421de 100644 --- a/surfsense_web/hooks/use-notifications.ts +++ b/surfsense_web/hooks/use-notifications.ts @@ -1,10 +1,10 @@ "use client"; -import { useEffect, useState, useCallback, useRef } from "react"; -import { useElectricClient } from "@/lib/electric/context"; -import type { SyncHandle } from "@/lib/electric/client"; +import { useCallback, useEffect, useRef, useState } from "react"; import type { Notification } from "@/contracts/types/notification.types"; import { authenticatedFetch } from "@/lib/auth-utils"; +import type { SyncHandle } from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; export type { Notification } from "@/contracts/types/notification.types"; diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index c33969914..514185d23 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -13,8 +13,8 @@ */ import { PGlite } from "@electric-sql/pglite"; -import { electricSync } from "@electric-sql/pglite-sync"; import { live } from "@electric-sql/pglite/live"; +import { electricSync } from "@electric-sql/pglite-sync"; // Types export interface ElectricClient { @@ -270,365 +270,375 @@ export async function initElectric(userId: string): Promise { // Create and track the sync promise to prevent race conditions const syncPromise = (async (): Promise => { // Build params for the shape request - // Electric SQL expects params as URL query parameters - const params: Record = { table }; + // Electric SQL expects params as URL query parameters + const params: Record = { table }; - // Validate and fix WHERE clause to ensure string literals are properly quoted - let validatedWhere = where; - if (where) { - // Check if where uses positional parameters - if (where.includes("$1")) { - // Extract the value from the where clause if it's embedded - // For now, we'll use the where clause as-is and let Electric handle it - params.where = where; - validatedWhere = where; - } else { - // Validate that string literals are properly quoted - // Count single quotes - should be even (pairs) for properly quoted strings - const singleQuoteCount = (where.match(/'/g) || []).length; - - if (singleQuoteCount % 2 !== 0) { - // Odd number of quotes means unterminated string literal - console.warn("Where clause has unmatched quotes, fixing:", where); - // Add closing quote at the end - validatedWhere = `${where}'`; - params.where = validatedWhere; - } else { - // Use the where clause directly (already formatted) + // Validate and fix WHERE clause to ensure string literals are properly quoted + let validatedWhere = where; + if (where) { + // Check if where uses positional parameters + if (where.includes("$1")) { + // Extract the value from the where clause if it's embedded + // For now, we'll use the where clause as-is and let Electric handle it params.where = where; validatedWhere = where; + } else { + // Validate that string literals are properly quoted + // Count single quotes - should be even (pairs) for properly quoted strings + const singleQuoteCount = (where.match(/'/g) || []).length; + + if (singleQuoteCount % 2 !== 0) { + // Odd number of quotes means unterminated string literal + console.warn("Where clause has unmatched quotes, fixing:", where); + // Add closing quote at the end + validatedWhere = `${where}'`; + params.where = validatedWhere; + } else { + // Use the where clause directly (already formatted) + params.where = where; + validatedWhere = where; + } } } - } - if (columns) params.columns = columns.join(","); + if (columns) params.columns = columns.join(","); - console.log("[Electric] Syncing shape with params:", params); - console.log("[Electric] Electric URL:", `${electricUrl}/v1/shape`); - console.log("[Electric] Where clause:", where, "Validated:", validatedWhere); + console.log("[Electric] Syncing shape with params:", params); + console.log("[Electric] Electric URL:", `${electricUrl}/v1/shape`); + console.log("[Electric] Where clause:", where, "Validated:", validatedWhere); - try { - // Debug: Test Electric SQL connection directly first - // Use validatedWhere to ensure proper URL encoding - const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`; - console.log("[Electric] Testing Electric SQL directly:", testUrl); try { - const testResponse = await fetch(testUrl); - const testHeaders = { - handle: testResponse.headers.get("electric-handle"), - offset: testResponse.headers.get("electric-offset"), - upToDate: testResponse.headers.get("electric-up-to-date"), - }; - console.log("[Electric] Direct Electric SQL response headers:", testHeaders); - const testData = await testResponse.json(); - console.log( - "[Electric] Direct Electric SQL data count:", - Array.isArray(testData) ? testData.length : "not array", - testData - ); - } catch (testErr) { - console.error("[Electric] Direct Electric SQL test failed:", testErr); - } - - // Use PGlite's electric sync plugin to sync the shape - // According to Electric SQL docs, the shape config uses params for table, where, columns - // Note: mapColumns is OPTIONAL per pglite-sync types.ts - - // Create a promise that resolves when initial sync is complete - // Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout - // IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates - let syncResolved = false; - // Initialize with no-op functions to satisfy TypeScript - let resolveInitialSync: () => void = () => {}; - let rejectInitialSync: (error: Error) => void = () => {}; - - const initialSyncPromise = new Promise((resolve, reject) => { - resolveInitialSync = () => { - if (!syncResolved) { - syncResolved = true; - // DON'T unsubscribe from stream - it needs to stay active for real-time updates - resolve(); - } - }; - rejectInitialSync = (error: Error) => { - if (!syncResolved) { - syncResolved = true; - // DON'T unsubscribe from stream even on error - let Electric handle it - reject(error); - } - }; - - // Shorter timeout (5 seconds) as fallback - setTimeout(() => { - if (!syncResolved) { - console.warn( - `[Electric] ⚠️ Sync timeout for ${table} - checking isUpToDate one more time...` - ); - // Check isUpToDate one more time before resolving - // This will be checked after shape is created - setTimeout(() => { - if (!syncResolved) { - console.warn( - `[Electric] ⚠️ Sync timeout for ${table} - resolving anyway after 5s` - ); - resolveInitialSync(); - } - }, 100); - } - }, 5000); - }); - - // Include userId in shapeKey for user-specific sync state - const shapeConfig = { - shape: { - url: `${electricUrl}/v1/shape`, - params: { - table, - ...(validatedWhere ? { where: validatedWhere } : {}), - ...(columns ? { columns: columns.join(",") } : {}), - }, - }, - table, - primaryKey, - shapeKey: `${userId}_v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // User-specific versioned key - onInitialSync: () => { + // Debug: Test Electric SQL connection directly first + // Use validatedWhere to ensure proper URL encoding + const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`; + console.log("[Electric] Testing Electric SQL directly:", testUrl); + try { + const testResponse = await fetch(testUrl); + const testHeaders = { + handle: testResponse.headers.get("electric-handle"), + offset: testResponse.headers.get("electric-offset"), + upToDate: testResponse.headers.get("electric-up-to-date"), + }; + console.log("[Electric] Direct Electric SQL response headers:", testHeaders); + const testData = await testResponse.json(); console.log( - `[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite` + "[Electric] Direct Electric SQL data count:", + Array.isArray(testData) ? testData.length : "not array", + testData + ); + } catch (testErr) { + console.error("[Electric] Direct Electric SQL test failed:", testErr); + } + + // Use PGlite's electric sync plugin to sync the shape + // According to Electric SQL docs, the shape config uses params for table, where, columns + // Note: mapColumns is OPTIONAL per pglite-sync types.ts + + // Create a promise that resolves when initial sync is complete + // Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout + // IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates + let syncResolved = false; + // Initialize with no-op functions to satisfy TypeScript + let resolveInitialSync: () => void = () => {}; + let rejectInitialSync: (error: Error) => void = () => {}; + + const initialSyncPromise = new Promise((resolve, reject) => { + resolveInitialSync = () => { + if (!syncResolved) { + syncResolved = true; + // DON'T unsubscribe from stream - it needs to stay active for real-time updates + resolve(); + } + }; + rejectInitialSync = (error: Error) => { + if (!syncResolved) { + syncResolved = true; + // DON'T unsubscribe from stream even on error - let Electric handle it + reject(error); + } + }; + + // Shorter timeout (5 seconds) as fallback + setTimeout(() => { + if (!syncResolved) { + console.warn( + `[Electric] ⚠️ Sync timeout for ${table} - checking isUpToDate one more time...` + ); + // Check isUpToDate one more time before resolving + // This will be checked after shape is created + setTimeout(() => { + if (!syncResolved) { + console.warn( + `[Electric] ⚠️ Sync timeout for ${table} - resolving anyway after 5s` + ); + resolveInitialSync(); + } + }, 100); + } + }, 5000); + }); + + // Include userId in shapeKey for user-specific sync state + const shapeConfig = { + shape: { + url: `${electricUrl}/v1/shape`, + params: { + table, + ...(validatedWhere ? { where: validatedWhere } : {}), + ...(columns ? { columns: columns.join(",") } : {}), + }, + }, + table, + primaryKey, + shapeKey: `${userId}_v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // User-specific versioned key + onInitialSync: () => { + console.log( + `[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite` + ); + resolveInitialSync(); + }, + onError: (error: Error) => { + console.error(`[Electric] ❌ Shape sync error for ${table}:`, error); + console.error( + "[Electric] Error details:", + JSON.stringify(error, Object.getOwnPropertyNames(error)) + ); + rejectInitialSync(error); + }, + }; + + console.log( + "[Electric] syncShapeToTable config:", + JSON.stringify(shapeConfig, null, 2) + ); + + // Type assertion to PGlite with electric extension + const pgWithElectric = db as PGlite & { + electric: { + syncShapeToTable: ( + config: typeof shapeConfig + ) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }>; + }; + }; + + let shape: { unsubscribe: () => void; isUpToDate: boolean; stream: unknown }; + try { + shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig); + } catch (syncError) { + // Handle "Already syncing" error - pglite-sync might not have fully cleaned up yet + const errorMessage = + syncError instanceof Error ? syncError.message : String(syncError); + if (errorMessage.includes("Already syncing")) { + console.warn( + `[Electric] Already syncing ${table}, waiting for existing sync to settle...` + ); + + // Wait a short time for pglite-sync to settle + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Check if an active handle now exists (another sync might have completed) + const existingHandle = activeSyncHandles.get(cacheKey); + if (existingHandle) { + console.log(`[Electric] Found existing handle after waiting: ${cacheKey}`); + return existingHandle; + } + + // Retry once after waiting + console.log(`[Electric] Retrying sync for ${table}...`); + try { + shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig); + } catch (retryError) { + const retryMessage = + retryError instanceof Error ? retryError.message : String(retryError); + if (retryMessage.includes("Already syncing")) { + // Still syncing - create a placeholder handle that indicates the table is being synced + console.warn( + `[Electric] ${table} still syncing, creating placeholder handle` + ); + const placeholderHandle: SyncHandle = { + unsubscribe: () => { + console.log(`[Electric] Placeholder unsubscribe for: ${cacheKey}`); + activeSyncHandles.delete(cacheKey); + }, + get isUpToDate() { + return false; // We don't know the real state + }, + stream: undefined, + initialSyncPromise: Promise.resolve(), // Already syncing means data should be coming + }; + activeSyncHandles.set(cacheKey, placeholderHandle); + return placeholderHandle; + } + throw retryError; + } + } else { + throw syncError; + } + } + + if (!shape) { + throw new Error("syncShapeToTable returned undefined"); + } + + // Log the actual shape result structure + console.log("[Electric] Shape sync result (initial):", { + hasUnsubscribe: typeof shape?.unsubscribe === "function", + isUpToDate: shape?.isUpToDate, + hasStream: !!shape?.stream, + streamType: typeof shape?.stream, + }); + + // Recommended Approach Step 1: Check isUpToDate immediately + if (shape.isUpToDate) { + console.log( + `[Electric] ✅ Sync already up-to-date for ${table} (resuming from previous state)` ); resolveInitialSync(); - }, - onError: (error: Error) => { - console.error(`[Electric] ❌ Shape sync error for ${table}:`, error); - console.error( - "[Electric] Error details:", - JSON.stringify(error, Object.getOwnPropertyNames(error)) - ); - rejectInitialSync(error); - }, - }; - - console.log( - "[Electric] syncShapeToTable config:", - JSON.stringify(shapeConfig, null, 2) - ); - - // Type assertion to PGlite with electric extension - const pgWithElectric = db as PGlite & { - electric: { - syncShapeToTable: ( - config: typeof shapeConfig - ) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }>; - }; - }; - - let shape: { unsubscribe: () => void; isUpToDate: boolean; stream: unknown }; - try { - shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig); - } catch (syncError) { - // Handle "Already syncing" error - pglite-sync might not have fully cleaned up yet - const errorMessage = syncError instanceof Error ? syncError.message : String(syncError); - if (errorMessage.includes("Already syncing")) { - console.warn(`[Electric] Already syncing ${table}, waiting for existing sync to settle...`); - - // Wait a short time for pglite-sync to settle - await new Promise(resolve => setTimeout(resolve, 100)); - - // Check if an active handle now exists (another sync might have completed) - const existingHandle = activeSyncHandles.get(cacheKey); - if (existingHandle) { - console.log(`[Electric] Found existing handle after waiting: ${cacheKey}`); - return existingHandle; - } - - // Retry once after waiting - console.log(`[Electric] Retrying sync for ${table}...`); - try { - shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig); - } catch (retryError) { - const retryMessage = retryError instanceof Error ? retryError.message : String(retryError); - if (retryMessage.includes("Already syncing")) { - // Still syncing - create a placeholder handle that indicates the table is being synced - console.warn(`[Electric] ${table} still syncing, creating placeholder handle`); - const placeholderHandle: SyncHandle = { - unsubscribe: () => { - console.log(`[Electric] Placeholder unsubscribe for: ${cacheKey}`); - activeSyncHandles.delete(cacheKey); - }, - get isUpToDate() { - return false; // We don't know the real state - }, - stream: undefined, - initialSyncPromise: Promise.resolve(), // Already syncing means data should be coming - }; - activeSyncHandles.set(cacheKey, placeholderHandle); - return placeholderHandle; - } - throw retryError; - } } else { - throw syncError; - } - } - - if (!shape) { - throw new Error("syncShapeToTable returned undefined"); - } - - // Log the actual shape result structure - console.log("[Electric] Shape sync result (initial):", { - hasUnsubscribe: typeof shape?.unsubscribe === "function", - isUpToDate: shape?.isUpToDate, - hasStream: !!shape?.stream, - streamType: typeof shape?.stream, - }); - - // Recommended Approach Step 1: Check isUpToDate immediately - if (shape.isUpToDate) { - console.log( - `[Electric] ✅ Sync already up-to-date for ${table} (resuming from previous state)` - ); - resolveInitialSync(); - } else { - // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message - if (shape?.stream) { - const stream = shape.stream as any; - console.log("[Electric] Shape stream details:", { - shapeHandle: stream?.shapeHandle, - lastOffset: stream?.lastOffset, - isUpToDate: stream?.isUpToDate, - error: stream?.error, - hasSubscribe: typeof stream?.subscribe === "function", - hasUnsubscribe: typeof stream?.unsubscribe === "function", - }); - - // Subscribe to the stream to watch for "up-to-date" control message - // NOTE: We keep this subscription active - don't unsubscribe! - // The stream is what Electric SQL uses for real-time updates - if (typeof stream?.subscribe === "function") { - console.log( - "[Electric] Subscribing to shape stream to watch for up-to-date message..." - ); - // Subscribe but don't store unsubscribe - we want it to stay active - stream.subscribe((messages: unknown[]) => { - // Continue receiving updates even after sync is resolved - if (!syncResolved) { - console.log( - "[Electric] 🔵 Shape stream received messages:", - messages?.length || 0 - ); - } - - // Check if any message indicates sync is complete - if (messages && messages.length > 0) { - for (const message of messages) { - const msg = message as any; - // Check for "up-to-date" control message - if ( - msg?.headers?.control === "up-to-date" || - msg?.headers?.electric_up_to_date === "true" || - (typeof msg === "object" && "up-to-date" in msg) - ) { - if (!syncResolved) { - console.log(`[Electric] ✅ Received up-to-date message for ${table}`); - resolveInitialSync(); - } - // Continue listening for real-time updates - don't return! - } - } - if (!syncResolved && messages.length > 0) { - console.log( - "[Electric] First message:", - JSON.stringify(messages[0], null, 2) - ); - } - } - - // Also check stream's isUpToDate property after receiving messages - if (!syncResolved && stream?.isUpToDate) { - console.log(`[Electric] ✅ Stream isUpToDate is true for ${table}`); - resolveInitialSync(); - } + // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message + if (shape?.stream) { + const stream = shape.stream as any; + console.log("[Electric] Shape stream details:", { + shapeHandle: stream?.shapeHandle, + lastOffset: stream?.lastOffset, + isUpToDate: stream?.isUpToDate, + error: stream?.error, + hasSubscribe: typeof stream?.subscribe === "function", + hasUnsubscribe: typeof stream?.unsubscribe === "function", }); - // Also check stream's isUpToDate property immediately - if (stream?.isUpToDate) { - console.log(`[Electric] ✅ Stream isUpToDate is true immediately for ${table}`); - resolveInitialSync(); + // Subscribe to the stream to watch for "up-to-date" control message + // NOTE: We keep this subscription active - don't unsubscribe! + // The stream is what Electric SQL uses for real-time updates + if (typeof stream?.subscribe === "function") { + console.log( + "[Electric] Subscribing to shape stream to watch for up-to-date message..." + ); + // Subscribe but don't store unsubscribe - we want it to stay active + stream.subscribe((messages: unknown[]) => { + // Continue receiving updates even after sync is resolved + if (!syncResolved) { + console.log( + "[Electric] 🔵 Shape stream received messages:", + messages?.length || 0 + ); + } + + // Check if any message indicates sync is complete + if (messages && messages.length > 0) { + for (const message of messages) { + const msg = message as any; + // Check for "up-to-date" control message + if ( + msg?.headers?.control === "up-to-date" || + msg?.headers?.electric_up_to_date === "true" || + (typeof msg === "object" && "up-to-date" in msg) + ) { + if (!syncResolved) { + console.log(`[Electric] ✅ Received up-to-date message for ${table}`); + resolveInitialSync(); + } + // Continue listening for real-time updates - don't return! + } + } + if (!syncResolved && messages.length > 0) { + console.log( + "[Electric] First message:", + JSON.stringify(messages[0], null, 2) + ); + } + } + + // Also check stream's isUpToDate property after receiving messages + if (!syncResolved && stream?.isUpToDate) { + console.log(`[Electric] ✅ Stream isUpToDate is true for ${table}`); + resolveInitialSync(); + } + }); + + // Also check stream's isUpToDate property immediately + if (stream?.isUpToDate) { + console.log( + `[Electric] ✅ Stream isUpToDate is true immediately for ${table}` + ); + resolveInitialSync(); + } } + + // Also poll isUpToDate periodically as a backup (every 200ms) + const pollInterval = setInterval(() => { + if (syncResolved) { + clearInterval(pollInterval); + return; + } + + if (shape.isUpToDate || stream?.isUpToDate) { + console.log( + `[Electric] ✅ Sync completed (detected via polling) for ${table}` + ); + clearInterval(pollInterval); + resolveInitialSync(); + } + }, 200); + + // Clean up polling when promise resolves + initialSyncPromise.finally(() => { + clearInterval(pollInterval); + }); + } else { + console.warn( + `[Electric] ⚠️ No stream available for ${table}, relying on callback and timeout` + ); } - - // Also poll isUpToDate periodically as a backup (every 200ms) - const pollInterval = setInterval(() => { - if (syncResolved) { - clearInterval(pollInterval); - return; - } - - if (shape.isUpToDate || stream?.isUpToDate) { - console.log(`[Electric] ✅ Sync completed (detected via polling) for ${table}`); - clearInterval(pollInterval); - resolveInitialSync(); - } - }, 200); - - // Clean up polling when promise resolves - initialSyncPromise.finally(() => { - clearInterval(pollInterval); - }); - } else { - console.warn( - `[Electric] ⚠️ No stream available for ${table}, relying on callback and timeout` - ); } - } - // Create the sync handle with proper cleanup - const syncHandle: SyncHandle = { - unsubscribe: () => { - console.log(`[Electric] Unsubscribing from: ${cacheKey}`); - // Remove from cache first - activeSyncHandles.delete(cacheKey); - // Then unsubscribe from the shape - if (shape && typeof shape.unsubscribe === "function") { - shape.unsubscribe(); - } - }, - // Use getter to always return current state - get isUpToDate() { - return shape?.isUpToDate ?? false; - }, - stream: shape?.stream, - initialSyncPromise, // Expose promise so callers can wait for sync - }; + // Create the sync handle with proper cleanup + const syncHandle: SyncHandle = { + unsubscribe: () => { + console.log(`[Electric] Unsubscribing from: ${cacheKey}`); + // Remove from cache first + activeSyncHandles.delete(cacheKey); + // Then unsubscribe from the shape + if (shape && typeof shape.unsubscribe === "function") { + shape.unsubscribe(); + } + }, + // Use getter to always return current state + get isUpToDate() { + return shape?.isUpToDate ?? false; + }, + stream: shape?.stream, + initialSyncPromise, // Expose promise so callers can wait for sync + }; - // Cache the sync handle for reuse (memory optimization) - activeSyncHandles.set(cacheKey, syncHandle); - console.log( - `[Electric] Cached sync handle for: ${cacheKey} (total cached: ${activeSyncHandles.size})` - ); - - return syncHandle; - } catch (error) { - console.error("[Electric] Failed to sync shape:", error); - // Check if Electric SQL server is reachable - try { - const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, { - method: "GET", - }); + // Cache the sync handle for reuse (memory optimization) + activeSyncHandles.set(cacheKey, syncHandle); console.log( - "[Electric] Electric SQL server response:", - response.status, - response.statusText + `[Electric] Cached sync handle for: ${cacheKey} (total cached: ${activeSyncHandles.size})` ); - if (!response.ok) { - console.error("[Electric] Electric SQL server error:", await response.text()); + + return syncHandle; + } catch (error) { + console.error("[Electric] Failed to sync shape:", error); + // Check if Electric SQL server is reachable + try { + const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, { + method: "GET", + }); + console.log( + "[Electric] Electric SQL server response:", + response.status, + response.statusText + ); + if (!response.ok) { + console.error("[Electric] Electric SQL server error:", await response.text()); + } + } catch (fetchError) { + console.error("[Electric] Cannot reach Electric SQL server:", fetchError); + console.error("[Electric] Make sure Electric SQL is running at:", electricUrl); } - } catch (fetchError) { - console.error("[Electric] Cannot reach Electric SQL server:", fetchError); - console.error("[Electric] Make sure Electric SQL is running at:", electricUrl); + throw error; } - throw error; - } })(); // Track the sync promise to prevent concurrent syncs for the same shape