chore: linting

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-01-20 00:32:31 -08:00
parent 08df9e68e5
commit d96ae66012
21 changed files with 502 additions and 479 deletions

View file

@ -77,7 +77,7 @@ class MCPClient:
# Initialize the connection
await session.initialize()
self.session = session
if attempt > 0:
logger.info(
"Connected to MCP server on attempt %d: %s %s",
@ -267,30 +267,40 @@ async def test_mcp_http_connection(
"""
try:
logger.info("Testing HTTP MCP connection to: %s (transport: %s)", url, transport)
logger.info(
"Testing HTTP MCP connection to: %s (transport: %s)", url, transport
)
# Use streamable HTTP client for all HTTP-based transports
async with streamablehttp_client(url, headers=headers or {}) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
# List available tools
response = await session.list_tools()
tools = []
for tool in response.tools:
tools.append({
async with (
streamablehttp_client(url, headers=headers or {}) as (read, write, _),
ClientSession(read, write) as session,
):
await session.initialize()
# List available tools
response = await session.list_tools()
tools = []
for tool in response.tools:
tools.append(
{
"name": tool.name,
"description": tool.description or "",
"input_schema": tool.inputSchema if hasattr(tool, "inputSchema") else {},
})
logger.info("HTTP MCP connection successful. Found %d tools.", len(tools))
return {
"status": "success",
"message": f"Connected successfully. Found {len(tools)} tools.",
"tools": tools,
}
"input_schema": tool.inputSchema
if hasattr(tool, "inputSchema")
else {},
}
)
logger.info(
"HTTP MCP connection successful. Found %d tools.", len(tools)
)
return {
"status": "success",
"message": f"Connected successfully. Found {len(tools)} tools.",
"tools": tools,
}
except Exception as e:
logger.error("Failed to connect to HTTP MCP server: %s", e, exc_info=True)
return {

View file

@ -160,27 +160,31 @@ async def _create_mcp_tool_from_definition_http(
logger.info(f"MCP HTTP tool '{tool_name}' called with params: {kwargs}")
try:
async with streamablehttp_client(url, headers=headers) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
# Call the tool
response = await session.call_tool(tool_name, arguments=kwargs)
# Extract content from response
result = []
for content in response.content:
if hasattr(content, "text"):
result.append(content.text)
elif hasattr(content, "data"):
result.append(str(content.data))
else:
result.append(str(content))
result_str = "\n".join(result) if result else ""
logger.info(f"MCP HTTP tool '{tool_name}' succeeded: {result_str[:200]}")
return result_str
async with (
streamablehttp_client(url, headers=headers) as (read, write, _),
ClientSession(read, write) as session,
):
await session.initialize()
# Call the tool
response = await session.call_tool(tool_name, arguments=kwargs)
# Extract content from response
result = []
for content in response.content:
if hasattr(content, "text"):
result.append(content.text)
elif hasattr(content, "data"):
result.append(str(content.data))
else:
result.append(str(content))
result_str = "\n".join(result) if result else ""
logger.info(
f"MCP HTTP tool '{tool_name}' succeeded: {result_str[:200]}"
)
return result_str
except Exception as e:
error_msg = f"MCP HTTP tool '{tool_name}' execution failed: {e!s}"
logger.exception(error_msg)
@ -192,7 +196,11 @@ async def _create_mcp_tool_from_definition_http(
description=tool_description,
coroutine=mcp_http_tool_call,
args_schema=input_model,
metadata={"mcp_input_schema": input_schema, "mcp_transport": "http", "mcp_url": url},
metadata={
"mcp_input_schema": input_schema,
"mcp_transport": "http",
"mcp_url": url,
},
)
logger.info(f"Created MCP tool (HTTP): '{tool_name}'")
@ -205,17 +213,17 @@ async def _load_stdio_mcp_tools(
server_config: dict[str, Any],
) -> list[StructuredTool]:
"""Load tools from a stdio-based MCP server.
Args:
connector_id: Connector ID for logging
connector_name: Connector name for logging
server_config: Server configuration with command, args, env
Returns:
List of tools from the MCP server
"""
tools: list[StructuredTool] = []
# Validate required command field
command = server_config.get("command")
if not command or not isinstance(command, str):
@ -262,7 +270,7 @@ async def _load_stdio_mcp_tools(
f"Failed to create tool '{tool_def.get('name')}' "
f"from connector {connector_id}: {e!s}"
)
return tools
@ -272,17 +280,17 @@ async def _load_http_mcp_tools(
server_config: dict[str, Any],
) -> list[StructuredTool]:
"""Load tools from an HTTP-based MCP server.
Args:
connector_id: Connector ID for logging
connector_name: Connector name for logging
server_config: Server configuration with url, headers
Returns:
List of tools from the MCP server
"""
tools: list[StructuredTool] = []
# Validate required url field
url = server_config.get("url")
if not url or not isinstance(url, str):
@ -301,41 +309,49 @@ async def _load_http_mcp_tools(
# Connect and discover tools via HTTP
try:
async with streamablehttp_client(url, headers=headers) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
# List available tools
response = await session.list_tools()
tool_definitions = []
for tool in response.tools:
tool_definitions.append({
async with (
streamablehttp_client(url, headers=headers) as (read, write, _),
ClientSession(read, write) as session,
):
await session.initialize()
# List available tools
response = await session.list_tools()
tool_definitions = []
for tool in response.tools:
tool_definitions.append(
{
"name": tool.name,
"description": tool.description or "",
"input_schema": tool.inputSchema if hasattr(tool, "inputSchema") else {},
})
logger.info(
f"Discovered {len(tool_definitions)} tools from HTTP MCP server "
f"'{url}' (connector {connector_id})"
"input_schema": tool.inputSchema
if hasattr(tool, "inputSchema")
else {},
}
)
logger.info(
f"Discovered {len(tool_definitions)} tools from HTTP MCP server "
f"'{url}' (connector {connector_id})"
)
# Create LangChain tools from definitions
for tool_def in tool_definitions:
try:
tool = await _create_mcp_tool_from_definition_http(tool_def, url, headers)
tool = await _create_mcp_tool_from_definition_http(
tool_def, url, headers
)
tools.append(tool)
except Exception as e:
logger.exception(
f"Failed to create HTTP tool '{tool_def.get('name')}' "
f"from connector {connector_id}: {e!s}"
)
except Exception as e:
logger.exception(
f"Failed to connect to HTTP MCP server at '{url}' (connector {connector_id}): {e!s}"
)
return tools
@ -372,7 +388,7 @@ async def load_mcp_tools(
# Early validation: Extract and validate connector config
config = connector.config or {}
server_config = config.get("server_config", {})
# Validate server_config exists and is a dict
if not server_config or not isinstance(server_config, dict):
logger.warning(
@ -382,7 +398,7 @@ async def load_mcp_tools(
# Determine transport type
transport = server_config.get("transport", "stdio")
if transport in ("streamable-http", "http", "sse"):
# HTTP-based MCP server
connector_tools = await _load_http_mcp_tools(
@ -393,9 +409,9 @@ async def load_mcp_tools(
connector_tools = await _load_stdio_mcp_tools(
connector.id, connector.name, server_config
)
tools.extend(connector_tools)
except Exception as e:
logger.exception(
f"Failed to load tools from MCP connector {connector.id}: {e!s}"

View file

@ -2403,25 +2403,29 @@ async def test_mcp_server_connection(
)
transport = server_config.get("transport", "stdio")
# HTTP transport (streamable-http, http, sse)
if transport in ("streamable-http", "http", "sse"):
url = server_config.get("url")
headers = server_config.get("headers", {})
if not url:
raise HTTPException(status_code=400, detail="Server URL is required for HTTP transport")
raise HTTPException(
status_code=400, detail="Server URL is required for HTTP transport"
)
result = await test_mcp_http_connection(url, headers, transport)
return result
# stdio transport (default)
command = server_config.get("command")
args = server_config.get("args", [])
env = server_config.get("env", {})
if not command:
raise HTTPException(status_code=400, detail="Server command is required for stdio transport")
raise HTTPException(
status_code=400, detail="Server command is required for stdio transport"
)
# Test the connection
result = await test_mcp_connection(command, args, env)

View file

@ -84,7 +84,7 @@ class SearchSourceConnectorRead(SearchSourceConnectorBase, IDModel, TimestampMod
class MCPServerConfig(BaseModel):
"""Configuration for an MCP server connection.
Supports two transport types:
- stdio: Local process (command, args, env)
- streamable-http/http/sse: Remote HTTP server (url, headers)
@ -94,13 +94,13 @@ class MCPServerConfig(BaseModel):
command: str | None = None # e.g., "uvx", "node", "python"
args: list[str] = [] # e.g., ["mcp-server-git", "--repository", "/path"]
env: dict[str, str] = {} # Environment variables for the server process
# HTTP transport fields
url: str | None = None # e.g., "https://mcp-server.com/mcp"
headers: dict[str, str] = {} # HTTP headers for authentication
transport: str = "stdio" # "stdio" | "streamable-http" | "http" | "sse"
def is_http_transport(self) -> bool:
"""Check if this config uses HTTP transport."""
return self.transport in ("streamable-http", "http", "sse")

View file

@ -204,7 +204,9 @@ export const AssistantMessage: FC = () => {
>
<MessageSquare className={cn("size-4", hasComments && "fill-current")} />
{hasComments ? (
<span>{commentCount} {commentCount === 1 ? "comment" : "comments"}</span>
<span>
{commentCount} {commentCount === 1 ? "comment" : "comments"}
</span>
) : (
<span>Add comment</span>
)}

View file

@ -1,7 +1,7 @@
"use client";
import type { FC } from "react";
import { AlertCircle } from "lucide-react";
import type { FC } from "react";
import { Label } from "@/components/ui/label";
import {
Select,

View file

@ -7,13 +7,13 @@ import { Button } from "@/components/ui/button";
import { Label } from "@/components/ui/label";
import { Textarea } from "@/components/ui/textarea";
import { EnumConnectorName } from "@/contracts/enums/connector";
import type { ConnectFormProps } from "..";
import {
extractServerName,
type MCPConnectionTestResult,
parseMCPConfig,
testMCPConnection,
type MCPConnectionTestResult,
} from "../../utils/mcp-config-validator";
import type { ConnectFormProps } from "..";
export const MCPConnectForm: FC<ConnectFormProps> = ({ onSubmit, isSubmitting }) => {
const isSubmittingRef = useRef(false);

View file

@ -1,6 +1,6 @@
"use client";
import { File, FileText, FileSpreadsheet, FolderClosed, Image, Presentation } from "lucide-react";
import { File, FileSpreadsheet, FileText, FolderClosed, Image, Presentation } from "lucide-react";
import type { FC } from "react";
import { useEffect, useState } from "react";
import { GoogleDriveFolderTree } from "@/components/connectors/google-drive-folder-tree";

View file

@ -10,12 +10,12 @@ import { Label } from "@/components/ui/label";
import { Textarea } from "@/components/ui/textarea";
import { EnumConnectorName } from "@/contracts/enums/connector";
import type { MCPServerConfig } from "@/contracts/types/mcp.types";
import type { ConnectorConfigProps } from "../index";
import {
type MCPConnectionTestResult,
parseMCPConfig,
testMCPConnection,
type MCPConnectionTestResult,
} from "../../utils/mcp-config-validator";
import type { ConnectorConfigProps } from "../index";
interface MCPConfigProps extends ConnectorConfigProps {
onNameChange?: (name: string) => void;

View file

@ -11,8 +11,8 @@ import {
useState,
} from "react";
import ReactDOMServer from "react-dom/server";
import type { Document } from "@/contracts/types/document.types";
import { getConnectorIcon } from "@/contracts/enums/connectorIcons";
import type { Document } from "@/contracts/types/document.types";
import { cn } from "@/lib/utils";
export interface MentionedDocument {

View file

@ -36,10 +36,12 @@ export function CommentPanel({
if (isLoading) {
return (
<div className={cn(
"flex min-h-[120px] items-center justify-center p-4",
!isMobile && "w-96 rounded-lg border bg-card"
)}>
<div
className={cn(
"flex min-h-[120px] items-center justify-center p-4",
!isMobile && "w-96 rounded-lg border bg-card"
)}
>
<div className="flex items-center gap-2 text-sm text-muted-foreground">
<div className="size-4 animate-spin rounded-full border-2 border-current border-t-transparent" />
Loading comments...
@ -57,10 +59,7 @@ export function CommentPanel({
return (
<div
className={cn(
"flex flex-col",
isMobile ? "w-full" : "w-85 rounded-lg border bg-card"
)}
className={cn("flex flex-col", isMobile ? "w-full" : "w-85 rounded-lg border bg-card")}
style={!isMobile && effectiveMaxHeight ? { maxHeight: effectiveMaxHeight } : undefined}
>
{hasThreads && (
@ -92,11 +91,7 @@ export function CommentPanel({
</div>
)}
<div className={cn(
"p-3",
showEmptyState && !isMobile && "border-t",
isMobile && "border-t"
)}>
<div className={cn("p-3", showEmptyState && !isMobile && "border-t", isMobile && "border-t")}>
{isComposerOpen ? (
<CommentComposer
members={members}

View file

@ -1,12 +1,7 @@
"use client";
import { MessageSquare } from "lucide-react";
import {
Sheet,
SheetContent,
SheetHeader,
SheetTitle,
} from "@/components/ui/sheet";
import { Sheet, SheetContent, SheetHeader, SheetTitle } from "@/components/ui/sheet";
import { cn } from "@/lib/utils";
import { CommentPanelContainer } from "../comment-panel-container/comment-panel-container";
import type { CommentSheetProps } from "./types";
@ -26,9 +21,7 @@ export function CommentSheet({
side={side}
className={cn(
"flex flex-col p-0",
isBottomSheet
? "h-[85vh] max-h-[85vh] rounded-t-xl"
: "h-full w-full max-w-md"
isBottomSheet ? "h-[85vh] max-h-[85vh] rounded-t-xl" : "h-full w-full max-w-md"
)}
>
{/* Drag handle indicator - only for bottom sheet */}
@ -37,10 +30,7 @@ export function CommentSheet({
<div className="h-1 w-10 rounded-full bg-muted-foreground/30" />
</div>
)}
<SheetHeader className={cn(
"flex-shrink-0 border-b px-4",
isBottomSheet ? "pb-3" : "py-4"
)}>
<SheetHeader className={cn("flex-shrink-0 border-b px-4", isBottomSheet ? "pb-3" : "py-4")}>
<SheetTitle className="flex items-center gap-2 text-base font-semibold">
<MessageSquare className="size-5" />
Comments
@ -52,11 +42,7 @@ export function CommentSheet({
</SheetTitle>
</SheetHeader>
<div className="min-h-0 flex-1 overflow-y-auto">
<CommentPanelContainer
messageId={messageId}
isOpen={true}
variant="mobile"
/>
<CommentPanelContainer messageId={messageId} isOpen={true} variant="mobile" />
</div>
</SheetContent>
</Sheet>

View file

@ -4,6 +4,7 @@ import {
ChevronDown,
ChevronRight,
File,
FileSpreadsheet,
FileText,
FolderClosed,
FolderOpen,
@ -11,7 +12,6 @@ import {
Image,
Loader2,
Presentation,
FileSpreadsheet,
} from "lucide-react";
import { useState } from "react";
import { Checkbox } from "@/components/ui/checkbox";

View file

@ -1,9 +1,9 @@
"use client";
import { Moon, Sun } from "lucide-react";
import { NotificationButton } from "@/components/notifications/NotificationButton";
import { Button } from "@/components/ui/button";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
import { NotificationButton } from "@/components/notifications/NotificationButton";
interface HeaderProps {
breadcrumb?: React.ReactNode;

View file

@ -1,16 +1,16 @@
"use client";
import { useState } from "react";
import { useAtomValue } from "jotai";
import { Bell } from "lucide-react";
import { useParams } from "next/navigation";
import { useState } from "react";
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
import { Button } from "@/components/ui/button";
import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
import { useNotifications } from "@/hooks/use-notifications";
import { useAtomValue } from "jotai";
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
import { NotificationPopup } from "./NotificationPopup";
import { cn } from "@/lib/utils";
import { useParams } from "next/navigation";
import { NotificationPopup } from "./NotificationPopup";
export function NotificationButton() {
const [open, setOpen] = useState(false);

View file

@ -1,14 +1,14 @@
"use client";
import { Bell, CheckCheck, Loader2, AlertCircle, CheckCircle2 } from "lucide-react";
import { formatDistanceToNow } from "date-fns";
import { AlertCircle, Bell, CheckCheck, CheckCircle2, Loader2 } from "lucide-react";
import { useRouter } from "next/navigation";
import { convertRenderedToDisplay } from "@/components/chat-comments/comment-item/comment-item";
import { Button } from "@/components/ui/button";
import { ScrollArea } from "@/components/ui/scroll-area";
import { Separator } from "@/components/ui/separator";
import type { Notification } from "@/hooks/use-notifications";
import { formatDistanceToNow } from "date-fns";
import { cn } from "@/lib/utils";
import { convertRenderedToDisplay } from "@/components/chat-comments/comment-item/comment-item";
interface NotificationPopupProps {
notifications: Notification[];

View file

@ -1,13 +1,13 @@
"use client";
import { useEffect, useState, useRef } from "react";
import { useAtomValue } from "jotai";
import { useEffect, useRef, useState } from "react";
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
import {
initElectric,
cleanupElectric,
isElectricInitialized,
type ElectricClient,
initElectric,
isElectricInitialized,
} from "@/lib/electric/client";
import { ElectricContext } from "@/lib/electric/context";

View file

@ -1,9 +1,9 @@
"use client";
import { useEffect, useState, useCallback, useRef } from "react";
import { useElectricClient } from "@/lib/electric/context";
import type { SyncHandle } from "@/lib/electric/client";
import { useCallback, useEffect, useRef, useState } from "react";
import type { SearchSourceConnector } from "@/contracts/types/connector.types";
import type { SyncHandle } from "@/lib/electric/client";
import { useElectricClient } from "@/lib/electric/context";
/**
* Hook for managing connectors with Electric SQL real-time sync

View file

@ -1,8 +1,8 @@
"use client";
import { useEffect, useState, useRef, useMemo } from "react";
import { useElectricClient } from "@/lib/electric/context";
import { useEffect, useMemo, useRef, useState } from "react";
import type { SyncHandle } from "@/lib/electric/client";
import { useElectricClient } from "@/lib/electric/context";
interface Document {
id: number;

View file

@ -1,10 +1,10 @@
"use client";
import { useEffect, useState, useCallback, useRef } from "react";
import { useElectricClient } from "@/lib/electric/context";
import type { SyncHandle } from "@/lib/electric/client";
import { useCallback, useEffect, useRef, useState } from "react";
import type { Notification } from "@/contracts/types/notification.types";
import { authenticatedFetch } from "@/lib/auth-utils";
import type { SyncHandle } from "@/lib/electric/client";
import { useElectricClient } from "@/lib/electric/context";
export type { Notification } from "@/contracts/types/notification.types";

View file

@ -13,8 +13,8 @@
*/
import { PGlite } from "@electric-sql/pglite";
import { electricSync } from "@electric-sql/pglite-sync";
import { live } from "@electric-sql/pglite/live";
import { electricSync } from "@electric-sql/pglite-sync";
// Types
export interface ElectricClient {
@ -270,365 +270,375 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Create and track the sync promise to prevent race conditions
const syncPromise = (async (): Promise<SyncHandle> => {
// Build params for the shape request
// Electric SQL expects params as URL query parameters
const params: Record<string, string> = { table };
// Electric SQL expects params as URL query parameters
const params: Record<string, string> = { table };
// Validate and fix WHERE clause to ensure string literals are properly quoted
let validatedWhere = where;
if (where) {
// Check if where uses positional parameters
if (where.includes("$1")) {
// Extract the value from the where clause if it's embedded
// For now, we'll use the where clause as-is and let Electric handle it
params.where = where;
validatedWhere = where;
} else {
// Validate that string literals are properly quoted
// Count single quotes - should be even (pairs) for properly quoted strings
const singleQuoteCount = (where.match(/'/g) || []).length;
if (singleQuoteCount % 2 !== 0) {
// Odd number of quotes means unterminated string literal
console.warn("Where clause has unmatched quotes, fixing:", where);
// Add closing quote at the end
validatedWhere = `${where}'`;
params.where = validatedWhere;
} else {
// Use the where clause directly (already formatted)
// Validate and fix WHERE clause to ensure string literals are properly quoted
let validatedWhere = where;
if (where) {
// Check if where uses positional parameters
if (where.includes("$1")) {
// Extract the value from the where clause if it's embedded
// For now, we'll use the where clause as-is and let Electric handle it
params.where = where;
validatedWhere = where;
} else {
// Validate that string literals are properly quoted
// Count single quotes - should be even (pairs) for properly quoted strings
const singleQuoteCount = (where.match(/'/g) || []).length;
if (singleQuoteCount % 2 !== 0) {
// Odd number of quotes means unterminated string literal
console.warn("Where clause has unmatched quotes, fixing:", where);
// Add closing quote at the end
validatedWhere = `${where}'`;
params.where = validatedWhere;
} else {
// Use the where clause directly (already formatted)
params.where = where;
validatedWhere = where;
}
}
}
}
if (columns) params.columns = columns.join(",");
if (columns) params.columns = columns.join(",");
console.log("[Electric] Syncing shape with params:", params);
console.log("[Electric] Electric URL:", `${electricUrl}/v1/shape`);
console.log("[Electric] Where clause:", where, "Validated:", validatedWhere);
console.log("[Electric] Syncing shape with params:", params);
console.log("[Electric] Electric URL:", `${electricUrl}/v1/shape`);
console.log("[Electric] Where clause:", where, "Validated:", validatedWhere);
try {
// Debug: Test Electric SQL connection directly first
// Use validatedWhere to ensure proper URL encoding
const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`;
console.log("[Electric] Testing Electric SQL directly:", testUrl);
try {
const testResponse = await fetch(testUrl);
const testHeaders = {
handle: testResponse.headers.get("electric-handle"),
offset: testResponse.headers.get("electric-offset"),
upToDate: testResponse.headers.get("electric-up-to-date"),
};
console.log("[Electric] Direct Electric SQL response headers:", testHeaders);
const testData = await testResponse.json();
console.log(
"[Electric] Direct Electric SQL data count:",
Array.isArray(testData) ? testData.length : "not array",
testData
);
} catch (testErr) {
console.error("[Electric] Direct Electric SQL test failed:", testErr);
}
// Use PGlite's electric sync plugin to sync the shape
// According to Electric SQL docs, the shape config uses params for table, where, columns
// Note: mapColumns is OPTIONAL per pglite-sync types.ts
// Create a promise that resolves when initial sync is complete
// Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout
// IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates
let syncResolved = false;
// Initialize with no-op functions to satisfy TypeScript
let resolveInitialSync: () => void = () => {};
let rejectInitialSync: (error: Error) => void = () => {};
const initialSyncPromise = new Promise<void>((resolve, reject) => {
resolveInitialSync = () => {
if (!syncResolved) {
syncResolved = true;
// DON'T unsubscribe from stream - it needs to stay active for real-time updates
resolve();
}
};
rejectInitialSync = (error: Error) => {
if (!syncResolved) {
syncResolved = true;
// DON'T unsubscribe from stream even on error - let Electric handle it
reject(error);
}
};
// Shorter timeout (5 seconds) as fallback
setTimeout(() => {
if (!syncResolved) {
console.warn(
`[Electric] ⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`
);
// Check isUpToDate one more time before resolving
// This will be checked after shape is created
setTimeout(() => {
if (!syncResolved) {
console.warn(
`[Electric] ⚠️ Sync timeout for ${table} - resolving anyway after 5s`
);
resolveInitialSync();
}
}, 100);
}
}, 5000);
});
// Include userId in shapeKey for user-specific sync state
const shapeConfig = {
shape: {
url: `${electricUrl}/v1/shape`,
params: {
table,
...(validatedWhere ? { where: validatedWhere } : {}),
...(columns ? { columns: columns.join(",") } : {}),
},
},
table,
primaryKey,
shapeKey: `${userId}_v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // User-specific versioned key
onInitialSync: () => {
// Debug: Test Electric SQL connection directly first
// Use validatedWhere to ensure proper URL encoding
const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`;
console.log("[Electric] Testing Electric SQL directly:", testUrl);
try {
const testResponse = await fetch(testUrl);
const testHeaders = {
handle: testResponse.headers.get("electric-handle"),
offset: testResponse.headers.get("electric-offset"),
upToDate: testResponse.headers.get("electric-up-to-date"),
};
console.log("[Electric] Direct Electric SQL response headers:", testHeaders);
const testData = await testResponse.json();
console.log(
`[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite`
"[Electric] Direct Electric SQL data count:",
Array.isArray(testData) ? testData.length : "not array",
testData
);
} catch (testErr) {
console.error("[Electric] Direct Electric SQL test failed:", testErr);
}
// Use PGlite's electric sync plugin to sync the shape
// According to Electric SQL docs, the shape config uses params for table, where, columns
// Note: mapColumns is OPTIONAL per pglite-sync types.ts
// Create a promise that resolves when initial sync is complete
// Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout
// IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates
let syncResolved = false;
// Initialize with no-op functions to satisfy TypeScript
let resolveInitialSync: () => void = () => {};
let rejectInitialSync: (error: Error) => void = () => {};
const initialSyncPromise = new Promise<void>((resolve, reject) => {
resolveInitialSync = () => {
if (!syncResolved) {
syncResolved = true;
// DON'T unsubscribe from stream - it needs to stay active for real-time updates
resolve();
}
};
rejectInitialSync = (error: Error) => {
if (!syncResolved) {
syncResolved = true;
// DON'T unsubscribe from stream even on error - let Electric handle it
reject(error);
}
};
// Shorter timeout (5 seconds) as fallback
setTimeout(() => {
if (!syncResolved) {
console.warn(
`[Electric] ⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`
);
// Check isUpToDate one more time before resolving
// This will be checked after shape is created
setTimeout(() => {
if (!syncResolved) {
console.warn(
`[Electric] ⚠️ Sync timeout for ${table} - resolving anyway after 5s`
);
resolveInitialSync();
}
}, 100);
}
}, 5000);
});
// Include userId in shapeKey for user-specific sync state
const shapeConfig = {
shape: {
url: `${electricUrl}/v1/shape`,
params: {
table,
...(validatedWhere ? { where: validatedWhere } : {}),
...(columns ? { columns: columns.join(",") } : {}),
},
},
table,
primaryKey,
shapeKey: `${userId}_v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // User-specific versioned key
onInitialSync: () => {
console.log(
`[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite`
);
resolveInitialSync();
},
onError: (error: Error) => {
console.error(`[Electric] ❌ Shape sync error for ${table}:`, error);
console.error(
"[Electric] Error details:",
JSON.stringify(error, Object.getOwnPropertyNames(error))
);
rejectInitialSync(error);
},
};
console.log(
"[Electric] syncShapeToTable config:",
JSON.stringify(shapeConfig, null, 2)
);
// Type assertion to PGlite with electric extension
const pgWithElectric = db as PGlite & {
electric: {
syncShapeToTable: (
config: typeof shapeConfig
) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }>;
};
};
let shape: { unsubscribe: () => void; isUpToDate: boolean; stream: unknown };
try {
shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig);
} catch (syncError) {
// Handle "Already syncing" error - pglite-sync might not have fully cleaned up yet
const errorMessage =
syncError instanceof Error ? syncError.message : String(syncError);
if (errorMessage.includes("Already syncing")) {
console.warn(
`[Electric] Already syncing ${table}, waiting for existing sync to settle...`
);
// Wait a short time for pglite-sync to settle
await new Promise((resolve) => setTimeout(resolve, 100));
// Check if an active handle now exists (another sync might have completed)
const existingHandle = activeSyncHandles.get(cacheKey);
if (existingHandle) {
console.log(`[Electric] Found existing handle after waiting: ${cacheKey}`);
return existingHandle;
}
// Retry once after waiting
console.log(`[Electric] Retrying sync for ${table}...`);
try {
shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig);
} catch (retryError) {
const retryMessage =
retryError instanceof Error ? retryError.message : String(retryError);
if (retryMessage.includes("Already syncing")) {
// Still syncing - create a placeholder handle that indicates the table is being synced
console.warn(
`[Electric] ${table} still syncing, creating placeholder handle`
);
const placeholderHandle: SyncHandle = {
unsubscribe: () => {
console.log(`[Electric] Placeholder unsubscribe for: ${cacheKey}`);
activeSyncHandles.delete(cacheKey);
},
get isUpToDate() {
return false; // We don't know the real state
},
stream: undefined,
initialSyncPromise: Promise.resolve(), // Already syncing means data should be coming
};
activeSyncHandles.set(cacheKey, placeholderHandle);
return placeholderHandle;
}
throw retryError;
}
} else {
throw syncError;
}
}
if (!shape) {
throw new Error("syncShapeToTable returned undefined");
}
// Log the actual shape result structure
console.log("[Electric] Shape sync result (initial):", {
hasUnsubscribe: typeof shape?.unsubscribe === "function",
isUpToDate: shape?.isUpToDate,
hasStream: !!shape?.stream,
streamType: typeof shape?.stream,
});
// Recommended Approach Step 1: Check isUpToDate immediately
if (shape.isUpToDate) {
console.log(
`[Electric] ✅ Sync already up-to-date for ${table} (resuming from previous state)`
);
resolveInitialSync();
},
onError: (error: Error) => {
console.error(`[Electric] ❌ Shape sync error for ${table}:`, error);
console.error(
"[Electric] Error details:",
JSON.stringify(error, Object.getOwnPropertyNames(error))
);
rejectInitialSync(error);
},
};
console.log(
"[Electric] syncShapeToTable config:",
JSON.stringify(shapeConfig, null, 2)
);
// Type assertion to PGlite with electric extension
const pgWithElectric = db as PGlite & {
electric: {
syncShapeToTable: (
config: typeof shapeConfig
) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }>;
};
};
let shape: { unsubscribe: () => void; isUpToDate: boolean; stream: unknown };
try {
shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig);
} catch (syncError) {
// Handle "Already syncing" error - pglite-sync might not have fully cleaned up yet
const errorMessage = syncError instanceof Error ? syncError.message : String(syncError);
if (errorMessage.includes("Already syncing")) {
console.warn(`[Electric] Already syncing ${table}, waiting for existing sync to settle...`);
// Wait a short time for pglite-sync to settle
await new Promise(resolve => setTimeout(resolve, 100));
// Check if an active handle now exists (another sync might have completed)
const existingHandle = activeSyncHandles.get(cacheKey);
if (existingHandle) {
console.log(`[Electric] Found existing handle after waiting: ${cacheKey}`);
return existingHandle;
}
// Retry once after waiting
console.log(`[Electric] Retrying sync for ${table}...`);
try {
shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig);
} catch (retryError) {
const retryMessage = retryError instanceof Error ? retryError.message : String(retryError);
if (retryMessage.includes("Already syncing")) {
// Still syncing - create a placeholder handle that indicates the table is being synced
console.warn(`[Electric] ${table} still syncing, creating placeholder handle`);
const placeholderHandle: SyncHandle = {
unsubscribe: () => {
console.log(`[Electric] Placeholder unsubscribe for: ${cacheKey}`);
activeSyncHandles.delete(cacheKey);
},
get isUpToDate() {
return false; // We don't know the real state
},
stream: undefined,
initialSyncPromise: Promise.resolve(), // Already syncing means data should be coming
};
activeSyncHandles.set(cacheKey, placeholderHandle);
return placeholderHandle;
}
throw retryError;
}
} else {
throw syncError;
}
}
if (!shape) {
throw new Error("syncShapeToTable returned undefined");
}
// Log the actual shape result structure
console.log("[Electric] Shape sync result (initial):", {
hasUnsubscribe: typeof shape?.unsubscribe === "function",
isUpToDate: shape?.isUpToDate,
hasStream: !!shape?.stream,
streamType: typeof shape?.stream,
});
// Recommended Approach Step 1: Check isUpToDate immediately
if (shape.isUpToDate) {
console.log(
`[Electric] ✅ Sync already up-to-date for ${table} (resuming from previous state)`
);
resolveInitialSync();
} else {
// Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message
if (shape?.stream) {
const stream = shape.stream as any;
console.log("[Electric] Shape stream details:", {
shapeHandle: stream?.shapeHandle,
lastOffset: stream?.lastOffset,
isUpToDate: stream?.isUpToDate,
error: stream?.error,
hasSubscribe: typeof stream?.subscribe === "function",
hasUnsubscribe: typeof stream?.unsubscribe === "function",
});
// Subscribe to the stream to watch for "up-to-date" control message
// NOTE: We keep this subscription active - don't unsubscribe!
// The stream is what Electric SQL uses for real-time updates
if (typeof stream?.subscribe === "function") {
console.log(
"[Electric] Subscribing to shape stream to watch for up-to-date message..."
);
// Subscribe but don't store unsubscribe - we want it to stay active
stream.subscribe((messages: unknown[]) => {
// Continue receiving updates even after sync is resolved
if (!syncResolved) {
console.log(
"[Electric] 🔵 Shape stream received messages:",
messages?.length || 0
);
}
// Check if any message indicates sync is complete
if (messages && messages.length > 0) {
for (const message of messages) {
const msg = message as any;
// Check for "up-to-date" control message
if (
msg?.headers?.control === "up-to-date" ||
msg?.headers?.electric_up_to_date === "true" ||
(typeof msg === "object" && "up-to-date" in msg)
) {
if (!syncResolved) {
console.log(`[Electric] ✅ Received up-to-date message for ${table}`);
resolveInitialSync();
}
// Continue listening for real-time updates - don't return!
}
}
if (!syncResolved && messages.length > 0) {
console.log(
"[Electric] First message:",
JSON.stringify(messages[0], null, 2)
);
}
}
// Also check stream's isUpToDate property after receiving messages
if (!syncResolved && stream?.isUpToDate) {
console.log(`[Electric] ✅ Stream isUpToDate is true for ${table}`);
resolveInitialSync();
}
// Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message
if (shape?.stream) {
const stream = shape.stream as any;
console.log("[Electric] Shape stream details:", {
shapeHandle: stream?.shapeHandle,
lastOffset: stream?.lastOffset,
isUpToDate: stream?.isUpToDate,
error: stream?.error,
hasSubscribe: typeof stream?.subscribe === "function",
hasUnsubscribe: typeof stream?.unsubscribe === "function",
});
// Also check stream's isUpToDate property immediately
if (stream?.isUpToDate) {
console.log(`[Electric] ✅ Stream isUpToDate is true immediately for ${table}`);
resolveInitialSync();
// Subscribe to the stream to watch for "up-to-date" control message
// NOTE: We keep this subscription active - don't unsubscribe!
// The stream is what Electric SQL uses for real-time updates
if (typeof stream?.subscribe === "function") {
console.log(
"[Electric] Subscribing to shape stream to watch for up-to-date message..."
);
// Subscribe but don't store unsubscribe - we want it to stay active
stream.subscribe((messages: unknown[]) => {
// Continue receiving updates even after sync is resolved
if (!syncResolved) {
console.log(
"[Electric] 🔵 Shape stream received messages:",
messages?.length || 0
);
}
// Check if any message indicates sync is complete
if (messages && messages.length > 0) {
for (const message of messages) {
const msg = message as any;
// Check for "up-to-date" control message
if (
msg?.headers?.control === "up-to-date" ||
msg?.headers?.electric_up_to_date === "true" ||
(typeof msg === "object" && "up-to-date" in msg)
) {
if (!syncResolved) {
console.log(`[Electric] ✅ Received up-to-date message for ${table}`);
resolveInitialSync();
}
// Continue listening for real-time updates - don't return!
}
}
if (!syncResolved && messages.length > 0) {
console.log(
"[Electric] First message:",
JSON.stringify(messages[0], null, 2)
);
}
}
// Also check stream's isUpToDate property after receiving messages
if (!syncResolved && stream?.isUpToDate) {
console.log(`[Electric] ✅ Stream isUpToDate is true for ${table}`);
resolveInitialSync();
}
});
// Also check stream's isUpToDate property immediately
if (stream?.isUpToDate) {
console.log(
`[Electric] ✅ Stream isUpToDate is true immediately for ${table}`
);
resolveInitialSync();
}
}
// Also poll isUpToDate periodically as a backup (every 200ms)
const pollInterval = setInterval(() => {
if (syncResolved) {
clearInterval(pollInterval);
return;
}
if (shape.isUpToDate || stream?.isUpToDate) {
console.log(
`[Electric] ✅ Sync completed (detected via polling) for ${table}`
);
clearInterval(pollInterval);
resolveInitialSync();
}
}, 200);
// Clean up polling when promise resolves
initialSyncPromise.finally(() => {
clearInterval(pollInterval);
});
} else {
console.warn(
`[Electric] ⚠️ No stream available for ${table}, relying on callback and timeout`
);
}
// Also poll isUpToDate periodically as a backup (every 200ms)
const pollInterval = setInterval(() => {
if (syncResolved) {
clearInterval(pollInterval);
return;
}
if (shape.isUpToDate || stream?.isUpToDate) {
console.log(`[Electric] ✅ Sync completed (detected via polling) for ${table}`);
clearInterval(pollInterval);
resolveInitialSync();
}
}, 200);
// Clean up polling when promise resolves
initialSyncPromise.finally(() => {
clearInterval(pollInterval);
});
} else {
console.warn(
`[Electric] ⚠️ No stream available for ${table}, relying on callback and timeout`
);
}
}
// Create the sync handle with proper cleanup
const syncHandle: SyncHandle = {
unsubscribe: () => {
console.log(`[Electric] Unsubscribing from: ${cacheKey}`);
// Remove from cache first
activeSyncHandles.delete(cacheKey);
// Then unsubscribe from the shape
if (shape && typeof shape.unsubscribe === "function") {
shape.unsubscribe();
}
},
// Use getter to always return current state
get isUpToDate() {
return shape?.isUpToDate ?? false;
},
stream: shape?.stream,
initialSyncPromise, // Expose promise so callers can wait for sync
};
// Create the sync handle with proper cleanup
const syncHandle: SyncHandle = {
unsubscribe: () => {
console.log(`[Electric] Unsubscribing from: ${cacheKey}`);
// Remove from cache first
activeSyncHandles.delete(cacheKey);
// Then unsubscribe from the shape
if (shape && typeof shape.unsubscribe === "function") {
shape.unsubscribe();
}
},
// Use getter to always return current state
get isUpToDate() {
return shape?.isUpToDate ?? false;
},
stream: shape?.stream,
initialSyncPromise, // Expose promise so callers can wait for sync
};
// Cache the sync handle for reuse (memory optimization)
activeSyncHandles.set(cacheKey, syncHandle);
console.log(
`[Electric] Cached sync handle for: ${cacheKey} (total cached: ${activeSyncHandles.size})`
);
return syncHandle;
} catch (error) {
console.error("[Electric] Failed to sync shape:", error);
// Check if Electric SQL server is reachable
try {
const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, {
method: "GET",
});
// Cache the sync handle for reuse (memory optimization)
activeSyncHandles.set(cacheKey, syncHandle);
console.log(
"[Electric] Electric SQL server response:",
response.status,
response.statusText
`[Electric] Cached sync handle for: ${cacheKey} (total cached: ${activeSyncHandles.size})`
);
if (!response.ok) {
console.error("[Electric] Electric SQL server error:", await response.text());
return syncHandle;
} catch (error) {
console.error("[Electric] Failed to sync shape:", error);
// Check if Electric SQL server is reachable
try {
const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, {
method: "GET",
});
console.log(
"[Electric] Electric SQL server response:",
response.status,
response.statusText
);
if (!response.ok) {
console.error("[Electric] Electric SQL server error:", await response.text());
}
} catch (fetchError) {
console.error("[Electric] Cannot reach Electric SQL server:", fetchError);
console.error("[Electric] Make sure Electric SQL is running at:", electricUrl);
}
} catch (fetchError) {
console.error("[Electric] Cannot reach Electric SQL server:", fetchError);
console.error("[Electric] Make sure Electric SQL is running at:", electricUrl);
throw error;
}
throw error;
}
})();
// Track the sync promise to prevent concurrent syncs for the same shape