mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-05 05:42:39 +02:00
Merge pull request #727 from CREDO23/sur-100-feat-shared-chats-live-collaboration
[Feature] Implement live collaboration in shared threads
This commit is contained in:
commit
99b8a6c970
21 changed files with 1191 additions and 23 deletions
|
|
@ -0,0 +1,75 @@
|
|||
"""Add chat_session_state table for live collaboration
|
||||
|
||||
Revision ID: 75
|
||||
Revises: 74
|
||||
|
||||
Creates chat_session_state table to track AI responding state per thread.
|
||||
Enables real-time sync via Electric SQL for shared chat collaboration.
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
from alembic import op
|
||||
|
||||
revision: str = "75"
|
||||
down_revision: str | None = "74"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Create chat_session_state table with Electric SQL replication."""
|
||||
op.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS chat_session_state (
|
||||
id SERIAL PRIMARY KEY,
|
||||
thread_id INTEGER NOT NULL REFERENCES new_chat_threads(id) ON DELETE CASCADE,
|
||||
ai_responding_to_user_id UUID REFERENCES "user"(id) ON DELETE SET NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
UNIQUE (thread_id)
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
op.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_chat_session_state_thread_id ON chat_session_state(thread_id)"
|
||||
)
|
||||
|
||||
op.execute("ALTER TABLE chat_session_state REPLICA IDENTITY FULL;")
|
||||
|
||||
op.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_publication_tables
|
||||
WHERE pubname = 'electric_publication_default'
|
||||
AND tablename = 'chat_session_state'
|
||||
) THEN
|
||||
ALTER PUBLICATION electric_publication_default ADD TABLE chat_session_state;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Drop chat_session_state table and remove from Electric SQL replication."""
|
||||
op.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM pg_publication_tables
|
||||
WHERE pubname = 'electric_publication_default'
|
||||
AND tablename = 'chat_session_state'
|
||||
) THEN
|
||||
ALTER PUBLICATION electric_publication_default DROP TABLE chat_session_state;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
op.execute("DROP TABLE IF EXISTS chat_session_state;")
|
||||
|
|
@ -0,0 +1,99 @@
|
|||
"""Add live collaboration tables to Electric SQL publication
|
||||
|
||||
Revision ID: 76
|
||||
Revises: 75
|
||||
|
||||
Enables real-time sync for live collaboration features:
|
||||
- new_chat_messages: Live message sync between users
|
||||
- chat_comments: Live comment updates
|
||||
|
||||
Note: User/member info is fetched via API (membersAtom) for client-side joins,
|
||||
not via Electric SQL, to keep where clauses optimized and reduce complexity.
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
from alembic import op
|
||||
|
||||
revision: str = "76"
|
||||
down_revision: str | None = "75"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Add live collaboration tables to Electric SQL replication."""
|
||||
# Set REPLICA IDENTITY FULL for Electric SQL sync
|
||||
op.execute("ALTER TABLE new_chat_messages REPLICA IDENTITY FULL;")
|
||||
op.execute("ALTER TABLE chat_comments REPLICA IDENTITY FULL;")
|
||||
|
||||
# Add new_chat_messages to Electric publication
|
||||
op.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_publication_tables
|
||||
WHERE pubname = 'electric_publication_default'
|
||||
AND tablename = 'new_chat_messages'
|
||||
) THEN
|
||||
ALTER PUBLICATION electric_publication_default ADD TABLE new_chat_messages;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
# Add chat_comments to Electric publication
|
||||
op.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_publication_tables
|
||||
WHERE pubname = 'electric_publication_default'
|
||||
AND tablename = 'chat_comments'
|
||||
) THEN
|
||||
ALTER PUBLICATION electric_publication_default ADD TABLE chat_comments;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Remove live collaboration tables from Electric SQL replication."""
|
||||
op.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM pg_publication_tables
|
||||
WHERE pubname = 'electric_publication_default'
|
||||
AND tablename = 'new_chat_messages'
|
||||
) THEN
|
||||
ALTER PUBLICATION electric_publication_default DROP TABLE new_chat_messages;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
op.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM pg_publication_tables
|
||||
WHERE pubname = 'electric_publication_default'
|
||||
AND tablename = 'chat_comments'
|
||||
) THEN
|
||||
ALTER PUBLICATION electric_publication_default DROP TABLE chat_comments;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
# Note: Not reverting REPLICA IDENTITY as it doesn't harm normal operations
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
"""Add thread_id to chat_comments for denormalized Electric subscriptions
|
||||
|
||||
This denormalization allows a single Electric SQL subscription per thread
|
||||
instead of one per message, significantly reducing connection overhead.
|
||||
|
||||
Revision ID: 77
|
||||
Revises: 76
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
from alembic import op
|
||||
|
||||
revision: str = "77"
|
||||
down_revision: str | None = "76"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Add thread_id column to chat_comments and backfill from messages."""
|
||||
# Add the column (nullable initially for backfill)
|
||||
op.execute(
|
||||
"""
|
||||
ALTER TABLE chat_comments
|
||||
ADD COLUMN IF NOT EXISTS thread_id INTEGER;
|
||||
"""
|
||||
)
|
||||
|
||||
# Backfill thread_id from the related message
|
||||
op.execute(
|
||||
"""
|
||||
UPDATE chat_comments c
|
||||
SET thread_id = m.thread_id
|
||||
FROM new_chat_messages m
|
||||
WHERE c.message_id = m.id
|
||||
AND c.thread_id IS NULL;
|
||||
"""
|
||||
)
|
||||
|
||||
# Make it NOT NULL after backfill
|
||||
op.execute(
|
||||
"""
|
||||
ALTER TABLE chat_comments
|
||||
ALTER COLUMN thread_id SET NOT NULL;
|
||||
"""
|
||||
)
|
||||
|
||||
# Add FK constraint
|
||||
op.execute(
|
||||
"""
|
||||
ALTER TABLE chat_comments
|
||||
ADD CONSTRAINT fk_chat_comments_thread_id
|
||||
FOREIGN KEY (thread_id) REFERENCES new_chat_threads(id) ON DELETE CASCADE;
|
||||
"""
|
||||
)
|
||||
|
||||
# Add index for efficient Electric subscriptions by thread
|
||||
op.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_chat_comments_thread_id ON chat_comments(thread_id)"
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Remove thread_id column from chat_comments."""
|
||||
op.execute("DROP INDEX IF EXISTS idx_chat_comments_thread_id")
|
||||
op.execute("ALTER TABLE chat_comments DROP CONSTRAINT IF EXISTS fk_chat_comments_thread_id")
|
||||
op.execute("ALTER TABLE chat_comments DROP COLUMN IF EXISTS thread_id")
|
||||
|
|
@ -415,6 +415,13 @@ class ChatComment(BaseModel, TimestampMixin):
|
|||
nullable=False,
|
||||
index=True,
|
||||
)
|
||||
# Denormalized thread_id for efficient Electric SQL subscriptions (one per thread)
|
||||
thread_id = Column(
|
||||
Integer,
|
||||
ForeignKey("new_chat_threads.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
index=True,
|
||||
)
|
||||
parent_id = Column(
|
||||
Integer,
|
||||
ForeignKey("chat_comments.id", ondelete="CASCADE"),
|
||||
|
|
@ -438,6 +445,7 @@ class ChatComment(BaseModel, TimestampMixin):
|
|||
|
||||
# Relationships
|
||||
message = relationship("NewChatMessage", back_populates="comments")
|
||||
thread = relationship("NewChatThread")
|
||||
author = relationship("User")
|
||||
parent = relationship(
|
||||
"ChatComment", remote_side="ChatComment.id", backref="replies"
|
||||
|
|
@ -474,6 +482,38 @@ class ChatCommentMention(BaseModel, TimestampMixin):
|
|||
mentioned_user = relationship("User")
|
||||
|
||||
|
||||
class ChatSessionState(BaseModel):
|
||||
"""
|
||||
Tracks real-time session state for shared chat collaboration.
|
||||
One record per thread, synced via Electric SQL.
|
||||
"""
|
||||
|
||||
__tablename__ = "chat_session_state"
|
||||
|
||||
thread_id = Column(
|
||||
Integer,
|
||||
ForeignKey("new_chat_threads.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
unique=True,
|
||||
index=True,
|
||||
)
|
||||
ai_responding_to_user_id = Column(
|
||||
UUID(as_uuid=True),
|
||||
ForeignKey("user.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
index=True,
|
||||
)
|
||||
updated_at = Column(
|
||||
TIMESTAMP(timezone=True),
|
||||
nullable=False,
|
||||
default=lambda: datetime.now(UTC),
|
||||
onupdate=lambda: datetime.now(UTC),
|
||||
)
|
||||
|
||||
thread = relationship("NewChatThread")
|
||||
ai_responding_to_user = relationship("User")
|
||||
|
||||
|
||||
class MemoryCategory(str, Enum):
|
||||
"""Categories for user memories."""
|
||||
|
||||
|
|
|
|||
|
|
@ -990,7 +990,7 @@ async def handle_new_chat(
|
|||
search_space_id=request.search_space_id,
|
||||
chat_id=request.chat_id,
|
||||
session=session,
|
||||
user_id=str(user.id), # Pass user ID for memory tools
|
||||
user_id=str(user.id), # Pass user ID for memory tools and session state
|
||||
llm_config_id=llm_config_id,
|
||||
attachments=request.attachments,
|
||||
mentioned_document_ids=request.mentioned_document_ids,
|
||||
|
|
|
|||
29
surfsense_backend/app/schemas/chat_session_state.py
Normal file
29
surfsense_backend/app/schemas/chat_session_state.py
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
"""
|
||||
Pydantic schemas for chat session state (live collaboration).
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from uuid import UUID
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
|
||||
class RespondingUser(BaseModel):
|
||||
"""The user that the AI is currently responding to."""
|
||||
|
||||
id: UUID
|
||||
display_name: str | None = None
|
||||
email: str
|
||||
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
|
||||
class ChatSessionStateResponse(BaseModel):
|
||||
"""Current session state for a chat thread."""
|
||||
|
||||
id: int
|
||||
thread_id: int
|
||||
responding_to: RespondingUser | None = None
|
||||
updated_at: datetime
|
||||
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
|
@ -281,8 +281,10 @@ async def create_comment(
|
|||
detail="You don't have permission to create comments in this search space",
|
||||
)
|
||||
|
||||
thread = message.thread
|
||||
comment = ChatComment(
|
||||
message_id=message_id,
|
||||
thread_id=thread.id, # Denormalized for efficient Electric subscriptions
|
||||
author_id=user.id,
|
||||
content=content,
|
||||
)
|
||||
|
|
@ -299,7 +301,6 @@ async def create_comment(
|
|||
user_names = await get_user_names_for_mentions(session, set(mentions_map.keys()))
|
||||
|
||||
# Create notifications for mentioned users (excluding author)
|
||||
thread = message.thread
|
||||
author_name = user.display_name or user.email
|
||||
content_preview = render_mentions(content, user_names)
|
||||
for mentioned_user_id, mention_id in mentions_map.items():
|
||||
|
|
@ -393,8 +394,10 @@ async def create_reply(
|
|||
detail="You don't have permission to create comments in this search space",
|
||||
)
|
||||
|
||||
thread = parent_comment.message.thread
|
||||
reply = ChatComment(
|
||||
message_id=parent_comment.message_id,
|
||||
thread_id=thread.id, # Denormalized for efficient Electric subscriptions
|
||||
parent_id=comment_id,
|
||||
author_id=user.id,
|
||||
content=content,
|
||||
|
|
@ -412,7 +415,6 @@ async def create_reply(
|
|||
user_names = await get_user_names_for_mentions(session, set(mentions_map.keys()))
|
||||
|
||||
# Create notifications for mentioned users (excluding author)
|
||||
thread = parent_comment.message.thread
|
||||
author_name = user.display_name or user.email
|
||||
content_preview = render_mentions(content, user_names)
|
||||
for mentioned_user_id, mention_id in mentions_map.items():
|
||||
|
|
|
|||
65
surfsense_backend/app/services/chat_session_state_service.py
Normal file
65
surfsense_backend/app/services/chat_session_state_service.py
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
"""
|
||||
Service layer for chat session state (live collaboration).
|
||||
"""
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from app.db import ChatSessionState
|
||||
|
||||
|
||||
async def get_session_state(
|
||||
session: AsyncSession,
|
||||
thread_id: int,
|
||||
) -> ChatSessionState | None:
|
||||
"""Get the current session state for a thread."""
|
||||
result = await session.execute(
|
||||
select(ChatSessionState)
|
||||
.options(selectinload(ChatSessionState.ai_responding_to_user))
|
||||
.filter(ChatSessionState.thread_id == thread_id)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
|
||||
async def set_ai_responding(
|
||||
session: AsyncSession,
|
||||
thread_id: int,
|
||||
user_id: UUID,
|
||||
) -> ChatSessionState:
|
||||
"""Mark AI as responding to a specific user. Uses upsert for atomicity."""
|
||||
now = datetime.now(UTC)
|
||||
upsert_query = insert(ChatSessionState).values(
|
||||
thread_id=thread_id,
|
||||
ai_responding_to_user_id=user_id,
|
||||
updated_at=now,
|
||||
)
|
||||
upsert_query = upsert_query.on_conflict_do_update(
|
||||
index_elements=["thread_id"],
|
||||
set_={
|
||||
"ai_responding_to_user_id": user_id,
|
||||
"updated_at": now,
|
||||
},
|
||||
)
|
||||
await session.execute(upsert_query)
|
||||
await session.commit()
|
||||
|
||||
return await get_session_state(session, thread_id)
|
||||
|
||||
|
||||
async def clear_ai_responding(
|
||||
session: AsyncSession,
|
||||
thread_id: int,
|
||||
) -> ChatSessionState | None:
|
||||
"""Clear AI responding state when response is complete."""
|
||||
state = await get_session_state(session, thread_id)
|
||||
if state:
|
||||
state.ai_responding_to_user_id = None
|
||||
state.updated_at = datetime.now(UTC)
|
||||
await session.commit()
|
||||
await session.refresh(state)
|
||||
return state
|
||||
|
|
@ -11,6 +11,7 @@ Supports loading LLM configurations from:
|
|||
|
||||
import json
|
||||
from collections.abc import AsyncGenerator
|
||||
from uuid import UUID
|
||||
|
||||
from langchain_core.messages import HumanMessage
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
|
@ -27,6 +28,10 @@ from app.agents.new_chat.llm_config import (
|
|||
)
|
||||
from app.db import Document, SurfsenseDocsDocument
|
||||
from app.schemas.new_chat import ChatAttachment
|
||||
from app.services.chat_session_state_service import (
|
||||
clear_ai_responding,
|
||||
set_ai_responding,
|
||||
)
|
||||
from app.services.connector_service import ConnectorService
|
||||
from app.services.new_streaming_service import VercelStreamingService
|
||||
|
||||
|
|
@ -167,9 +172,8 @@ async def stream_new_chat(
|
|||
search_space_id: The search space ID
|
||||
chat_id: The chat ID (used as LangGraph thread_id for memory)
|
||||
session: The database session
|
||||
user_id: The current user's UUID string (for memory tools)
|
||||
user_id: The current user's UUID string (for memory tools and session state)
|
||||
llm_config_id: The LLM configuration ID (default: -1 for first global config)
|
||||
messages: Optional chat history from frontend (list of ChatMessage)
|
||||
attachments: Optional attachments with extracted content
|
||||
mentioned_document_ids: Optional list of document IDs mentioned with @ in the chat
|
||||
mentioned_surfsense_doc_ids: Optional list of SurfSense doc IDs mentioned with @ in the chat
|
||||
|
|
@ -183,6 +187,9 @@ async def stream_new_chat(
|
|||
current_text_id: str | None = None
|
||||
|
||||
try:
|
||||
# Mark AI as responding to this user for live collaboration
|
||||
if user_id:
|
||||
await set_ai_responding(session, chat_id, UUID(user_id))
|
||||
# Load LLM config - supports both YAML (negative IDs) and database (positive IDs)
|
||||
agent_config: AgentConfig | None = None
|
||||
|
||||
|
|
@ -1147,3 +1154,7 @@ async def stream_new_chat(
|
|||
yield streaming_service.format_finish_step()
|
||||
yield streaming_service.format_finish()
|
||||
yield streaming_service.format_done()
|
||||
|
||||
finally:
|
||||
# Clear AI responding state for live collaboration
|
||||
await clear_ai_responding(session, chat_id)
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import {
|
|||
// extractWriteTodosFromContent,
|
||||
hydratePlanStateAtom,
|
||||
} from "@/atoms/chat/plan-state.atom";
|
||||
import { membersAtom } from "@/atoms/members/members-query.atoms";
|
||||
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
|
||||
import { Thread } from "@/components/assistant-ui/thread";
|
||||
import { ChatHeader } from "@/components/new-chat/chat-header";
|
||||
|
|
@ -50,6 +51,8 @@ import {
|
|||
type MessageRecord,
|
||||
type ThreadRecord,
|
||||
} from "@/lib/chat/thread-persistence";
|
||||
import { useChatSessionStateSync } from "@/hooks/use-chat-session-state";
|
||||
import { useMessagesElectric } from "@/hooks/use-messages-electric";
|
||||
import {
|
||||
trackChatCreated,
|
||||
trackChatError,
|
||||
|
|
@ -258,6 +261,44 @@ export default function NewChatPage() {
|
|||
// Get current user for author info in shared chats
|
||||
const { data: currentUser } = useAtomValue(currentUserAtom);
|
||||
|
||||
// Live collaboration: sync session state and messages via Electric SQL
|
||||
useChatSessionStateSync(threadId);
|
||||
const { data: membersData } = useAtomValue(membersAtom);
|
||||
|
||||
const handleElectricMessagesUpdate = useCallback(
|
||||
(electricMessages: { id: number; thread_id: number; role: string; content: unknown; author_id: string | null; created_at: string }[]) => {
|
||||
if (isRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
setMessages((prev) => {
|
||||
if (electricMessages.length < prev.length) {
|
||||
return prev;
|
||||
}
|
||||
|
||||
return electricMessages.map((msg) => {
|
||||
const member = msg.author_id
|
||||
? membersData?.find((m) => m.user_id === msg.author_id)
|
||||
: null;
|
||||
|
||||
return convertToThreadMessage({
|
||||
id: msg.id,
|
||||
thread_id: msg.thread_id,
|
||||
role: msg.role.toLowerCase() as "user" | "assistant" | "system",
|
||||
content: msg.content,
|
||||
author_id: msg.author_id,
|
||||
created_at: msg.created_at,
|
||||
author_display_name: member?.user_display_name ?? null,
|
||||
author_avatar_url: member?.user_avatar_url ?? null,
|
||||
});
|
||||
});
|
||||
});
|
||||
},
|
||||
[isRunning, membersData]
|
||||
);
|
||||
|
||||
useMessagesElectric(threadId, handleElectricMessagesUpdate);
|
||||
|
||||
// Create the attachment adapter for file processing
|
||||
const attachmentAdapter = useMemo(() => createAttachmentAdapter(), []);
|
||||
|
||||
|
|
@ -587,8 +628,6 @@ export default function NewChatPage() {
|
|||
content: persistContent,
|
||||
})
|
||||
.then(() => {
|
||||
// For new threads, the backend updates the title from the first user message
|
||||
// Invalidate threads query so sidebar shows the updated title in real-time
|
||||
if (isNewThread) {
|
||||
queryClient.invalidateQueries({ queryKey: ["threads", String(searchSpaceId)] });
|
||||
}
|
||||
|
|
|
|||
15
surfsense_web/atoms/chat/chat-session-state.atom.ts
Normal file
15
surfsense_web/atoms/chat/chat-session-state.atom.ts
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
"use client";
|
||||
|
||||
import { atom } from "jotai";
|
||||
|
||||
export interface ChatSessionStateData {
|
||||
threadId: number;
|
||||
isAiResponding: boolean;
|
||||
respondingToUserId: string | null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Global chat session state atom.
|
||||
* Updated by useChatSessionStateSync hook, read anywhere.
|
||||
*/
|
||||
export const chatSessionStateAtom = atom<ChatSessionStateData | null>(null);
|
||||
|
|
@ -9,7 +9,7 @@ export const membersAtom = atomWithQuery((get) => {
|
|||
return {
|
||||
queryKey: cacheKeys.members.all(searchSpaceId?.toString() ?? ""),
|
||||
enabled: !!searchSpaceId,
|
||||
staleTime: 5 * 60 * 1000, // 5 minutes
|
||||
staleTime: 3 * 1000, // 3 seconds - short staleness for live collaboration
|
||||
queryFn: async () => {
|
||||
if (!searchSpaceId) {
|
||||
return [];
|
||||
|
|
|
|||
|
|
@ -0,0 +1,49 @@
|
|||
"use client";
|
||||
|
||||
import type { FC } from "react";
|
||||
import { Loader2 } from "lucide-react";
|
||||
import { cn } from "@/lib/utils";
|
||||
|
||||
interface ChatSessionStatusProps {
|
||||
isAiResponding: boolean;
|
||||
respondingToUserId: string | null;
|
||||
currentUserId: string | null;
|
||||
members: Array<{
|
||||
user_id: string;
|
||||
user_display_name?: string | null;
|
||||
user_email?: string | null;
|
||||
}>;
|
||||
className?: string;
|
||||
}
|
||||
|
||||
export const ChatSessionStatus: FC<ChatSessionStatusProps> = ({
|
||||
isAiResponding,
|
||||
respondingToUserId,
|
||||
currentUserId,
|
||||
members,
|
||||
className,
|
||||
}) => {
|
||||
if (!isAiResponding || !respondingToUserId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (respondingToUserId === currentUserId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const respondingUser = members.find((m) => m.user_id === respondingToUserId);
|
||||
const displayName = respondingUser?.user_display_name || respondingUser?.user_email || "another user";
|
||||
|
||||
return (
|
||||
<div
|
||||
className={cn(
|
||||
"flex items-center gap-2 px-3 py-2 text-sm text-muted-foreground bg-muted/50 rounded-lg",
|
||||
"animate-in fade-in slide-in-from-bottom-2 duration-300 ease-out",
|
||||
className
|
||||
)}
|
||||
>
|
||||
<Loader2 className="size-3.5 animate-spin" />
|
||||
<span>Currently responding to {displayName}</span>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
|
@ -31,6 +31,7 @@ import {
|
|||
mentionedDocumentIdsAtom,
|
||||
mentionedDocumentsAtom,
|
||||
} from "@/atoms/chat/mentioned-documents.atom";
|
||||
import { membersAtom } from "@/atoms/members/members-query.atoms";
|
||||
import {
|
||||
globalNewLLMConfigsAtom,
|
||||
llmPreferencesAtom,
|
||||
|
|
@ -39,6 +40,7 @@ import {
|
|||
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
|
||||
import { AssistantMessage } from "@/components/assistant-ui/assistant-message";
|
||||
import { ComposerAddAttachment, ComposerAttachments } from "@/components/assistant-ui/attachment";
|
||||
import { ChatSessionStatus } from "@/components/assistant-ui/chat-session-status";
|
||||
import { ConnectorIndicator } from "@/components/assistant-ui/connector-popup";
|
||||
import {
|
||||
InlineMentionEditor,
|
||||
|
|
@ -59,6 +61,8 @@ import {
|
|||
import type { ThinkingStep } from "@/components/tool-ui/deepagent-thinking";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import type { Document } from "@/contracts/types/document.types";
|
||||
import { chatSessionStateAtom } from "@/atoms/chat/chat-session-state.atom";
|
||||
import { useCommentsElectric } from "@/hooks/use-comments-electric";
|
||||
import { cn } from "@/lib/utils";
|
||||
|
||||
interface ThreadProps {
|
||||
|
|
@ -86,6 +90,7 @@ const ThreadContent: FC<{ header?: React.ReactNode }> = ({ header }) => {
|
|||
>
|
||||
<ThreadPrimitive.Viewport
|
||||
turnAnchor="top"
|
||||
autoScroll
|
||||
className={cn(
|
||||
"aui-thread-viewport relative flex flex-1 min-h-0 flex-col overflow-y-auto px-4 pt-4 transition-[padding] duration-300 ease-out",
|
||||
showGutter && "lg:pr-30"
|
||||
|
|
@ -215,7 +220,7 @@ const Composer: FC = () => {
|
|||
const editorRef = useRef<InlineMentionEditorRef>(null);
|
||||
const editorContainerRef = useRef<HTMLDivElement>(null);
|
||||
const documentPickerRef = useRef<DocumentMentionPickerRef>(null);
|
||||
const { search_space_id } = useParams();
|
||||
const { search_space_id, chat_id } = useParams();
|
||||
const setMentionedDocumentIds = useSetAtom(mentionedDocumentIdsAtom);
|
||||
const composerRuntime = useComposerRuntime();
|
||||
const hasAutoFocusedRef = useRef(false);
|
||||
|
|
@ -223,6 +228,23 @@ const Composer: FC = () => {
|
|||
const isThreadEmpty = useAssistantState(({ thread }) => thread.isEmpty);
|
||||
const isThreadRunning = useAssistantState(({ thread }) => thread.isRunning);
|
||||
|
||||
// Live collaboration state
|
||||
const { data: currentUser } = useAtomValue(currentUserAtom);
|
||||
const { data: members } = useAtomValue(membersAtom);
|
||||
const threadId = useMemo(() => {
|
||||
if (Array.isArray(chat_id) && chat_id.length > 0) {
|
||||
return Number.parseInt(chat_id[0], 10) || null;
|
||||
}
|
||||
return typeof chat_id === "string" ? Number.parseInt(chat_id, 10) || null : null;
|
||||
}, [chat_id]);
|
||||
const sessionState = useAtomValue(chatSessionStateAtom);
|
||||
const isAiResponding = sessionState?.isAiResponding ?? false;
|
||||
const respondingToUserId = sessionState?.respondingToUserId ?? null;
|
||||
const isBlockedByOtherUser = isAiResponding && respondingToUserId !== currentUser?.id;
|
||||
|
||||
// Sync comments for the entire thread via Electric SQL (one subscription per thread)
|
||||
useCommentsElectric(threadId);
|
||||
|
||||
// Auto-focus editor on new chat page after mount
|
||||
useEffect(() => {
|
||||
if (isThreadEmpty && !hasAutoFocusedRef.current && editorRef.current) {
|
||||
|
|
@ -298,9 +320,9 @@ const Composer: FC = () => {
|
|||
[showDocumentPopover]
|
||||
);
|
||||
|
||||
// Submit message (blocked during streaming or when document picker is open)
|
||||
// Submit message (blocked during streaming, document picker open, or AI responding to another user)
|
||||
const handleSubmit = useCallback(() => {
|
||||
if (isThreadRunning) {
|
||||
if (isThreadRunning || isBlockedByOtherUser) {
|
||||
return;
|
||||
}
|
||||
if (!showDocumentPopover) {
|
||||
|
|
@ -315,6 +337,7 @@ const Composer: FC = () => {
|
|||
}, [
|
||||
showDocumentPopover,
|
||||
isThreadRunning,
|
||||
isBlockedByOtherUser,
|
||||
composerRuntime,
|
||||
setMentionedDocuments,
|
||||
setMentionedDocumentIds,
|
||||
|
|
@ -374,7 +397,13 @@ const Composer: FC = () => {
|
|||
);
|
||||
|
||||
return (
|
||||
<ComposerPrimitive.Root className="aui-composer-root relative flex w-full flex-col">
|
||||
<ComposerPrimitive.Root className="aui-composer-root relative flex w-full flex-col gap-2">
|
||||
<ChatSessionStatus
|
||||
isAiResponding={isAiResponding}
|
||||
respondingToUserId={respondingToUserId}
|
||||
currentUserId={currentUser?.id ?? null}
|
||||
members={members ?? []}
|
||||
/>
|
||||
<ComposerPrimitive.AttachmentDropzone className="aui-composer-attachment-dropzone flex w-full flex-col rounded-2xl border-input bg-muted px-1 pt-2 outline-none transition-shadow data-[dragging=true]:border-ring data-[dragging=true]:border-dashed data-[dragging=true]:bg-accent/50">
|
||||
<ComposerAttachments />
|
||||
{/* Inline editor with @mention support */}
|
||||
|
|
@ -417,13 +446,17 @@ const Composer: FC = () => {
|
|||
/>,
|
||||
document.body
|
||||
)}
|
||||
<ComposerAction />
|
||||
<ComposerAction isBlockedByOtherUser={isBlockedByOtherUser} />
|
||||
</ComposerPrimitive.AttachmentDropzone>
|
||||
</ComposerPrimitive.Root>
|
||||
);
|
||||
};
|
||||
|
||||
const ComposerAction: FC = () => {
|
||||
interface ComposerActionProps {
|
||||
isBlockedByOtherUser?: boolean;
|
||||
}
|
||||
|
||||
const ComposerAction: FC<ComposerActionProps> = ({ isBlockedByOtherUser = false }) => {
|
||||
// Check if any attachments are still being processed (running AND progress < 100)
|
||||
// When progress is 100, processing is done but waiting for send()
|
||||
const hasProcessingAttachments = useAssistantState(({ composer }) =>
|
||||
|
|
@ -458,7 +491,8 @@ const ComposerAction: FC = () => {
|
|||
return userConfigs?.some((c) => c.id === agentLlmId) ?? false;
|
||||
}, [preferences, globalConfigs, userConfigs]);
|
||||
|
||||
const isSendDisabled = hasProcessingAttachments || isComposerEmpty || !hasModelConfigured;
|
||||
const isSendDisabled =
|
||||
hasProcessingAttachments || isComposerEmpty || !hasModelConfigured || isBlockedByOtherUser;
|
||||
|
||||
return (
|
||||
<div className="aui-composer-action-wrapper relative mx-2 mb-2 flex items-center justify-between">
|
||||
|
|
@ -487,13 +521,15 @@ const ComposerAction: FC = () => {
|
|||
<ComposerPrimitive.Send asChild disabled={isSendDisabled}>
|
||||
<TooltipIconButton
|
||||
tooltip={
|
||||
!hasModelConfigured
|
||||
? "Please select a model from the header to start chatting"
|
||||
: hasProcessingAttachments
|
||||
? "Wait for attachments to process"
|
||||
: isComposerEmpty
|
||||
? "Enter a message to send"
|
||||
: "Send message"
|
||||
isBlockedByOtherUser
|
||||
? "Wait for AI to finish responding"
|
||||
: !hasModelConfigured
|
||||
? "Please select a model from the header to start chatting"
|
||||
: hasProcessingAttachments
|
||||
? "Wait for attachments to process"
|
||||
: isComposerEmpty
|
||||
? "Enter a message to send"
|
||||
: "Send message"
|
||||
}
|
||||
side="bottom"
|
||||
type="submit"
|
||||
|
|
|
|||
|
|
@ -1,5 +1,19 @@
|
|||
import { z } from "zod";
|
||||
|
||||
/**
|
||||
* Raw comment
|
||||
*/
|
||||
export const rawComment = z.object({
|
||||
id: z.number(),
|
||||
message_id: z.number(),
|
||||
thread_id: z.number(), // Denormalized for efficient Electric subscriptions
|
||||
parent_id: z.number().nullable(),
|
||||
author_id: z.string().nullable(),
|
||||
content: z.string(),
|
||||
created_at: z.string(),
|
||||
updated_at: z.string(),
|
||||
});
|
||||
|
||||
export const author = z.object({
|
||||
id: z.string().uuid(),
|
||||
display_name: z.string().nullable(),
|
||||
|
|
@ -122,6 +136,7 @@ export const getMentionsResponse = z.object({
|
|||
total_count: z.number(),
|
||||
});
|
||||
|
||||
export type RawComment = z.infer<typeof rawComment>;
|
||||
export type Author = z.infer<typeof author>;
|
||||
export type CommentReply = z.infer<typeof commentReply>;
|
||||
export type Comment = z.infer<typeof comment>;
|
||||
|
|
|
|||
15
surfsense_web/contracts/types/chat-messages.types.ts
Normal file
15
surfsense_web/contracts/types/chat-messages.types.ts
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
import { z } from "zod";
|
||||
|
||||
/**
|
||||
* Raw message from database (Electric SQL sync)
|
||||
*/
|
||||
export const rawMessage = z.object({
|
||||
id: z.number(),
|
||||
thread_id: z.number(),
|
||||
role: z.string(),
|
||||
content: z.unknown(),
|
||||
author_id: z.string().nullable(),
|
||||
created_at: z.string(),
|
||||
});
|
||||
|
||||
export type RawMessage = z.infer<typeof rawMessage>;
|
||||
24
surfsense_web/contracts/types/chat-session-state.types.ts
Normal file
24
surfsense_web/contracts/types/chat-session-state.types.ts
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
import { z } from "zod";
|
||||
|
||||
/**
|
||||
* Chat session state for live collaboration.
|
||||
* Tracks which user the AI is currently responding to.
|
||||
*/
|
||||
export const chatSessionState = z.object({
|
||||
id: z.number(),
|
||||
thread_id: z.number(),
|
||||
ai_responding_to_user_id: z.string().uuid().nullable(),
|
||||
updated_at: z.string(),
|
||||
});
|
||||
|
||||
/**
|
||||
* User currently being responded to by the AI.
|
||||
*/
|
||||
export const respondingUser = z.object({
|
||||
id: z.string().uuid(),
|
||||
display_name: z.string().nullable(),
|
||||
email: z.string(),
|
||||
});
|
||||
|
||||
export type ChatSessionState = z.infer<typeof chatSessionState>;
|
||||
export type RespondingUser = z.infer<typeof respondingUser>;
|
||||
39
surfsense_web/hooks/use-chat-session-state.ts
Normal file
39
surfsense_web/hooks/use-chat-session-state.ts
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
"use client";
|
||||
|
||||
import { useShape } from "@electric-sql/react";
|
||||
import { useSetAtom } from "jotai";
|
||||
import { useEffect } from "react";
|
||||
import { chatSessionStateAtom } from "@/atoms/chat/chat-session-state.atom";
|
||||
import type { ChatSessionState } from "@/contracts/types/chat-session-state.types";
|
||||
|
||||
const ELECTRIC_URL = process.env.NEXT_PUBLIC_ELECTRIC_URL || "http://localhost:5133";
|
||||
|
||||
/**
|
||||
* Syncs chat session state for a thread via Electric SQL.
|
||||
* Call once per thread (in page.tsx). Updates global atom.
|
||||
*/
|
||||
export function useChatSessionStateSync(threadId: number | null) {
|
||||
const setSessionState = useSetAtom(chatSessionStateAtom);
|
||||
|
||||
const { data } = useShape<ChatSessionState>({
|
||||
url: `${ELECTRIC_URL}/v1/shape`,
|
||||
params: {
|
||||
table: "chat_session_state",
|
||||
where: `thread_id = ${threadId ?? -1}`,
|
||||
},
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
if (!threadId) {
|
||||
setSessionState(null);
|
||||
return;
|
||||
}
|
||||
|
||||
const row = data?.[0];
|
||||
setSessionState({
|
||||
threadId,
|
||||
isAiResponding: !!row?.ai_responding_to_user_id,
|
||||
respondingToUserId: row?.ai_responding_to_user_id ?? null,
|
||||
});
|
||||
}, [threadId, data, setSessionState]);
|
||||
}
|
||||
361
surfsense_web/hooks/use-comments-electric.ts
Normal file
361
surfsense_web/hooks/use-comments-electric.ts
Normal file
|
|
@ -0,0 +1,361 @@
|
|||
"use client";
|
||||
|
||||
import { useQueryClient } from "@tanstack/react-query";
|
||||
import { useAtomValue } from "jotai";
|
||||
import { useCallback, useEffect, useMemo, useRef } from "react";
|
||||
import { membersAtom, myAccessAtom } from "@/atoms/members/members-query.atoms";
|
||||
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
|
||||
import type {
|
||||
Comment,
|
||||
CommentReply,
|
||||
Author,
|
||||
} from "@/contracts/types/chat-comments.types";
|
||||
import type { Membership } from "@/contracts/types/members.types";
|
||||
import type { SyncHandle } from "@/lib/electric/client";
|
||||
import { useElectricClient } from "@/lib/electric/context";
|
||||
import { cacheKeys } from "@/lib/query-client/cache-keys";
|
||||
|
||||
// Raw comment from PGlite local database
|
||||
interface RawCommentRow {
|
||||
id: number;
|
||||
message_id: number;
|
||||
thread_id: number;
|
||||
parent_id: number | null;
|
||||
author_id: string | null;
|
||||
content: string;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
}
|
||||
|
||||
// Regex pattern to match @[uuid] mentions (matches backend MENTION_PATTERN)
|
||||
const MENTION_PATTERN = /@\[([0-9a-fA-F-]{36})\]/g;
|
||||
|
||||
type MemberInfo = Pick<Membership, "user_display_name" | "user_avatar_url" | "user_email">;
|
||||
|
||||
/**
|
||||
* Render mentions in content by replacing @[uuid] with @{DisplayName}
|
||||
*/
|
||||
function renderMentions(content: string, memberMap: Map<string, MemberInfo>): string {
|
||||
return content.replace(MENTION_PATTERN, (match, uuid) => {
|
||||
const member = memberMap.get(uuid);
|
||||
if (member?.user_display_name) {
|
||||
return `@{${member.user_display_name}}`;
|
||||
}
|
||||
return match;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Build member lookup map from membersData
|
||||
*/
|
||||
function buildMemberMap(membersData: Membership[] | undefined): Map<string, MemberInfo> {
|
||||
const map = new Map<string, MemberInfo>();
|
||||
if (membersData) {
|
||||
for (const m of membersData) {
|
||||
map.set(m.user_id, {
|
||||
user_display_name: m.user_display_name,
|
||||
user_avatar_url: m.user_avatar_url,
|
||||
user_email: m.user_email,
|
||||
});
|
||||
}
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build author object from member data
|
||||
*/
|
||||
function buildAuthor(authorId: string | null, memberMap: Map<string, MemberInfo>): Author | null {
|
||||
if (!authorId) return null;
|
||||
const m = memberMap.get(authorId);
|
||||
if (!m) return null;
|
||||
return {
|
||||
id: authorId,
|
||||
display_name: m.user_display_name ?? null,
|
||||
avatar_url: m.user_avatar_url ?? null,
|
||||
email: m.user_email ?? "",
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a comment has been edited by comparing timestamps.
|
||||
* Uses a small threshold to handle precision differences.
|
||||
*/
|
||||
function isEdited(createdAt: string, updatedAt: string): boolean {
|
||||
const created = new Date(createdAt).getTime();
|
||||
const updated = new Date(updatedAt).getTime();
|
||||
// Consider edited if updated_at is more than 1 second after created_at
|
||||
return updated - created > 1000;
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform raw comment to CommentReply
|
||||
*/
|
||||
function transformReply(
|
||||
raw: RawCommentRow,
|
||||
memberMap: Map<string, MemberInfo>,
|
||||
currentUserId: string | undefined,
|
||||
isOwner: boolean
|
||||
): CommentReply {
|
||||
return {
|
||||
id: raw.id,
|
||||
content: raw.content,
|
||||
content_rendered: renderMentions(raw.content, memberMap),
|
||||
author: buildAuthor(raw.author_id, memberMap),
|
||||
created_at: raw.created_at,
|
||||
updated_at: raw.updated_at,
|
||||
is_edited: isEdited(raw.created_at, raw.updated_at),
|
||||
can_edit: currentUserId === raw.author_id,
|
||||
can_delete: currentUserId === raw.author_id || isOwner,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform raw comments to Comment with replies
|
||||
*/
|
||||
function transformComments(
|
||||
rawComments: RawCommentRow[],
|
||||
memberMap: Map<string, MemberInfo>,
|
||||
currentUserId: string | undefined,
|
||||
isOwner: boolean
|
||||
): Map<number, Comment[]> {
|
||||
// Group comments by message_id
|
||||
const byMessage = new Map<number, { topLevel: RawCommentRow[]; replies: Map<number, RawCommentRow[]> }>();
|
||||
|
||||
for (const raw of rawComments) {
|
||||
if (!byMessage.has(raw.message_id)) {
|
||||
byMessage.set(raw.message_id, { topLevel: [], replies: new Map() });
|
||||
}
|
||||
const group = byMessage.get(raw.message_id)!;
|
||||
|
||||
if (raw.parent_id === null) {
|
||||
group.topLevel.push(raw);
|
||||
} else {
|
||||
if (!group.replies.has(raw.parent_id)) {
|
||||
group.replies.set(raw.parent_id, []);
|
||||
}
|
||||
group.replies.get(raw.parent_id)!.push(raw);
|
||||
}
|
||||
}
|
||||
|
||||
// Transform to Comment objects grouped by message_id
|
||||
const result = new Map<number, Comment[]>();
|
||||
|
||||
for (const [messageId, group] of byMessage) {
|
||||
const comments: Comment[] = group.topLevel.map((raw) => {
|
||||
const replies = (group.replies.get(raw.id) || [])
|
||||
.sort((a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime())
|
||||
.map((r) => transformReply(r, memberMap, currentUserId, isOwner));
|
||||
|
||||
return {
|
||||
id: raw.id,
|
||||
message_id: raw.message_id,
|
||||
content: raw.content,
|
||||
content_rendered: renderMentions(raw.content, memberMap),
|
||||
author: buildAuthor(raw.author_id, memberMap),
|
||||
created_at: raw.created_at,
|
||||
updated_at: raw.updated_at,
|
||||
is_edited: isEdited(raw.created_at, raw.updated_at),
|
||||
can_edit: currentUserId === raw.author_id,
|
||||
can_delete: currentUserId === raw.author_id || isOwner,
|
||||
reply_count: replies.length,
|
||||
replies,
|
||||
};
|
||||
});
|
||||
|
||||
// Sort by created_at
|
||||
comments.sort((a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime());
|
||||
result.set(messageId, comments);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook for syncing comments with Electric SQL real-time sync.
|
||||
*
|
||||
* Syncs ALL comments for a thread in ONE subscription, then updates
|
||||
* React Query cache for each message. This avoids N subscriptions for N messages.
|
||||
*
|
||||
* @param threadId - The thread ID to sync comments for
|
||||
*/
|
||||
export function useCommentsElectric(threadId: number | null) {
|
||||
const electricClient = useElectricClient();
|
||||
const queryClient = useQueryClient();
|
||||
|
||||
const { data: membersData } = useAtomValue(membersAtom);
|
||||
const { data: currentUser } = useAtomValue(currentUserAtom);
|
||||
const { data: myAccess } = useAtomValue(myAccessAtom);
|
||||
|
||||
const memberMap = useMemo(() => buildMemberMap(membersData), [membersData]);
|
||||
const currentUserId = currentUser?.id;
|
||||
const isOwner = myAccess?.is_owner ?? false;
|
||||
|
||||
// Use refs for values needed in live query callback to avoid stale closures
|
||||
const memberMapRef = useRef(memberMap);
|
||||
const currentUserIdRef = useRef(currentUserId);
|
||||
const isOwnerRef = useRef(isOwner);
|
||||
const queryClientRef = useRef(queryClient);
|
||||
|
||||
// Keep refs updated
|
||||
useEffect(() => {
|
||||
memberMapRef.current = memberMap;
|
||||
currentUserIdRef.current = currentUserId;
|
||||
isOwnerRef.current = isOwner;
|
||||
queryClientRef.current = queryClient;
|
||||
}, [memberMap, currentUserId, isOwner, queryClient]);
|
||||
|
||||
const syncHandleRef = useRef<SyncHandle | null>(null);
|
||||
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
|
||||
const syncKeyRef = useRef<string | null>(null);
|
||||
|
||||
// Stable callback that uses refs for fresh values
|
||||
const updateReactQueryCache = useCallback((rows: RawCommentRow[]) => {
|
||||
const commentsByMessage = transformComments(
|
||||
rows,
|
||||
memberMapRef.current,
|
||||
currentUserIdRef.current,
|
||||
isOwnerRef.current
|
||||
);
|
||||
|
||||
for (const [messageId, comments] of commentsByMessage) {
|
||||
const cacheKey = cacheKeys.comments.byMessage(messageId);
|
||||
queryClientRef.current.setQueryData(cacheKey, {
|
||||
comments,
|
||||
total_count: comments.length,
|
||||
});
|
||||
}
|
||||
}, []);
|
||||
|
||||
useEffect(() => {
|
||||
if (!threadId || !electricClient) {
|
||||
return;
|
||||
}
|
||||
|
||||
const syncKey = `comments_${threadId}`;
|
||||
if (syncKeyRef.current === syncKey) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Capture in local variable for use in async functions
|
||||
const client = electricClient;
|
||||
|
||||
let mounted = true;
|
||||
syncKeyRef.current = syncKey;
|
||||
|
||||
async function startSync() {
|
||||
try {
|
||||
const handle = await client.syncShape({
|
||||
table: "chat_comments",
|
||||
where: `thread_id = ${threadId}`,
|
||||
columns: ["id", "message_id", "thread_id", "parent_id", "author_id", "content", "created_at", "updated_at"],
|
||||
primaryKey: ["id"],
|
||||
});
|
||||
|
||||
if (!handle.isUpToDate && handle.initialSyncPromise) {
|
||||
try {
|
||||
await Promise.race([
|
||||
handle.initialSyncPromise,
|
||||
new Promise((resolve) => setTimeout(resolve, 3000)),
|
||||
]);
|
||||
} catch {
|
||||
// Initial sync timeout - continue anyway
|
||||
}
|
||||
}
|
||||
|
||||
if (!mounted) {
|
||||
handle.unsubscribe();
|
||||
return;
|
||||
}
|
||||
|
||||
syncHandleRef.current = handle;
|
||||
|
||||
// Fetch initial comments and update cache
|
||||
await fetchAndUpdateCache();
|
||||
|
||||
// Set up live query for real-time updates
|
||||
await setupLiveQuery();
|
||||
} catch {
|
||||
// Sync failed - will retry on next mount
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchAndUpdateCache() {
|
||||
try {
|
||||
const result = await client.db.query<RawCommentRow>(
|
||||
`SELECT id, message_id, thread_id, parent_id, author_id, content, created_at, updated_at
|
||||
FROM chat_comments
|
||||
WHERE thread_id = $1
|
||||
ORDER BY created_at ASC`,
|
||||
[threadId]
|
||||
);
|
||||
|
||||
if (mounted && result.rows) {
|
||||
updateReactQueryCache(result.rows);
|
||||
}
|
||||
} catch {
|
||||
// Query failed - data will be fetched from API
|
||||
}
|
||||
}
|
||||
|
||||
async function setupLiveQuery() {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const db = client.db as any;
|
||||
|
||||
if (db.live?.query && typeof db.live.query === "function") {
|
||||
const liveQuery = await db.live.query(
|
||||
`SELECT id, message_id, thread_id, parent_id, author_id, content, created_at, updated_at
|
||||
FROM chat_comments
|
||||
WHERE thread_id = $1
|
||||
ORDER BY created_at ASC`,
|
||||
[threadId]
|
||||
);
|
||||
|
||||
if (!mounted) {
|
||||
liveQuery.unsubscribe?.();
|
||||
return;
|
||||
}
|
||||
|
||||
// Set initial results
|
||||
if (liveQuery.initialResults?.rows) {
|
||||
updateReactQueryCache(liveQuery.initialResults.rows);
|
||||
} else if (liveQuery.rows) {
|
||||
updateReactQueryCache(liveQuery.rows);
|
||||
}
|
||||
|
||||
// Subscribe to changes
|
||||
if (typeof liveQuery.subscribe === "function") {
|
||||
liveQuery.subscribe((result: { rows: RawCommentRow[] }) => {
|
||||
if (mounted && result.rows) {
|
||||
updateReactQueryCache(result.rows);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (typeof liveQuery.unsubscribe === "function") {
|
||||
liveQueryRef.current = liveQuery;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Live query setup failed - will use initial fetch only
|
||||
}
|
||||
}
|
||||
|
||||
startSync();
|
||||
|
||||
return () => {
|
||||
mounted = false;
|
||||
syncKeyRef.current = null;
|
||||
|
||||
if (syncHandleRef.current) {
|
||||
syncHandleRef.current.unsubscribe();
|
||||
syncHandleRef.current = null;
|
||||
}
|
||||
if (liveQueryRef.current) {
|
||||
liveQueryRef.current.unsubscribe();
|
||||
liveQueryRef.current = null;
|
||||
}
|
||||
};
|
||||
}, [threadId, electricClient, updateReactQueryCache]);
|
||||
}
|
||||
154
surfsense_web/hooks/use-messages-electric.ts
Normal file
154
surfsense_web/hooks/use-messages-electric.ts
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
"use client";
|
||||
|
||||
import { useCallback, useEffect, useRef } from "react";
|
||||
import type { RawMessage } from "@/contracts/types/chat-messages.types";
|
||||
import type { SyncHandle } from "@/lib/electric/client";
|
||||
import { useElectricClient } from "@/lib/electric/context";
|
||||
|
||||
/**
|
||||
* Syncs chat messages for a thread via Electric SQL.
|
||||
* Calls onMessagesUpdate when messages change.
|
||||
*/
|
||||
export function useMessagesElectric(
|
||||
threadId: number | null,
|
||||
onMessagesUpdate: (messages: RawMessage[]) => void
|
||||
) {
|
||||
const electricClient = useElectricClient();
|
||||
|
||||
const syncHandleRef = useRef<SyncHandle | null>(null);
|
||||
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
|
||||
const syncKeyRef = useRef<string | null>(null);
|
||||
const onMessagesUpdateRef = useRef(onMessagesUpdate);
|
||||
|
||||
useEffect(() => {
|
||||
onMessagesUpdateRef.current = onMessagesUpdate;
|
||||
}, [onMessagesUpdate]);
|
||||
|
||||
const handleMessagesUpdate = useCallback((rows: RawMessage[]) => {
|
||||
onMessagesUpdateRef.current(rows);
|
||||
}, []);
|
||||
|
||||
useEffect(() => {
|
||||
if (!threadId || !electricClient) {
|
||||
return;
|
||||
}
|
||||
|
||||
const syncKey = `messages_${threadId}`;
|
||||
if (syncKeyRef.current === syncKey) {
|
||||
return;
|
||||
}
|
||||
|
||||
const client = electricClient;
|
||||
let mounted = true;
|
||||
syncKeyRef.current = syncKey;
|
||||
|
||||
async function startSync() {
|
||||
try {
|
||||
const handle = await client.syncShape({
|
||||
table: "new_chat_messages",
|
||||
where: `thread_id = ${threadId}`,
|
||||
columns: ["id", "thread_id", "role", "content", "author_id", "created_at"],
|
||||
primaryKey: ["id"],
|
||||
});
|
||||
|
||||
if (!handle.isUpToDate && handle.initialSyncPromise) {
|
||||
try {
|
||||
await Promise.race([
|
||||
handle.initialSyncPromise,
|
||||
new Promise((resolve) => setTimeout(resolve, 3000)),
|
||||
]);
|
||||
} catch {
|
||||
// Timeout
|
||||
}
|
||||
}
|
||||
|
||||
if (!mounted) {
|
||||
handle.unsubscribe();
|
||||
return;
|
||||
}
|
||||
|
||||
syncHandleRef.current = handle;
|
||||
await fetchMessages();
|
||||
await setupLiveQuery();
|
||||
} catch {
|
||||
// Sync failed
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchMessages() {
|
||||
try {
|
||||
const result = await client.db.query<RawMessage>(
|
||||
`SELECT id, thread_id, role, content, author_id, created_at
|
||||
FROM new_chat_messages
|
||||
WHERE thread_id = $1
|
||||
ORDER BY created_at ASC`,
|
||||
[threadId]
|
||||
);
|
||||
|
||||
if (mounted && result.rows) {
|
||||
handleMessagesUpdate(result.rows);
|
||||
}
|
||||
} catch {
|
||||
// Query failed
|
||||
}
|
||||
}
|
||||
|
||||
async function setupLiveQuery() {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const db = client.db as any;
|
||||
|
||||
if (db.live?.query && typeof db.live.query === "function") {
|
||||
const liveQuery = await db.live.query(
|
||||
`SELECT id, thread_id, role, content, author_id, created_at
|
||||
FROM new_chat_messages
|
||||
WHERE thread_id = $1
|
||||
ORDER BY created_at ASC`,
|
||||
[threadId]
|
||||
);
|
||||
|
||||
if (!mounted) {
|
||||
liveQuery.unsubscribe?.();
|
||||
return;
|
||||
}
|
||||
|
||||
if (liveQuery.initialResults?.rows) {
|
||||
handleMessagesUpdate(liveQuery.initialResults.rows);
|
||||
} else if (liveQuery.rows) {
|
||||
handleMessagesUpdate(liveQuery.rows);
|
||||
}
|
||||
|
||||
if (typeof liveQuery.subscribe === "function") {
|
||||
liveQuery.subscribe((result: { rows: RawMessage[] }) => {
|
||||
if (mounted && result.rows) {
|
||||
handleMessagesUpdate(result.rows);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (typeof liveQuery.unsubscribe === "function") {
|
||||
liveQueryRef.current = liveQuery;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Live query failed
|
||||
}
|
||||
}
|
||||
|
||||
startSync();
|
||||
|
||||
return () => {
|
||||
mounted = false;
|
||||
syncKeyRef.current = null;
|
||||
|
||||
if (syncHandleRef.current) {
|
||||
syncHandleRef.current.unsubscribe();
|
||||
syncHandleRef.current = null;
|
||||
}
|
||||
if (liveQueryRef.current) {
|
||||
liveQueryRef.current.unsubscribe();
|
||||
liveQueryRef.current = null;
|
||||
}
|
||||
};
|
||||
}, [threadId, electricClient, handleMessagesUpdate]);
|
||||
}
|
||||
|
|
@ -229,7 +229,6 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
|
|||
CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type);
|
||||
`);
|
||||
|
||||
// Create the chat_comment_mentions table schema in PGlite
|
||||
await db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS chat_comment_mentions (
|
||||
id INTEGER PRIMARY KEY,
|
||||
|
|
@ -242,6 +241,39 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
|
|||
CREATE INDEX IF NOT EXISTS idx_chat_comment_mentions_comment_id ON chat_comment_mentions(comment_id);
|
||||
`);
|
||||
|
||||
// Create chat_comments table for live comment sync
|
||||
await db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS chat_comments (
|
||||
id INTEGER PRIMARY KEY,
|
||||
message_id INTEGER NOT NULL,
|
||||
thread_id INTEGER NOT NULL,
|
||||
parent_id INTEGER,
|
||||
author_id TEXT,
|
||||
content TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_chat_comments_thread_id ON chat_comments(thread_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_chat_comments_message_id ON chat_comments(message_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_chat_comments_parent_id ON chat_comments(parent_id);
|
||||
`);
|
||||
|
||||
// Create new_chat_messages table for live message sync
|
||||
await db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS new_chat_messages (
|
||||
id INTEGER PRIMARY KEY,
|
||||
thread_id INTEGER NOT NULL,
|
||||
role TEXT NOT NULL,
|
||||
content JSONB NOT NULL,
|
||||
author_id TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_new_chat_messages_thread_id ON new_chat_messages(thread_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_new_chat_messages_created_at ON new_chat_messages(created_at);
|
||||
`);
|
||||
|
||||
const electricUrl = getElectricUrl();
|
||||
|
||||
// STEP 4: Create the client wrapper
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue