feat: rewrite use-messages-electric hook from Electric to Zero

Replace PGlite sync+live query with Zero useQuery. Maps camelCase
output to snake_case to preserve consumer contract. 162 → 38 lines.
This commit is contained in:
CREDO23 2026-03-23 19:00:20 +02:00
parent b27061e44a
commit 5ad25d10ea

View file

@ -1,162 +1,38 @@
"use client"; "use client";
import { useCallback, useEffect, useRef } from "react"; import { useEffect, useRef } from "react";
import type { RawMessage } from "@/contracts/types/chat-messages.types"; import type { RawMessage } from "@/contracts/types/chat-messages.types";
import type { SyncHandle } from "@/lib/electric/client"; import { queries } from "@/zero/queries";
import { useElectricClient } from "@/lib/electric/context"; import { useQuery } from "@rocicorp/zero/react";
/** /**
* Syncs chat messages for a thread via Electric SQL. * Syncs chat messages for a thread via Zero.
* Calls onMessagesUpdate when messages change. * Calls onMessagesUpdate when messages change.
*/ */
export function useMessagesElectric( export function useMessagesElectric(
threadId: number | null, threadId: number | null,
onMessagesUpdate: (messages: RawMessage[]) => void onMessagesUpdate: (messages: RawMessage[]) => void
) { ) {
const electricClient = useElectricClient();
const syncHandleRef = useRef<SyncHandle | null>(null);
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
const syncKeyRef = useRef<string | null>(null);
const onMessagesUpdateRef = useRef(onMessagesUpdate); const onMessagesUpdateRef = useRef(onMessagesUpdate);
useEffect(() => { useEffect(() => {
onMessagesUpdateRef.current = onMessagesUpdate; onMessagesUpdateRef.current = onMessagesUpdate;
}, [onMessagesUpdate]); }, [onMessagesUpdate]);
const handleMessagesUpdate = useCallback((rows: RawMessage[]) => { const [messages] = useQuery(queries.messages.byThread({ threadId: threadId ?? -1 }));
onMessagesUpdateRef.current(rows);
}, []);
useEffect(() => { useEffect(() => {
if (!threadId || !electricClient) { if (!threadId || !messages) return;
return;
}
const syncKey = `messages_${threadId}`; const mapped: RawMessage[] = messages.map((msg) => ({
if (syncKeyRef.current === syncKey) { id: msg.id,
return; thread_id: msg.threadId,
} role: msg.role,
content: msg.content,
author_id: msg.authorId ?? null,
created_at: String(msg.createdAt),
}));
const client = electricClient; onMessagesUpdateRef.current(mapped);
let mounted = true; }, [threadId, messages]);
syncKeyRef.current = syncKey;
async function startSync() {
try {
const handle = await client.syncShape({
table: "new_chat_messages",
where: `thread_id = ${threadId}`,
columns: ["id", "thread_id", "role", "content", "author_id", "created_at"],
primaryKey: ["id"],
});
if (!handle.isUpToDate && handle.initialSyncPromise) {
try {
await Promise.race([
handle.initialSyncPromise,
new Promise((resolve) => setTimeout(resolve, 3000)),
]);
} catch {
// Timeout
}
}
if (!mounted) {
handle.unsubscribe();
return;
}
syncHandleRef.current = handle;
await fetchMessages();
await setupLiveQuery();
} catch {
// Sync failed
}
}
async function fetchMessages() {
try {
const result = await client.db.query<RawMessage>(
`SELECT id, thread_id, role, content, author_id, created_at
FROM new_chat_messages
WHERE thread_id = $1
ORDER BY created_at ASC`,
[threadId]
);
if (mounted && result.rows) {
handleMessagesUpdate(result.rows);
}
} catch {
// Query failed
}
}
async function setupLiveQuery() {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const db = client.db as any;
if (db.live?.query && typeof db.live.query === "function") {
const liveQuery = await db.live.query(
`SELECT id, thread_id, role, content, author_id, created_at
FROM new_chat_messages
WHERE thread_id = $1
ORDER BY created_at ASC`,
[threadId]
);
if (!mounted) {
liveQuery.unsubscribe?.();
return;
}
if (liveQuery.initialResults?.rows) {
handleMessagesUpdate(liveQuery.initialResults.rows);
} else if (liveQuery.rows) {
handleMessagesUpdate(liveQuery.rows);
}
if (typeof liveQuery.subscribe === "function") {
liveQuery.subscribe((result: { rows: RawMessage[] }) => {
if (mounted && result.rows) {
handleMessagesUpdate(result.rows);
}
});
}
if (typeof liveQuery.unsubscribe === "function") {
liveQueryRef.current = liveQuery;
}
}
} catch {
// Live query failed
}
}
startSync();
return () => {
mounted = false;
syncKeyRef.current = null;
if (syncHandleRef.current) {
try {
syncHandleRef.current.unsubscribe();
} catch {
// PGlite may already be closed during cleanup
}
syncHandleRef.current = null;
}
if (liveQueryRef.current) {
try {
liveQueryRef.current.unsubscribe();
} catch {
// PGlite may already be closed during cleanup
}
liveQueryRef.current = null;
}
};
}, [threadId, electricClient, handleMessagesUpdate]);
} }