Merge pull request #928 from CREDO23/feat/migrate-electric-to-zero

[Feat] Migrate real-time sync from Electric SQL to Rocicorp Zero
This commit is contained in:
Rohan Verma 2026-03-24 12:30:57 -07:00 committed by GitHub
commit ac403402ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
98 changed files with 3939 additions and 3150 deletions

View file

@ -16,7 +16,6 @@ import {
} from "@/components/ui/dropdown-menu";
import { Spinner } from "@/components/ui/spinner";
import { logout } from "@/lib/auth-utils";
import { cleanupElectric } from "@/lib/electric/client";
import { resetUser, trackLogout } from "@/lib/posthog/events";
export function UserDropdown({
@ -39,14 +38,6 @@ export function UserDropdown({
trackLogout();
resetUser();
// Best-effort cleanup of Electric SQL / PGlite
// Even if this fails, login-time cleanup will handle it
try {
await cleanupElectric();
} catch (err) {
console.warn("[Logout] Electric cleanup failed (will be handled on next login):", err);
}
// Revoke refresh token on server and clear all tokens from localStorage
await logout();

View file

@ -20,7 +20,7 @@ import { Dialog, DialogContent, DialogTitle } from "@/components/ui/dialog";
import { Spinner } from "@/components/ui/spinner";
import { Tabs, TabsContent } from "@/components/ui/tabs";
import type { SearchSourceConnector } from "@/contracts/types/connector.types";
import { useConnectorsElectric } from "@/hooks/use-connectors-electric";
import { useConnectorsSync } from "@/hooks/use-connectors-sync";
import { PICKER_CLOSE_EVENT, PICKER_OPEN_EVENT } from "@/hooks/use-google-picker";
import { cn } from "@/lib/utils";
import { ConnectorDialogHeader } from "./connector-popup/components/connector-dialog-header";
@ -155,33 +155,23 @@ export const ConnectorIndicator = forwardRef<ConnectorIndicatorHandle, Connector
};
}, []);
// Fetch connectors using Electric SQL + PGlite for real-time updates
// This provides instant updates when connectors change, without polling
const {
connectors: connectorsFromElectric = [],
connectors: connectorsFromSync = [],
loading: connectorsLoading,
error: connectorsError,
refreshConnectors: refreshConnectorsElectric,
} = useConnectorsElectric(searchSpaceId);
refreshConnectors: refreshConnectorsSync,
} = useConnectorsSync(searchSpaceId);
// Fallback to API if Electric is not available or fails
// Use Electric data if: 1) we have data, or 2) still loading without error
// Use API data if: Electric failed (has error) or finished loading with no data
const useElectricData =
connectorsFromElectric.length > 0 || (connectorsLoading && !connectorsError);
const connectors = useElectricData ? connectorsFromElectric : allConnectors || [];
const useSyncData = connectorsFromSync.length > 0 || (connectorsLoading && !connectorsError);
const connectors = useSyncData ? connectorsFromSync : allConnectors || [];
// Manual refresh function that works with both Electric and API
const refreshConnectors = async () => {
if (useElectricData) {
await refreshConnectorsElectric();
} else {
// Fallback: use allConnectors from useConnectorDialog (which uses connectorsAtom)
// The connectorsAtom will handle refetching if needed
if (useSyncData) {
await refreshConnectorsSync();
}
};
// Track indexing state locally - clears automatically when Electric SQL detects last_indexed_at changed
// Track indexing state locally - clears automatically when last_indexed_at changes via real-time sync
// Also clears when failed notifications are detected
const { indexingConnectorIds, startIndexing, stopIndexing } = useIndexingConnectors(
connectors as SearchSourceConnector[],
@ -202,7 +192,7 @@ export const ConnectorIndicator = forwardRef<ConnectorIndicatorHandle, Connector
const activeConnectorsCount = connectors.length;
// Check which connectors are already connected
// Using Electric SQL + PGlite for real-time connector updates
// Real-time connector updates via Zero sync
const connectedTypes = new Set<string>(
(connectors || []).map((c: SearchSourceConnector) => c.connector_type)
);
@ -291,7 +281,7 @@ export const ConnectorIndicator = forwardRef<ConnectorIndicatorHandle, Connector
<ConnectorAccountsListView
connectorType={viewingAccountsType.connectorType}
connectorTitle={viewingAccountsType.connectorTitle}
connectors={(connectors || []) as SearchSourceConnector[]} // Using Electric SQL + PGlite for real-time connector updates (all connector types)
connectors={(connectors || []) as SearchSourceConnector[]}
indexingConnectorIds={indexingConnectorIds}
onBack={handleBackFromAccountsList}
onManage={handleStartEdit}
@ -323,7 +313,7 @@ export const ConnectorIndicator = forwardRef<ConnectorIndicatorHandle, Connector
...editingConnector,
config: connectorConfig || editingConnector.config,
name: editingConnector.name,
// Sync last_indexed_at with live data from Electric SQL for real-time updates
// Sync last_indexed_at with live data from real-time sync
last_indexed_at:
(connectors as SearchSourceConnector[]).find((c) => c.id === editingConnector.id)
?.last_indexed_at ?? editingConnector.last_indexed_at,

View file

@ -1254,7 +1254,7 @@ export const useConnectorDialog = () => {
queryKey: cacheKeys.logs.summary(Number(searchSpaceId)),
});
// Note: Don't call stopIndexing here - let useIndexingConnectors hook
// detect when last_indexed_at changes via Electric SQL
// detect when last_indexed_at changes via real-time sync
} catch (error) {
console.error("Error indexing connector content:", error);
toast.error(error instanceof Error ? error.message : "Failed to start indexing");

View file

@ -48,13 +48,13 @@ function isTaskTimedOut(startedAt: string | null | undefined): boolean {
*
* This provides a better UX than polling by:
* 1. Setting indexing state immediately when user triggers indexing (optimistic)
* 2. Detecting in_progress notifications from Electric SQL to restore state after remounts
* 2. Detecting in_progress notifications to restore state after remounts
* 3. Clearing indexing state when notifications become completed or failed
* 4. Clearing indexing state when Electric SQL detects last_indexed_at changed
* 4. Clearing indexing state when real-time sync detects last_indexed_at changed
* 5. Detecting stale/stuck tasks that haven't updated in 15+ minutes
* 6. Detecting hard timeout (8h) - tasks that definitely cannot still be running
*
* The actual `last_indexed_at` value comes from Electric SQL/PGlite, not local state.
* The actual `last_indexed_at` value comes from real-time sync, not local state.
*/
export function useIndexingConnectors(
connectors: SearchSourceConnector[],
@ -66,7 +66,7 @@ export function useIndexingConnectors(
// Track previous last_indexed_at values to detect changes
const previousLastIndexedAtRef = useRef<Map<number, string | null>>(new Map());
// Detect when last_indexed_at changes (indexing completed) via Electric SQL
// Detect when last_indexed_at changes (indexing completed) via real-time sync
useEffect(() => {
const previousValues = previousLastIndexedAtRef.current;

View file

@ -97,7 +97,7 @@ import {
} from "@/contracts/enums/toolIcons";
import type { Document } from "@/contracts/types/document.types";
import { useBatchCommentsPreload } from "@/hooks/use-comments";
import { useCommentsElectric } from "@/hooks/use-comments-electric";
import { useCommentsSync } from "@/hooks/use-comments-sync";
import { useMediaQuery } from "@/hooks/use-media-query";
import { cn } from "@/lib/utils";
@ -371,8 +371,8 @@ const Composer: FC = () => {
const respondingToUserId = sessionState?.respondingToUserId ?? null;
const isBlockedByOtherUser = isAiResponding && respondingToUserId !== currentUser?.id;
// Sync comments for the entire thread via Electric SQL (one subscription per thread)
useCommentsElectric(threadId);
// Sync comments for the entire thread via Zero (one subscription per thread)
useCommentsSync(threadId);
// Batch-prefetch comments for all assistant messages so individual useComments
// hooks never fire their own network requests (eliminates N+1 API calls).
@ -1084,7 +1084,13 @@ const TOOL_GROUPS: ToolGroup[] = [
},
{
label: "Generate",
tools: ["generate_podcast", "generate_video_presentation", "generate_report", "generate_image", "display_image"],
tools: [
"generate_podcast",
"generate_video_presentation",
"generate_report",
"generate_image",
"display_image",
],
},
{
label: "Memory",

View file

@ -54,7 +54,6 @@ import { notificationsApiService } from "@/lib/apis/notifications-api.service";
import { searchSpacesApiService } from "@/lib/apis/search-spaces-api.service";
import { logout } from "@/lib/auth-utils";
import { deleteThread, fetchThreads, updateThread } from "@/lib/chat/thread-persistence";
import { cleanupElectric } from "@/lib/electric/client";
import { resetUser, trackLogout } from "@/lib/posthog/events";
import { cacheKeys } from "@/lib/query-client/cache-keys";
import type { ChatItem, NavItem, SearchSpace } from "../types/layout.types";
@ -155,8 +154,6 @@ export function LayoutDataProvider({ searchSpaceId, children }: LayoutDataProvid
// Search space dialog state
const [isCreateSearchSpaceDialogOpen, setIsCreateSearchSpaceDialogOpen] = useState(false);
// Per-tab inbox hooks — each has independent API loading, pagination,
// and Electric live queries. The Electric sync shape is shared (client-level cache).
const userId = user?.id ? String(user.id) : null;
const numericSpaceId = Number(searchSpaceId) || null;
@ -579,14 +576,6 @@ export function LayoutDataProvider({ searchSpaceId, children }: LayoutDataProvid
trackLogout();
resetUser();
// Best-effort cleanup of Electric SQL / PGlite
// Even if this fails, login-time cleanup will handle it
try {
await cleanupElectric();
} catch (err) {
console.warn("[Logout] Electric cleanup failed (will be handled on next login):", err);
}
// Revoke refresh token on server and clear all tokens from localStorage
await logout();

View file

@ -1,118 +0,0 @@
"use client";
import { useAtomValue } from "jotai";
import { usePathname } from "next/navigation";
import { useEffect, useRef, useState } from "react";
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
import { useGlobalLoadingEffect } from "@/hooks/use-global-loading";
import { getBearerToken } from "@/lib/auth-utils";
import {
cleanupElectric,
type ElectricClient,
initElectric,
isElectricInitialized,
} from "@/lib/electric/client";
import { ElectricContext } from "@/lib/electric/context";
const IS_DEV = process.env.NODE_ENV === "development";
interface ElectricProviderProps {
children: React.ReactNode;
}
/**
* Initializes user-specific PGlite database with Electric SQL sync.
* Handles user isolation, cleanup, and re-initialization on user change.
*/
export function ElectricProvider({ children }: ElectricProviderProps) {
const [electricClient, setElectricClient] = useState<ElectricClient | null>(null);
const [error, setError] = useState<Error | null>(null);
const {
data: user,
isSuccess: isUserLoaded,
isError: isUserError,
} = useAtomValue(currentUserAtom);
const previousUserIdRef = useRef<string | null>(null);
const initializingRef = useRef(false);
const pathname = usePathname();
useEffect(() => {
if (typeof window === "undefined") return;
// No user logged in - cleanup if previous user existed
if (!isUserLoaded || !user?.id) {
if (previousUserIdRef.current && isElectricInitialized()) {
if (IS_DEV) console.log("[ElectricProvider] User logged out, cleaning up...");
cleanupElectric().then(() => {
previousUserIdRef.current = null;
setElectricClient(null);
});
}
return;
}
const userId = String(user.id);
// Skip if already initialized for this user or currently initializing
if ((electricClient && previousUserIdRef.current === userId) || initializingRef.current) {
return;
}
initializingRef.current = true;
let mounted = true;
async function init() {
try {
if (IS_DEV) console.log(`[ElectricProvider] Initializing for user: ${userId}`);
const client = await initElectric(userId);
if (mounted) {
previousUserIdRef.current = userId;
setElectricClient(client);
setError(null);
if (IS_DEV) console.log(`[ElectricProvider] ✅ Ready for user: ${userId}`);
}
} catch (err) {
console.error("[ElectricProvider] Failed to initialize:", err);
if (mounted) {
setError(err instanceof Error ? err : new Error("Failed to initialize Electric SQL"));
setElectricClient(null);
}
} finally {
if (mounted) {
initializingRef.current = false;
}
}
}
init();
return () => {
mounted = false;
};
}, [user?.id, isUserLoaded, electricClient]);
const hasToken = typeof window !== "undefined" && !!getBearerToken();
// Only block UI on dashboard routes; public pages render immediately
const requiresElectricLoading = pathname?.startsWith("/dashboard");
const shouldShowLoading =
hasToken && isUserLoaded && !!user?.id && !electricClient && !error && requiresElectricLoading;
useGlobalLoadingEffect(shouldShowLoading);
// Render immediately for unauthenticated users or failed user queries
if (!hasToken || !isUserLoaded || !user?.id || isUserError) {
return <ElectricContext.Provider value={null}>{children}</ElectricContext.Provider>;
}
// Render with null context while initializing
if (!electricClient && !error) {
return <ElectricContext.Provider value={null}>{children}</ElectricContext.Provider>;
}
if (error) {
console.warn("[ElectricProvider] Initialization failed, sync may not work:", error.message);
}
return <ElectricContext.Provider value={electricClient}>{children}</ElectricContext.Provider>;
}

View file

@ -0,0 +1,65 @@
"use client";
import {
useConnectionState,
useZero,
ZeroProvider as ZeroReactProvider,
} from "@rocicorp/zero/react";
import { useAtomValue } from "jotai";
import { useEffect, useRef } from "react";
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
import { getBearerToken, handleUnauthorized, refreshAccessToken } from "@/lib/auth-utils";
import { queries } from "@/zero/queries";
import { schema } from "@/zero/schema";
const cacheURL = process.env.NEXT_PUBLIC_ZERO_CACHE_URL || "http://localhost:4848";
function ZeroAuthGuard({ children }: { children: React.ReactNode }) {
const zero = useZero();
const connectionState = useConnectionState();
const isRefreshingRef = useRef(false);
useEffect(() => {
if (connectionState.name !== "needs-auth" || isRefreshingRef.current) return;
isRefreshingRef.current = true;
refreshAccessToken()
.then((newToken) => {
if (newToken) {
zero.connection.connect({ auth: newToken });
} else {
handleUnauthorized();
}
})
.finally(() => {
isRefreshingRef.current = false;
});
}, [connectionState, zero]);
return <>{children}</>;
}
export function ZeroProvider({ children }: { children: React.ReactNode }) {
const { data: user } = useAtomValue(currentUserAtom);
const hasUser = !!user?.id;
const userID = hasUser ? String(user.id) : "anon";
const context = hasUser ? { userId: String(user.id) } : undefined;
const auth = hasUser ? getBearerToken() || undefined : undefined;
const opts = {
userID,
schema,
queries,
context,
cacheURL,
auth,
};
return (
<ZeroReactProvider {...opts}>
{hasUser ? <ZeroAuthGuard>{children}</ZeroAuthGuard> : children}
</ZeroReactProvider>
);
}

View file

@ -6,9 +6,9 @@ import { ReportPanel } from "@/components/report-panel/report-panel";
import { DisplayImageToolUI } from "@/components/tool-ui/display-image";
import { GeneratePodcastToolUI } from "@/components/tool-ui/generate-podcast";
import { GenerateReportToolUI } from "@/components/tool-ui/generate-report";
import { GenerateVideoPresentationToolUI } from "@/components/tool-ui/video-presentation";
import { LinkPreviewToolUI } from "@/components/tool-ui/link-preview";
import { ScrapeWebpageToolUI } from "@/components/tool-ui/scrape-webpage";
import { GenerateVideoPresentationToolUI } from "@/components/tool-ui/video-presentation";
import { Spinner } from "@/components/ui/spinner";
import { usePublicChat } from "@/hooks/use-public-chat";
import { usePublicChatRuntime } from "@/hooks/use-public-chat-runtime";

View file

@ -32,7 +32,6 @@ export {
} from "./display-image";
export { GeneratePodcastToolUI } from "./generate-podcast";
export { GenerateReportToolUI } from "./generate-report";
export { GenerateVideoPresentationToolUI } from "./video-presentation";
export { CreateGoogleDriveFileToolUI, DeleteGoogleDriveFileToolUI } from "./google-drive";
export {
Image,
@ -106,4 +105,5 @@ export {
SaveMemoryResultSchema,
SaveMemoryToolUI,
} from "./user-memory";
export { GenerateVideoPresentationToolUI } from "./video-presentation";
export { type WriteTodosData, WriteTodosSchema, WriteTodosToolUI } from "./write-todos";

View file

@ -1,9 +1,10 @@
"use client";
import React, { useMemo } from "react";
import { Player } from "@remotion/player";
import { Sequence, AbsoluteFill, useCurrentFrame, useVideoConfig, interpolate } from "remotion";
import { Audio } from "@remotion/media";
import { Player } from "@remotion/player";
import type React from "react";
import { useMemo } from "react";
import { AbsoluteFill, interpolate, Sequence, useCurrentFrame, useVideoConfig } from "remotion";
import { FPS } from "@/lib/remotion/constants";
export interface CompiledSlide {
@ -64,9 +65,7 @@ function Watermark() {
);
}
export function buildSlideWithWatermark(
SlideComponent: React.ComponentType,
): React.FC {
export function buildSlideWithWatermark(SlideComponent: React.ComponentType): React.FC {
const Wrapped: React.FC = () => (
<AbsoluteFill>
<SlideComponent />
@ -115,7 +114,7 @@ export function CombinedPlayer({ slides }: CombinedPlayerProps) {
const totalFrames = useMemo(
() => slides.reduce((sum, s) => sum + s.durationInFrames, 0),
[slides],
[slides]
);
return (

View file

@ -1,16 +1,9 @@
"use client";
import React, { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { makeAssistantToolUI } from "@assistant-ui/react";
import {
AlertCircleIcon,
Download,
Film,
Loader2,
Presentation,
X,
} from "lucide-react";
import { AlertCircleIcon, Download, Film, Loader2, Presentation, X } from "lucide-react";
import { useParams, usePathname } from "next/navigation";
import React, { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { z } from "zod";
import { Spinner } from "@/components/ui/spinner";
import { baseApiService } from "@/lib/apis/base-api.service";
@ -18,9 +11,9 @@ import { authenticatedFetch } from "@/lib/auth-utils";
import { compileCheck, compileToComponent } from "@/lib/remotion/compile-check";
import { FPS } from "@/lib/remotion/constants";
import {
CombinedPlayer,
buildCompositionComponent,
buildSlideWithWatermark,
CombinedPlayer,
type CompiledSlide,
} from "./combined-player";
@ -54,7 +47,7 @@ const VideoPresentationStatusResponseSchema = z.object({
audio_url: z.string().nullish(),
duration_seconds: z.number().nullish(),
duration_in_frames: z.number().nullish(),
}),
})
)
.nullish(),
scene_codes: z
@ -63,7 +56,7 @@ const VideoPresentationStatusResponseSchema = z.object({
slide_number: z.number(),
code: z.string(),
title: z.string().nullish(),
}),
})
)
.nullish(),
slide_count: z.number().nullish(),
@ -206,9 +199,7 @@ function VideoPresentationPlayer({
const durationInFrames = slide.duration_in_frames ?? 300;
const check = compileCheck(scene.code);
if (!check.success) {
console.warn(
`Slide ${slide.slide_number} failed to compile: ${check.error}`,
);
console.warn(`Slide ${slide.slide_number} failed to compile: ${check.error}`);
continue;
}
@ -219,9 +210,7 @@ function VideoPresentationPlayer({
title: scene.title ?? slide.title,
code: scene.code,
durationInFrames,
audioUrl: slide.audio_url
? `${backendUrl}${slide.audio_url}`
: undefined,
audioUrl: slide.audio_url ? `${backendUrl}${slide.audio_url}` : undefined,
});
}
@ -238,17 +227,13 @@ function VideoPresentationPlayer({
try {
let blob: Blob;
if (shareToken) {
blob = await baseApiService.getBlob(
new URL(slide.audioUrl).pathname,
);
blob = await baseApiService.getBlob(new URL(slide.audioUrl).pathname);
} else {
const resp = await authenticatedFetch(slide.audioUrl, {
method: "GET",
});
if (!resp.ok) {
console.warn(
`Audio fetch ${resp.status} for slide "${slide.title}"`,
);
console.warn(`Audio fetch ${resp.status} for slide "${slide.title}"`);
return { ...slide, audioUrl: undefined };
}
blob = await resp.blob();
@ -260,7 +245,7 @@ function VideoPresentationPlayer({
console.warn(`Failed to fetch audio for "${slide.title}":`, err);
return { ...slide, audioUrl: undefined };
}
}),
})
);
setCompiledSlides(withBlobs);
@ -284,7 +269,7 @@ function VideoPresentationPlayer({
const totalDuration = useMemo(
() => compiledSlides.reduce((sum, s) => sum + s.durationInFrames / FPS, 0),
[compiledSlides],
[compiledSlides]
);
const handleDownload = async () => {
@ -299,9 +284,7 @@ function VideoPresentationPlayer({
abortControllerRef.current = controller;
try {
const { canRenderMediaOnWeb, renderMediaOnWeb } = await import(
"@remotion/web-renderer"
);
const { canRenderMediaOnWeb, renderMediaOnWeb } = await import("@remotion/web-renderer");
const formats = [
{ container: "mp4" as const, videoCodec: "h264" as const, ext: "mp4" },
@ -326,7 +309,7 @@ function VideoPresentationPlayer({
if (!chosen) {
throw new Error(
"Your browser does not support video rendering (WebCodecs). Please use Chrome, Edge, or Firefox 130+.",
"Your browser does not support video rendering (WebCodecs). Please use Chrome, Edge, or Firefox 130+."
);
}
@ -422,7 +405,7 @@ function VideoPresentationPlayer({
durationInFrames: slide.durationInFrames,
fps: FPS,
style: { width: 1920, height: 1080 },
}),
})
);
});
@ -466,8 +449,7 @@ function VideoPresentationPlayer({
<div className="min-w-0">
<h3 className="text-sm font-semibold text-foreground truncate">{title}</h3>
<p className="text-xs text-muted-foreground">
{compiledSlides.length} slides &middot; {totalDuration.toFixed(1)}s &middot;{" "}
{FPS}fps
{compiledSlides.length} slides &middot; {totalDuration.toFixed(1)}s &middot; {FPS}fps
</p>
</div>
</div>
@ -479,9 +461,7 @@ function VideoPresentationPlayer({
<Loader2 className="size-3.5 animate-spin text-primary" />
<span className="text-xs font-medium">
Rendering {renderFormat ?? ""}{" "}
{renderProgress !== null
? `${Math.round(renderProgress * 100)}%`
: "..."}
{renderProgress !== null ? `${Math.round(renderProgress * 100)}%` : "..."}
</span>
<div className="h-1.5 w-20 overflow-hidden rounded-full bg-secondary">
<div
@ -538,9 +518,7 @@ function VideoPresentationPlayer({
<AlertCircleIcon className="mt-0.5 size-4 shrink-0 text-destructive" />
<div>
<p className="text-sm font-medium text-destructive">Download Failed</p>
<p className="mt-1 text-xs text-destructive/70 whitespace-pre-wrap">
{renderError}
</p>
<p className="mt-1 text-xs text-destructive/70 whitespace-pre-wrap">{renderError}</p>
</div>
</div>
)}
@ -626,8 +604,7 @@ export const GenerateVideoPresentationToolUI = makeAssistantToolUI<
const params = useParams();
const pathname = usePathname();
const isPublicRoute = pathname?.startsWith("/public/");
const shareToken =
isPublicRoute && typeof params?.token === "string" ? params.token : null;
const shareToken = isPublicRoute && typeof params?.token === "string" ? params.token : null;
const title = args.video_title || "SurfSense Presentation";

View file

@ -19,8 +19,7 @@ const carouselItems = [
},
{
title: "Video Generation",
description:
"Create short videos with AI-generated visuals and narration from your sources.",
description: "Create short videos with AI-generated visuals and narration from your sources.",
src: "/homepage/hero_tutorial/video_gen_surf.mp4",
},
{