feat: add Electric SQL replication for chat_comment_mentions

This commit is contained in:
CREDO23 2026-01-19 17:28:44 +02:00
parent 6b9695e848
commit 2839501503
3 changed files with 191 additions and 17 deletions

View file

@ -1,17 +1,16 @@
"""Add Electric SQL replication for chat_comments table
"""Add Electric SQL replication for chat_comment_mentions table
Revision ID: 71
Revises: 70
Enables Electric SQL replication for the chat_comments table to support
real-time live updates for comments in chat threads.
Enables Electric SQL replication for the chat_comment_mentions table to support
real-time live updates for mentions.
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "71"
down_revision: str | None = "70"
branch_labels: str | Sequence[str] | None = None
@ -19,22 +18,19 @@ depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Enable Electric SQL replication for chat_comments table."""
# Set REPLICA IDENTITY FULL (required by Electric SQL for replication)
op.execute("ALTER TABLE chat_comments REPLICA IDENTITY FULL;")
"""Enable Electric SQL replication for chat_comment_mentions table."""
op.execute("ALTER TABLE chat_comment_mentions REPLICA IDENTITY FULL;")
# Add chat_comments to Electric SQL publication for replication
op.execute(
"""
DO $$
BEGIN
-- Add chat_comments if not already added
IF NOT EXISTS (
SELECT 1 FROM pg_publication_tables
WHERE pubname = 'electric_publication_default'
AND tablename = 'chat_comments'
AND tablename = 'chat_comment_mentions'
) THEN
ALTER PUBLICATION electric_publication_default ADD TABLE chat_comments;
ALTER PUBLICATION electric_publication_default ADD TABLE chat_comment_mentions;
END IF;
END
$$;
@ -43,8 +39,7 @@ def upgrade() -> None:
def downgrade() -> None:
"""Remove chat_comments from Electric SQL replication."""
# Remove chat_comments from publication
"""Remove chat_comment_mentions from Electric SQL replication."""
op.execute(
"""
DO $$
@ -52,14 +47,13 @@ def downgrade() -> None:
IF EXISTS (
SELECT 1 FROM pg_publication_tables
WHERE pubname = 'electric_publication_default'
AND tablename = 'chat_comments'
AND tablename = 'chat_comment_mentions'
) THEN
ALTER PUBLICATION electric_publication_default DROP TABLE chat_comments;
ALTER PUBLICATION electric_publication_default DROP TABLE chat_comment_mentions;
END IF;
END
$$;
"""
)
# Reset REPLICA IDENTITY to default
op.execute("ALTER TABLE chat_comments REPLICA IDENTITY DEFAULT;")
op.execute("ALTER TABLE chat_comment_mentions REPLICA IDENTITY DEFAULT;")

View file

@ -0,0 +1,167 @@
"use client";
import { useEffect, useState, useRef } from "react";
import { useElectricClient } from "@/lib/electric/context";
import type { SyncHandle } from "@/lib/electric/client";
export interface ElectricMention {
id: number;
comment_id: number;
mentioned_user_id: string;
created_at: string;
}
/**
* Hook for syncing mentions with Electric SQL for real-time updates.
* Syncs all mentions for the current user.
* @param userId - The user ID to sync mentions for
*/
export function useMentionsElectric(userId: string | null) {
const electricClient = useElectricClient();
const [mentions, setMentions] = useState<ElectricMention[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<Error | null>(null);
const syncHandleRef = useRef<SyncHandle | null>(null);
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
const syncKeyRef = useRef<string | null>(null);
useEffect(() => {
if (!electricClient) {
setLoading(false);
setError(new Error("Electric SQL not configured"));
return;
}
if (!userId) {
setMentions([]);
setLoading(false);
return;
}
const syncKey = `mentions_${userId}`;
if (syncKeyRef.current === syncKey) {
return;
}
let mounted = true;
syncKeyRef.current = syncKey;
const client = electricClient;
async function startSync() {
try {
const handle = await client.syncShape({
table: "chat_comment_mentions",
where: `mentioned_user_id = '${userId}'`,
primaryKey: ["id"],
});
if (!handle.isUpToDate && handle.initialSyncPromise) {
try {
await Promise.race([
handle.initialSyncPromise,
new Promise((resolve) => setTimeout(resolve, 2000)),
]);
} catch (syncErr) {
console.error("[useMentionsElectric] Initial sync failed:", syncErr);
}
}
if (!mounted) {
handle.unsubscribe();
return;
}
syncHandleRef.current = handle;
setLoading(false);
setError(null);
await fetchMentions();
await setupLiveQuery();
} catch (err) {
if (!mounted) return;
console.error("[useMentionsElectric] Failed to start sync:", err);
setError(err instanceof Error ? err : new Error("Failed to sync mentions"));
setLoading(false);
}
}
async function fetchMentions() {
try {
const result = await client.db.query<ElectricMention>(
`SELECT id, comment_id, mentioned_user_id, created_at
FROM chat_comment_mentions
WHERE mentioned_user_id = $1
ORDER BY created_at DESC`,
[userId]
);
if (mounted) {
setMentions(result.rows || []);
}
} catch (err) {
console.error("[useMentionsElectric] Failed to fetch:", err);
}
}
async function setupLiveQuery() {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const db = client.db as any;
if (db.live?.query && typeof db.live.query === "function") {
const liveQuery = await db.live.query(
`SELECT id, comment_id, mentioned_user_id, created_at
FROM chat_comment_mentions
WHERE mentioned_user_id = $1
ORDER BY created_at DESC`,
[userId]
);
if (!mounted) {
liveQuery.unsubscribe?.();
return;
}
if (liveQuery.initialResults?.rows) {
setMentions(liveQuery.initialResults.rows);
} else if (liveQuery.rows) {
setMentions(liveQuery.rows);
}
if (typeof liveQuery.subscribe === "function") {
liveQuery.subscribe((result: { rows: ElectricMention[] }) => {
if (mounted && result.rows) {
setMentions(result.rows);
}
});
}
if (typeof liveQuery.unsubscribe === "function") {
liveQueryRef.current = liveQuery;
}
}
} catch (liveErr) {
console.error("[useMentionsElectric] Failed to set up live query:", liveErr);
}
}
startSync();
return () => {
mounted = false;
syncKeyRef.current = null;
if (syncHandleRef.current) {
syncHandleRef.current.unsubscribe();
syncHandleRef.current = null;
}
if (liveQueryRef.current) {
liveQueryRef.current.unsubscribe();
liveQueryRef.current = null;
}
};
}, [userId, electricClient]);
return { mentions, loading, error };
}

View file

@ -224,6 +224,19 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type);
`);
// Create the chat_comment_mentions table schema in PGlite
await db.exec(`
CREATE TABLE IF NOT EXISTS chat_comment_mentions (
id INTEGER PRIMARY KEY,
comment_id INTEGER NOT NULL,
mentioned_user_id TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_chat_comment_mentions_user_id ON chat_comment_mentions(mentioned_user_id);
CREATE INDEX IF NOT EXISTS idx_chat_comment_mentions_comment_id ON chat_comment_mentions(comment_id);
`);
const electricUrl = getElectricUrl();
// STEP 4: Create the client wrapper