Add new_chat_messages table to PGlite and create useMessagesElectric hook

This commit is contained in:
CREDO23 2026-01-22 18:43:20 +02:00
parent 0b8fed7304
commit 12437f840a
4 changed files with 169 additions and 83 deletions

View file

@ -10,7 +10,6 @@ export const rawMessage = z.object({
content: z.unknown(),
author_id: z.string().nullable(),
created_at: z.string(),
updated_at: z.string(),
});
export type RawMessage = z.infer<typeof rawMessage>;

View file

@ -1,82 +0,0 @@
"use client";
import { useShape } from "@electric-sql/react";
import { useAtomValue } from "jotai";
import { useMemo } from "react";
import { membersAtom } from "@/atoms/members/members-query.atoms";
import type { RawMessage } from "@/contracts/types/chat-messages.types";
import type { Membership } from "@/contracts/types/members.types";
import type { MessageRecord } from "@/lib/chat/thread-persistence";
const ELECTRIC_URL = process.env.NEXT_PUBLIC_ELECTRIC_URL || "http://localhost:5133";
/**
* Member info for building author data - derived from Membership
*/
type MemberInfo = Pick<Membership, "user_display_name" | "user_avatar_url">;
/**
* Hook to get live chat messages for real-time sync.
* Uses Electric SQL for messages + membersAtom (API) for author info.
*/
export function useChatMessagesLive(threadId: number | null) {
const {
data: messagesData,
isLoading: messagesLoading,
isError: messagesError,
error: messagesErrorDetails,
} = useShape<RawMessage>({
url: `${ELECTRIC_URL}/v1/shape`,
params: {
table: "new_chat_messages",
where: `thread_id = ${threadId}`,
},
});
const { data: membersData, isLoading: membersLoading } = useAtomValue(membersAtom);
const messages = useMemo<MessageRecord[]>(() => {
if (!messagesData) return [];
// Build member lookup map
const memberMap = new Map<string, MemberInfo>();
if (membersData) {
for (const member of membersData) {
memberMap.set(member.user_id, {
user_display_name: member.user_display_name,
user_avatar_url: member.user_avatar_url,
});
}
}
// Transform raw messages to MessageRecord with author info
return [...messagesData].map((msg): MessageRecord => {
const author = msg.author_id ? memberMap.get(msg.author_id) : null;
const role = (typeof msg.role === "string" ? msg.role.toLowerCase() : msg.role) as
| "user"
| "assistant"
| "system";
return {
id: msg.id,
thread_id: msg.thread_id,
role,
content: msg.content,
created_at: msg.created_at,
author_id: msg.author_id,
author_display_name: author?.user_display_name ?? null,
author_avatar_url: author?.user_avatar_url ?? null,
};
});
}, [messagesData, membersData]);
return {
messages,
isLoading: messagesLoading || membersLoading,
isError: messagesError,
error: messagesError ? messagesErrorDetails : null,
};
}

View file

@ -0,0 +1,154 @@
"use client";
import { useCallback, useEffect, useRef } from "react";
import type { RawMessage } from "@/contracts/types/chat-messages.types";
import type { SyncHandle } from "@/lib/electric/client";
import { useElectricClient } from "@/lib/electric/context";
/**
* Syncs chat messages for a thread via Electric SQL.
* Calls onMessagesUpdate when messages change.
*/
export function useMessagesElectric(
threadId: number | null,
onMessagesUpdate: (messages: RawMessage[]) => void
) {
const electricClient = useElectricClient();
const syncHandleRef = useRef<SyncHandle | null>(null);
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
const syncKeyRef = useRef<string | null>(null);
const onMessagesUpdateRef = useRef(onMessagesUpdate);
useEffect(() => {
onMessagesUpdateRef.current = onMessagesUpdate;
}, [onMessagesUpdate]);
const handleMessagesUpdate = useCallback((rows: RawMessage[]) => {
onMessagesUpdateRef.current(rows);
}, []);
useEffect(() => {
if (!threadId || !electricClient) {
return;
}
const syncKey = `messages_${threadId}`;
if (syncKeyRef.current === syncKey) {
return;
}
const client = electricClient;
let mounted = true;
syncKeyRef.current = syncKey;
async function startSync() {
try {
const handle = await client.syncShape({
table: "new_chat_messages",
where: `thread_id = ${threadId}`,
columns: ["id", "thread_id", "role", "content", "author_id", "created_at"],
primaryKey: ["id"],
});
if (!handle.isUpToDate && handle.initialSyncPromise) {
try {
await Promise.race([
handle.initialSyncPromise,
new Promise((resolve) => setTimeout(resolve, 3000)),
]);
} catch {
// Timeout
}
}
if (!mounted) {
handle.unsubscribe();
return;
}
syncHandleRef.current = handle;
await fetchMessages();
await setupLiveQuery();
} catch {
// Sync failed
}
}
async function fetchMessages() {
try {
const result = await client.db.query<RawMessage>(
`SELECT id, thread_id, role, content, author_id, created_at
FROM new_chat_messages
WHERE thread_id = $1
ORDER BY created_at ASC`,
[threadId]
);
if (mounted && result.rows) {
handleMessagesUpdate(result.rows);
}
} catch {
// Query failed
}
}
async function setupLiveQuery() {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const db = client.db as any;
if (db.live?.query && typeof db.live.query === "function") {
const liveQuery = await db.live.query(
`SELECT id, thread_id, role, content, author_id, created_at
FROM new_chat_messages
WHERE thread_id = $1
ORDER BY created_at ASC`,
[threadId]
);
if (!mounted) {
liveQuery.unsubscribe?.();
return;
}
if (liveQuery.initialResults?.rows) {
handleMessagesUpdate(liveQuery.initialResults.rows);
} else if (liveQuery.rows) {
handleMessagesUpdate(liveQuery.rows);
}
if (typeof liveQuery.subscribe === "function") {
liveQuery.subscribe((result: { rows: RawMessage[] }) => {
if (mounted && result.rows) {
handleMessagesUpdate(result.rows);
}
});
}
if (typeof liveQuery.unsubscribe === "function") {
liveQueryRef.current = liveQuery;
}
}
} catch {
// Live query failed
}
}
startSync();
return () => {
mounted = false;
syncKeyRef.current = null;
if (syncHandleRef.current) {
syncHandleRef.current.unsubscribe();
syncHandleRef.current = null;
}
if (liveQueryRef.current) {
liveQueryRef.current.unsubscribe();
liveQueryRef.current = null;
}
};
}, [threadId, electricClient, handleMessagesUpdate]);
}

View file

@ -258,6 +258,21 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
CREATE INDEX IF NOT EXISTS idx_chat_comments_parent_id ON chat_comments(parent_id);
`);
// Create new_chat_messages table for live message sync
await db.exec(`
CREATE TABLE IF NOT EXISTS new_chat_messages (
id INTEGER PRIMARY KEY,
thread_id INTEGER NOT NULL,
role TEXT NOT NULL,
content JSONB NOT NULL,
author_id TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_new_chat_messages_thread_id ON new_chat_messages(thread_id);
CREATE INDEX IF NOT EXISTS idx_new_chat_messages_created_at ON new_chat_messages(created_at);
`);
const electricUrl = getElectricUrl();
// STEP 4: Create the client wrapper