diff --git a/surfsense_backend/alembic/versions/75_add_chat_session_state_table.py b/surfsense_backend/alembic/versions/75_add_chat_session_state_table.py new file mode 100644 index 000000000..46bf7b9b9 --- /dev/null +++ b/surfsense_backend/alembic/versions/75_add_chat_session_state_table.py @@ -0,0 +1,75 @@ +"""Add chat_session_state table for live collaboration + +Revision ID: 75 +Revises: 74 + +Creates chat_session_state table to track AI responding state per thread. +Enables real-time sync via Electric SQL for shared chat collaboration. +""" + +from collections.abc import Sequence + +from alembic import op + +revision: str = "75" +down_revision: str | None = "74" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Create chat_session_state table with Electric SQL replication.""" + op.execute( + """ + CREATE TABLE IF NOT EXISTS chat_session_state ( + id SERIAL PRIMARY KEY, + thread_id INTEGER NOT NULL REFERENCES new_chat_threads(id) ON DELETE CASCADE, + ai_responding_to_user_id UUID REFERENCES "user"(id) ON DELETE SET NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (thread_id) + ) + """ + ) + + op.execute( + "CREATE INDEX IF NOT EXISTS idx_chat_session_state_thread_id ON chat_session_state(thread_id)" + ) + + op.execute("ALTER TABLE chat_session_state REPLICA IDENTITY FULL;") + + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'chat_session_state' + ) THEN + ALTER PUBLICATION electric_publication_default ADD TABLE chat_session_state; + END IF; + END + $$; + """ + ) + + +def downgrade() -> None: + """Drop chat_session_state table and remove from Electric SQL replication.""" + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'chat_session_state' + ) THEN + ALTER PUBLICATION electric_publication_default DROP TABLE chat_session_state; + END IF; + END + $$; + """ + ) + + op.execute("DROP TABLE IF EXISTS chat_session_state;") diff --git a/surfsense_backend/alembic/versions/76_add_live_collaboration_tables_electric_replication.py b/surfsense_backend/alembic/versions/76_add_live_collaboration_tables_electric_replication.py new file mode 100644 index 000000000..68bca4fc1 --- /dev/null +++ b/surfsense_backend/alembic/versions/76_add_live_collaboration_tables_electric_replication.py @@ -0,0 +1,99 @@ +"""Add live collaboration tables to Electric SQL publication + +Revision ID: 76 +Revises: 75 + +Enables real-time sync for live collaboration features: +- new_chat_messages: Live message sync between users +- chat_comments: Live comment updates + +Note: User/member info is fetched via API (membersAtom) for client-side joins, +not via Electric SQL, to keep where clauses optimized and reduce complexity. +""" + +from collections.abc import Sequence + +from alembic import op + +revision: str = "76" +down_revision: str | None = "75" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add live collaboration tables to Electric SQL replication.""" + # Set REPLICA IDENTITY FULL for Electric SQL sync + op.execute("ALTER TABLE new_chat_messages REPLICA IDENTITY FULL;") + op.execute("ALTER TABLE chat_comments REPLICA IDENTITY FULL;") + + # Add new_chat_messages to Electric publication + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'new_chat_messages' + ) THEN + ALTER PUBLICATION electric_publication_default ADD TABLE new_chat_messages; + END IF; + END + $$; + """ + ) + + # Add chat_comments to Electric publication + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'chat_comments' + ) THEN + ALTER PUBLICATION electric_publication_default ADD TABLE chat_comments; + END IF; + END + $$; + """ + ) + + +def downgrade() -> None: + """Remove live collaboration tables from Electric SQL replication.""" + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'new_chat_messages' + ) THEN + ALTER PUBLICATION electric_publication_default DROP TABLE new_chat_messages; + END IF; + END + $$; + """ + ) + + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM pg_publication_tables + WHERE pubname = 'electric_publication_default' + AND tablename = 'chat_comments' + ) THEN + ALTER PUBLICATION electric_publication_default DROP TABLE chat_comments; + END IF; + END + $$; + """ + ) + + # Note: Not reverting REPLICA IDENTITY as it doesn't harm normal operations diff --git a/surfsense_backend/alembic/versions/77_add_thread_id_to_chat_comments.py b/surfsense_backend/alembic/versions/77_add_thread_id_to_chat_comments.py new file mode 100644 index 000000000..0a2615e84 --- /dev/null +++ b/surfsense_backend/alembic/versions/77_add_thread_id_to_chat_comments.py @@ -0,0 +1,68 @@ +"""Add thread_id to chat_comments for denormalized Electric subscriptions + +This denormalization allows a single Electric SQL subscription per thread +instead of one per message, significantly reducing connection overhead. + +Revision ID: 77 +Revises: 76 +""" + +from collections.abc import Sequence + +from alembic import op + +revision: str = "77" +down_revision: str | None = "76" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add thread_id column to chat_comments and backfill from messages.""" + # Add the column (nullable initially for backfill) + op.execute( + """ + ALTER TABLE chat_comments + ADD COLUMN IF NOT EXISTS thread_id INTEGER; + """ + ) + + # Backfill thread_id from the related message + op.execute( + """ + UPDATE chat_comments c + SET thread_id = m.thread_id + FROM new_chat_messages m + WHERE c.message_id = m.id + AND c.thread_id IS NULL; + """ + ) + + # Make it NOT NULL after backfill + op.execute( + """ + ALTER TABLE chat_comments + ALTER COLUMN thread_id SET NOT NULL; + """ + ) + + # Add FK constraint + op.execute( + """ + ALTER TABLE chat_comments + ADD CONSTRAINT fk_chat_comments_thread_id + FOREIGN KEY (thread_id) REFERENCES new_chat_threads(id) ON DELETE CASCADE; + """ + ) + + # Add index for efficient Electric subscriptions by thread + op.execute( + "CREATE INDEX IF NOT EXISTS idx_chat_comments_thread_id ON chat_comments(thread_id)" + ) + + +def downgrade() -> None: + """Remove thread_id column from chat_comments.""" + op.execute("DROP INDEX IF EXISTS idx_chat_comments_thread_id") + op.execute("ALTER TABLE chat_comments DROP CONSTRAINT IF EXISTS fk_chat_comments_thread_id") + op.execute("ALTER TABLE chat_comments DROP COLUMN IF EXISTS thread_id") diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index b56f37373..4b9be6f4a 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -415,6 +415,13 @@ class ChatComment(BaseModel, TimestampMixin): nullable=False, index=True, ) + # Denormalized thread_id for efficient Electric SQL subscriptions (one per thread) + thread_id = Column( + Integer, + ForeignKey("new_chat_threads.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) parent_id = Column( Integer, ForeignKey("chat_comments.id", ondelete="CASCADE"), @@ -438,6 +445,7 @@ class ChatComment(BaseModel, TimestampMixin): # Relationships message = relationship("NewChatMessage", back_populates="comments") + thread = relationship("NewChatThread") author = relationship("User") parent = relationship( "ChatComment", remote_side="ChatComment.id", backref="replies" @@ -474,6 +482,38 @@ class ChatCommentMention(BaseModel, TimestampMixin): mentioned_user = relationship("User") +class ChatSessionState(BaseModel): + """ + Tracks real-time session state for shared chat collaboration. + One record per thread, synced via Electric SQL. + """ + + __tablename__ = "chat_session_state" + + thread_id = Column( + Integer, + ForeignKey("new_chat_threads.id", ondelete="CASCADE"), + nullable=False, + unique=True, + index=True, + ) + ai_responding_to_user_id = Column( + UUID(as_uuid=True), + ForeignKey("user.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + updated_at = Column( + TIMESTAMP(timezone=True), + nullable=False, + default=lambda: datetime.now(UTC), + onupdate=lambda: datetime.now(UTC), + ) + + thread = relationship("NewChatThread") + ai_responding_to_user = relationship("User") + + class MemoryCategory(str, Enum): """Categories for user memories.""" diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index 4b8600fab..25c53b69e 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -990,7 +990,7 @@ async def handle_new_chat( search_space_id=request.search_space_id, chat_id=request.chat_id, session=session, - user_id=str(user.id), # Pass user ID for memory tools + user_id=str(user.id), # Pass user ID for memory tools and session state llm_config_id=llm_config_id, attachments=request.attachments, mentioned_document_ids=request.mentioned_document_ids, diff --git a/surfsense_backend/app/schemas/chat_session_state.py b/surfsense_backend/app/schemas/chat_session_state.py new file mode 100644 index 000000000..6eca0e26f --- /dev/null +++ b/surfsense_backend/app/schemas/chat_session_state.py @@ -0,0 +1,29 @@ +""" +Pydantic schemas for chat session state (live collaboration). +""" + +from datetime import datetime +from uuid import UUID + +from pydantic import BaseModel, ConfigDict + + +class RespondingUser(BaseModel): + """The user that the AI is currently responding to.""" + + id: UUID + display_name: str | None = None + email: str + + model_config = ConfigDict(from_attributes=True) + + +class ChatSessionStateResponse(BaseModel): + """Current session state for a chat thread.""" + + id: int + thread_id: int + responding_to: RespondingUser | None = None + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) diff --git a/surfsense_backend/app/services/chat_comments_service.py b/surfsense_backend/app/services/chat_comments_service.py index 6f81c0158..dc3b51238 100644 --- a/surfsense_backend/app/services/chat_comments_service.py +++ b/surfsense_backend/app/services/chat_comments_service.py @@ -281,8 +281,10 @@ async def create_comment( detail="You don't have permission to create comments in this search space", ) + thread = message.thread comment = ChatComment( message_id=message_id, + thread_id=thread.id, # Denormalized for efficient Electric subscriptions author_id=user.id, content=content, ) @@ -299,7 +301,6 @@ async def create_comment( user_names = await get_user_names_for_mentions(session, set(mentions_map.keys())) # Create notifications for mentioned users (excluding author) - thread = message.thread author_name = user.display_name or user.email content_preview = render_mentions(content, user_names) for mentioned_user_id, mention_id in mentions_map.items(): @@ -393,8 +394,10 @@ async def create_reply( detail="You don't have permission to create comments in this search space", ) + thread = parent_comment.message.thread reply = ChatComment( message_id=parent_comment.message_id, + thread_id=thread.id, # Denormalized for efficient Electric subscriptions parent_id=comment_id, author_id=user.id, content=content, @@ -412,7 +415,6 @@ async def create_reply( user_names = await get_user_names_for_mentions(session, set(mentions_map.keys())) # Create notifications for mentioned users (excluding author) - thread = parent_comment.message.thread author_name = user.display_name or user.email content_preview = render_mentions(content, user_names) for mentioned_user_id, mention_id in mentions_map.items(): diff --git a/surfsense_backend/app/services/chat_session_state_service.py b/surfsense_backend/app/services/chat_session_state_service.py new file mode 100644 index 000000000..d82fff3a7 --- /dev/null +++ b/surfsense_backend/app/services/chat_session_state_service.py @@ -0,0 +1,65 @@ +""" +Service layer for chat session state (live collaboration). +""" + +from datetime import UTC, datetime +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from app.db import ChatSessionState + + +async def get_session_state( + session: AsyncSession, + thread_id: int, +) -> ChatSessionState | None: + """Get the current session state for a thread.""" + result = await session.execute( + select(ChatSessionState) + .options(selectinload(ChatSessionState.ai_responding_to_user)) + .filter(ChatSessionState.thread_id == thread_id) + ) + return result.scalar_one_or_none() + + +async def set_ai_responding( + session: AsyncSession, + thread_id: int, + user_id: UUID, +) -> ChatSessionState: + """Mark AI as responding to a specific user. Uses upsert for atomicity.""" + now = datetime.now(UTC) + upsert_query = insert(ChatSessionState).values( + thread_id=thread_id, + ai_responding_to_user_id=user_id, + updated_at=now, + ) + upsert_query = upsert_query.on_conflict_do_update( + index_elements=["thread_id"], + set_={ + "ai_responding_to_user_id": user_id, + "updated_at": now, + }, + ) + await session.execute(upsert_query) + await session.commit() + + return await get_session_state(session, thread_id) + + +async def clear_ai_responding( + session: AsyncSession, + thread_id: int, +) -> ChatSessionState | None: + """Clear AI responding state when response is complete.""" + state = await get_session_state(session, thread_id) + if state: + state.ai_responding_to_user_id = None + state.updated_at = datetime.now(UTC) + await session.commit() + await session.refresh(state) + return state diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 7d2cf4172..31229a59b 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -11,6 +11,7 @@ Supports loading LLM configurations from: import json from collections.abc import AsyncGenerator +from uuid import UUID from langchain_core.messages import HumanMessage from sqlalchemy.ext.asyncio import AsyncSession @@ -27,6 +28,10 @@ from app.agents.new_chat.llm_config import ( ) from app.db import Document, SurfsenseDocsDocument from app.schemas.new_chat import ChatAttachment +from app.services.chat_session_state_service import ( + clear_ai_responding, + set_ai_responding, +) from app.services.connector_service import ConnectorService from app.services.new_streaming_service import VercelStreamingService @@ -167,9 +172,8 @@ async def stream_new_chat( search_space_id: The search space ID chat_id: The chat ID (used as LangGraph thread_id for memory) session: The database session - user_id: The current user's UUID string (for memory tools) + user_id: The current user's UUID string (for memory tools and session state) llm_config_id: The LLM configuration ID (default: -1 for first global config) - messages: Optional chat history from frontend (list of ChatMessage) attachments: Optional attachments with extracted content mentioned_document_ids: Optional list of document IDs mentioned with @ in the chat mentioned_surfsense_doc_ids: Optional list of SurfSense doc IDs mentioned with @ in the chat @@ -183,6 +187,9 @@ async def stream_new_chat( current_text_id: str | None = None try: + # Mark AI as responding to this user for live collaboration + if user_id: + await set_ai_responding(session, chat_id, UUID(user_id)) # Load LLM config - supports both YAML (negative IDs) and database (positive IDs) agent_config: AgentConfig | None = None @@ -1147,3 +1154,7 @@ async def stream_new_chat( yield streaming_service.format_finish_step() yield streaming_service.format_finish() yield streaming_service.format_done() + + finally: + # Clear AI responding state for live collaboration + await clear_ai_responding(session, chat_id) diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index 88c532cf2..c61dad660 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -24,6 +24,7 @@ import { // extractWriteTodosFromContent, hydratePlanStateAtom, } from "@/atoms/chat/plan-state.atom"; +import { membersAtom } from "@/atoms/members/members-query.atoms"; import { currentUserAtom } from "@/atoms/user/user-query.atoms"; import { Thread } from "@/components/assistant-ui/thread"; import { ChatHeader } from "@/components/new-chat/chat-header"; @@ -50,6 +51,8 @@ import { type MessageRecord, type ThreadRecord, } from "@/lib/chat/thread-persistence"; +import { useChatSessionStateSync } from "@/hooks/use-chat-session-state"; +import { useMessagesElectric } from "@/hooks/use-messages-electric"; import { trackChatCreated, trackChatError, @@ -258,6 +261,44 @@ export default function NewChatPage() { // Get current user for author info in shared chats const { data: currentUser } = useAtomValue(currentUserAtom); + // Live collaboration: sync session state and messages via Electric SQL + useChatSessionStateSync(threadId); + const { data: membersData } = useAtomValue(membersAtom); + + const handleElectricMessagesUpdate = useCallback( + (electricMessages: { id: number; thread_id: number; role: string; content: unknown; author_id: string | null; created_at: string }[]) => { + if (isRunning) { + return; + } + + setMessages((prev) => { + if (electricMessages.length < prev.length) { + return prev; + } + + return electricMessages.map((msg) => { + const member = msg.author_id + ? membersData?.find((m) => m.user_id === msg.author_id) + : null; + + return convertToThreadMessage({ + id: msg.id, + thread_id: msg.thread_id, + role: msg.role.toLowerCase() as "user" | "assistant" | "system", + content: msg.content, + author_id: msg.author_id, + created_at: msg.created_at, + author_display_name: member?.user_display_name ?? null, + author_avatar_url: member?.user_avatar_url ?? null, + }); + }); + }); + }, + [isRunning, membersData] + ); + + useMessagesElectric(threadId, handleElectricMessagesUpdate); + // Create the attachment adapter for file processing const attachmentAdapter = useMemo(() => createAttachmentAdapter(), []); @@ -587,8 +628,6 @@ export default function NewChatPage() { content: persistContent, }) .then(() => { - // For new threads, the backend updates the title from the first user message - // Invalidate threads query so sidebar shows the updated title in real-time if (isNewThread) { queryClient.invalidateQueries({ queryKey: ["threads", String(searchSpaceId)] }); } diff --git a/surfsense_web/atoms/chat/chat-session-state.atom.ts b/surfsense_web/atoms/chat/chat-session-state.atom.ts new file mode 100644 index 000000000..4d83a45d4 --- /dev/null +++ b/surfsense_web/atoms/chat/chat-session-state.atom.ts @@ -0,0 +1,15 @@ +"use client"; + +import { atom } from "jotai"; + +export interface ChatSessionStateData { + threadId: number; + isAiResponding: boolean; + respondingToUserId: string | null; +} + +/** + * Global chat session state atom. + * Updated by useChatSessionStateSync hook, read anywhere. + */ +export const chatSessionStateAtom = atom(null); diff --git a/surfsense_web/atoms/members/members-query.atoms.ts b/surfsense_web/atoms/members/members-query.atoms.ts index 8ed56ef0c..f486dc02b 100644 --- a/surfsense_web/atoms/members/members-query.atoms.ts +++ b/surfsense_web/atoms/members/members-query.atoms.ts @@ -9,7 +9,7 @@ export const membersAtom = atomWithQuery((get) => { return { queryKey: cacheKeys.members.all(searchSpaceId?.toString() ?? ""), enabled: !!searchSpaceId, - staleTime: 5 * 60 * 1000, // 5 minutes + staleTime: 3 * 1000, // 3 seconds - short staleness for live collaboration queryFn: async () => { if (!searchSpaceId) { return []; diff --git a/surfsense_web/components/assistant-ui/chat-session-status.tsx b/surfsense_web/components/assistant-ui/chat-session-status.tsx new file mode 100644 index 000000000..62f7c33ce --- /dev/null +++ b/surfsense_web/components/assistant-ui/chat-session-status.tsx @@ -0,0 +1,49 @@ +"use client"; + +import type { FC } from "react"; +import { Loader2 } from "lucide-react"; +import { cn } from "@/lib/utils"; + +interface ChatSessionStatusProps { + isAiResponding: boolean; + respondingToUserId: string | null; + currentUserId: string | null; + members: Array<{ + user_id: string; + user_display_name?: string | null; + user_email?: string | null; + }>; + className?: string; +} + +export const ChatSessionStatus: FC = ({ + isAiResponding, + respondingToUserId, + currentUserId, + members, + className, +}) => { + if (!isAiResponding || !respondingToUserId) { + return null; + } + + if (respondingToUserId === currentUserId) { + return null; + } + + const respondingUser = members.find((m) => m.user_id === respondingToUserId); + const displayName = respondingUser?.user_display_name || respondingUser?.user_email || "another user"; + + return ( +
+ + Currently responding to {displayName} +
+ ); +}; diff --git a/surfsense_web/components/assistant-ui/thread.tsx b/surfsense_web/components/assistant-ui/thread.tsx index eaf30fc96..2127d4d1d 100644 --- a/surfsense_web/components/assistant-ui/thread.tsx +++ b/surfsense_web/components/assistant-ui/thread.tsx @@ -31,6 +31,7 @@ import { mentionedDocumentIdsAtom, mentionedDocumentsAtom, } from "@/atoms/chat/mentioned-documents.atom"; +import { membersAtom } from "@/atoms/members/members-query.atoms"; import { globalNewLLMConfigsAtom, llmPreferencesAtom, @@ -39,6 +40,7 @@ import { import { currentUserAtom } from "@/atoms/user/user-query.atoms"; import { AssistantMessage } from "@/components/assistant-ui/assistant-message"; import { ComposerAddAttachment, ComposerAttachments } from "@/components/assistant-ui/attachment"; +import { ChatSessionStatus } from "@/components/assistant-ui/chat-session-status"; import { ConnectorIndicator } from "@/components/assistant-ui/connector-popup"; import { InlineMentionEditor, @@ -59,6 +61,8 @@ import { import type { ThinkingStep } from "@/components/tool-ui/deepagent-thinking"; import { Button } from "@/components/ui/button"; import type { Document } from "@/contracts/types/document.types"; +import { chatSessionStateAtom } from "@/atoms/chat/chat-session-state.atom"; +import { useCommentsElectric } from "@/hooks/use-comments-electric"; import { cn } from "@/lib/utils"; interface ThreadProps { @@ -86,6 +90,7 @@ const ThreadContent: FC<{ header?: React.ReactNode }> = ({ header }) => { > { const editorRef = useRef(null); const editorContainerRef = useRef(null); const documentPickerRef = useRef(null); - const { search_space_id } = useParams(); + const { search_space_id, chat_id } = useParams(); const setMentionedDocumentIds = useSetAtom(mentionedDocumentIdsAtom); const composerRuntime = useComposerRuntime(); const hasAutoFocusedRef = useRef(false); @@ -223,6 +228,23 @@ const Composer: FC = () => { const isThreadEmpty = useAssistantState(({ thread }) => thread.isEmpty); const isThreadRunning = useAssistantState(({ thread }) => thread.isRunning); + // Live collaboration state + const { data: currentUser } = useAtomValue(currentUserAtom); + const { data: members } = useAtomValue(membersAtom); + const threadId = useMemo(() => { + if (Array.isArray(chat_id) && chat_id.length > 0) { + return Number.parseInt(chat_id[0], 10) || null; + } + return typeof chat_id === "string" ? Number.parseInt(chat_id, 10) || null : null; + }, [chat_id]); + const sessionState = useAtomValue(chatSessionStateAtom); + const isAiResponding = sessionState?.isAiResponding ?? false; + const respondingToUserId = sessionState?.respondingToUserId ?? null; + const isBlockedByOtherUser = isAiResponding && respondingToUserId !== currentUser?.id; + + // Sync comments for the entire thread via Electric SQL (one subscription per thread) + useCommentsElectric(threadId); + // Auto-focus editor on new chat page after mount useEffect(() => { if (isThreadEmpty && !hasAutoFocusedRef.current && editorRef.current) { @@ -298,9 +320,9 @@ const Composer: FC = () => { [showDocumentPopover] ); - // Submit message (blocked during streaming or when document picker is open) + // Submit message (blocked during streaming, document picker open, or AI responding to another user) const handleSubmit = useCallback(() => { - if (isThreadRunning) { + if (isThreadRunning || isBlockedByOtherUser) { return; } if (!showDocumentPopover) { @@ -315,6 +337,7 @@ const Composer: FC = () => { }, [ showDocumentPopover, isThreadRunning, + isBlockedByOtherUser, composerRuntime, setMentionedDocuments, setMentionedDocumentIds, @@ -374,7 +397,13 @@ const Composer: FC = () => { ); return ( - + + {/* Inline editor with @mention support */} @@ -417,13 +446,17 @@ const Composer: FC = () => { />, document.body )} - + ); }; -const ComposerAction: FC = () => { +interface ComposerActionProps { + isBlockedByOtherUser?: boolean; +} + +const ComposerAction: FC = ({ isBlockedByOtherUser = false }) => { // Check if any attachments are still being processed (running AND progress < 100) // When progress is 100, processing is done but waiting for send() const hasProcessingAttachments = useAssistantState(({ composer }) => @@ -458,7 +491,8 @@ const ComposerAction: FC = () => { return userConfigs?.some((c) => c.id === agentLlmId) ?? false; }, [preferences, globalConfigs, userConfigs]); - const isSendDisabled = hasProcessingAttachments || isComposerEmpty || !hasModelConfigured; + const isSendDisabled = + hasProcessingAttachments || isComposerEmpty || !hasModelConfigured || isBlockedByOtherUser; return (
@@ -487,13 +521,15 @@ const ComposerAction: FC = () => { ; export type Author = z.infer; export type CommentReply = z.infer; export type Comment = z.infer; diff --git a/surfsense_web/contracts/types/chat-messages.types.ts b/surfsense_web/contracts/types/chat-messages.types.ts new file mode 100644 index 000000000..78bf7b043 --- /dev/null +++ b/surfsense_web/contracts/types/chat-messages.types.ts @@ -0,0 +1,15 @@ +import { z } from "zod"; + +/** + * Raw message from database (Electric SQL sync) + */ +export const rawMessage = z.object({ + id: z.number(), + thread_id: z.number(), + role: z.string(), + content: z.unknown(), + author_id: z.string().nullable(), + created_at: z.string(), +}); + +export type RawMessage = z.infer; diff --git a/surfsense_web/contracts/types/chat-session-state.types.ts b/surfsense_web/contracts/types/chat-session-state.types.ts new file mode 100644 index 000000000..cf73859e6 --- /dev/null +++ b/surfsense_web/contracts/types/chat-session-state.types.ts @@ -0,0 +1,24 @@ +import { z } from "zod"; + +/** + * Chat session state for live collaboration. + * Tracks which user the AI is currently responding to. + */ +export const chatSessionState = z.object({ + id: z.number(), + thread_id: z.number(), + ai_responding_to_user_id: z.string().uuid().nullable(), + updated_at: z.string(), +}); + +/** + * User currently being responded to by the AI. + */ +export const respondingUser = z.object({ + id: z.string().uuid(), + display_name: z.string().nullable(), + email: z.string(), +}); + +export type ChatSessionState = z.infer; +export type RespondingUser = z.infer; diff --git a/surfsense_web/hooks/use-chat-session-state.ts b/surfsense_web/hooks/use-chat-session-state.ts new file mode 100644 index 000000000..f3bdd7722 --- /dev/null +++ b/surfsense_web/hooks/use-chat-session-state.ts @@ -0,0 +1,39 @@ +"use client"; + +import { useShape } from "@electric-sql/react"; +import { useSetAtom } from "jotai"; +import { useEffect } from "react"; +import { chatSessionStateAtom } from "@/atoms/chat/chat-session-state.atom"; +import type { ChatSessionState } from "@/contracts/types/chat-session-state.types"; + +const ELECTRIC_URL = process.env.NEXT_PUBLIC_ELECTRIC_URL || "http://localhost:5133"; + +/** + * Syncs chat session state for a thread via Electric SQL. + * Call once per thread (in page.tsx). Updates global atom. + */ +export function useChatSessionStateSync(threadId: number | null) { + const setSessionState = useSetAtom(chatSessionStateAtom); + + const { data } = useShape({ + url: `${ELECTRIC_URL}/v1/shape`, + params: { + table: "chat_session_state", + where: `thread_id = ${threadId ?? -1}`, + }, + }); + + useEffect(() => { + if (!threadId) { + setSessionState(null); + return; + } + + const row = data?.[0]; + setSessionState({ + threadId, + isAiResponding: !!row?.ai_responding_to_user_id, + respondingToUserId: row?.ai_responding_to_user_id ?? null, + }); + }, [threadId, data, setSessionState]); +} diff --git a/surfsense_web/hooks/use-comments-electric.ts b/surfsense_web/hooks/use-comments-electric.ts new file mode 100644 index 000000000..83a019ef3 --- /dev/null +++ b/surfsense_web/hooks/use-comments-electric.ts @@ -0,0 +1,361 @@ +"use client"; + +import { useQueryClient } from "@tanstack/react-query"; +import { useAtomValue } from "jotai"; +import { useCallback, useEffect, useMemo, useRef } from "react"; +import { membersAtom, myAccessAtom } from "@/atoms/members/members-query.atoms"; +import { currentUserAtom } from "@/atoms/user/user-query.atoms"; +import type { + Comment, + CommentReply, + Author, +} from "@/contracts/types/chat-comments.types"; +import type { Membership } from "@/contracts/types/members.types"; +import type { SyncHandle } from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; +import { cacheKeys } from "@/lib/query-client/cache-keys"; + +// Raw comment from PGlite local database +interface RawCommentRow { + id: number; + message_id: number; + thread_id: number; + parent_id: number | null; + author_id: string | null; + content: string; + created_at: string; + updated_at: string; +} + +// Regex pattern to match @[uuid] mentions (matches backend MENTION_PATTERN) +const MENTION_PATTERN = /@\[([0-9a-fA-F-]{36})\]/g; + +type MemberInfo = Pick; + +/** + * Render mentions in content by replacing @[uuid] with @{DisplayName} + */ +function renderMentions(content: string, memberMap: Map): string { + return content.replace(MENTION_PATTERN, (match, uuid) => { + const member = memberMap.get(uuid); + if (member?.user_display_name) { + return `@{${member.user_display_name}}`; + } + return match; + }); +} + +/** + * Build member lookup map from membersData + */ +function buildMemberMap(membersData: Membership[] | undefined): Map { + const map = new Map(); + if (membersData) { + for (const m of membersData) { + map.set(m.user_id, { + user_display_name: m.user_display_name, + user_avatar_url: m.user_avatar_url, + user_email: m.user_email, + }); + } + } + return map; +} + +/** + * Build author object from member data + */ +function buildAuthor(authorId: string | null, memberMap: Map): Author | null { + if (!authorId) return null; + const m = memberMap.get(authorId); + if (!m) return null; + return { + id: authorId, + display_name: m.user_display_name ?? null, + avatar_url: m.user_avatar_url ?? null, + email: m.user_email ?? "", + }; +} + +/** + * Check if a comment has been edited by comparing timestamps. + * Uses a small threshold to handle precision differences. + */ +function isEdited(createdAt: string, updatedAt: string): boolean { + const created = new Date(createdAt).getTime(); + const updated = new Date(updatedAt).getTime(); + // Consider edited if updated_at is more than 1 second after created_at + return updated - created > 1000; +} + +/** + * Transform raw comment to CommentReply + */ +function transformReply( + raw: RawCommentRow, + memberMap: Map, + currentUserId: string | undefined, + isOwner: boolean +): CommentReply { + return { + id: raw.id, + content: raw.content, + content_rendered: renderMentions(raw.content, memberMap), + author: buildAuthor(raw.author_id, memberMap), + created_at: raw.created_at, + updated_at: raw.updated_at, + is_edited: isEdited(raw.created_at, raw.updated_at), + can_edit: currentUserId === raw.author_id, + can_delete: currentUserId === raw.author_id || isOwner, + }; +} + +/** + * Transform raw comments to Comment with replies + */ +function transformComments( + rawComments: RawCommentRow[], + memberMap: Map, + currentUserId: string | undefined, + isOwner: boolean +): Map { + // Group comments by message_id + const byMessage = new Map }>(); + + for (const raw of rawComments) { + if (!byMessage.has(raw.message_id)) { + byMessage.set(raw.message_id, { topLevel: [], replies: new Map() }); + } + const group = byMessage.get(raw.message_id)!; + + if (raw.parent_id === null) { + group.topLevel.push(raw); + } else { + if (!group.replies.has(raw.parent_id)) { + group.replies.set(raw.parent_id, []); + } + group.replies.get(raw.parent_id)!.push(raw); + } + } + + // Transform to Comment objects grouped by message_id + const result = new Map(); + + for (const [messageId, group] of byMessage) { + const comments: Comment[] = group.topLevel.map((raw) => { + const replies = (group.replies.get(raw.id) || []) + .sort((a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime()) + .map((r) => transformReply(r, memberMap, currentUserId, isOwner)); + + return { + id: raw.id, + message_id: raw.message_id, + content: raw.content, + content_rendered: renderMentions(raw.content, memberMap), + author: buildAuthor(raw.author_id, memberMap), + created_at: raw.created_at, + updated_at: raw.updated_at, + is_edited: isEdited(raw.created_at, raw.updated_at), + can_edit: currentUserId === raw.author_id, + can_delete: currentUserId === raw.author_id || isOwner, + reply_count: replies.length, + replies, + }; + }); + + // Sort by created_at + comments.sort((a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime()); + result.set(messageId, comments); + } + + return result; +} + +/** + * Hook for syncing comments with Electric SQL real-time sync. + * + * Syncs ALL comments for a thread in ONE subscription, then updates + * React Query cache for each message. This avoids N subscriptions for N messages. + * + * @param threadId - The thread ID to sync comments for + */ +export function useCommentsElectric(threadId: number | null) { + const electricClient = useElectricClient(); + const queryClient = useQueryClient(); + + const { data: membersData } = useAtomValue(membersAtom); + const { data: currentUser } = useAtomValue(currentUserAtom); + const { data: myAccess } = useAtomValue(myAccessAtom); + + const memberMap = useMemo(() => buildMemberMap(membersData), [membersData]); + const currentUserId = currentUser?.id; + const isOwner = myAccess?.is_owner ?? false; + + // Use refs for values needed in live query callback to avoid stale closures + const memberMapRef = useRef(memberMap); + const currentUserIdRef = useRef(currentUserId); + const isOwnerRef = useRef(isOwner); + const queryClientRef = useRef(queryClient); + + // Keep refs updated + useEffect(() => { + memberMapRef.current = memberMap; + currentUserIdRef.current = currentUserId; + isOwnerRef.current = isOwner; + queryClientRef.current = queryClient; + }, [memberMap, currentUserId, isOwner, queryClient]); + + const syncHandleRef = useRef(null); + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); + const syncKeyRef = useRef(null); + + // Stable callback that uses refs for fresh values + const updateReactQueryCache = useCallback((rows: RawCommentRow[]) => { + const commentsByMessage = transformComments( + rows, + memberMapRef.current, + currentUserIdRef.current, + isOwnerRef.current + ); + + for (const [messageId, comments] of commentsByMessage) { + const cacheKey = cacheKeys.comments.byMessage(messageId); + queryClientRef.current.setQueryData(cacheKey, { + comments, + total_count: comments.length, + }); + } + }, []); + + useEffect(() => { + if (!threadId || !electricClient) { + return; + } + + const syncKey = `comments_${threadId}`; + if (syncKeyRef.current === syncKey) { + return; + } + + // Capture in local variable for use in async functions + const client = electricClient; + + let mounted = true; + syncKeyRef.current = syncKey; + + async function startSync() { + try { + const handle = await client.syncShape({ + table: "chat_comments", + where: `thread_id = ${threadId}`, + columns: ["id", "message_id", "thread_id", "parent_id", "author_id", "content", "created_at", "updated_at"], + primaryKey: ["id"], + }); + + if (!handle.isUpToDate && handle.initialSyncPromise) { + try { + await Promise.race([ + handle.initialSyncPromise, + new Promise((resolve) => setTimeout(resolve, 3000)), + ]); + } catch { + // Initial sync timeout - continue anyway + } + } + + if (!mounted) { + handle.unsubscribe(); + return; + } + + syncHandleRef.current = handle; + + // Fetch initial comments and update cache + await fetchAndUpdateCache(); + + // Set up live query for real-time updates + await setupLiveQuery(); + } catch { + // Sync failed - will retry on next mount + } + } + + async function fetchAndUpdateCache() { + try { + const result = await client.db.query( + `SELECT id, message_id, thread_id, parent_id, author_id, content, created_at, updated_at + FROM chat_comments + WHERE thread_id = $1 + ORDER BY created_at ASC`, + [threadId] + ); + + if (mounted && result.rows) { + updateReactQueryCache(result.rows); + } + } catch { + // Query failed - data will be fetched from API + } + } + + async function setupLiveQuery() { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = client.db as any; + + if (db.live?.query && typeof db.live.query === "function") { + const liveQuery = await db.live.query( + `SELECT id, message_id, thread_id, parent_id, author_id, content, created_at, updated_at + FROM chat_comments + WHERE thread_id = $1 + ORDER BY created_at ASC`, + [threadId] + ); + + if (!mounted) { + liveQuery.unsubscribe?.(); + return; + } + + // Set initial results + if (liveQuery.initialResults?.rows) { + updateReactQueryCache(liveQuery.initialResults.rows); + } else if (liveQuery.rows) { + updateReactQueryCache(liveQuery.rows); + } + + // Subscribe to changes + if (typeof liveQuery.subscribe === "function") { + liveQuery.subscribe((result: { rows: RawCommentRow[] }) => { + if (mounted && result.rows) { + updateReactQueryCache(result.rows); + } + }); + } + + if (typeof liveQuery.unsubscribe === "function") { + liveQueryRef.current = liveQuery; + } + } + } catch { + // Live query setup failed - will use initial fetch only + } + } + + startSync(); + + return () => { + mounted = false; + syncKeyRef.current = null; + + if (syncHandleRef.current) { + syncHandleRef.current.unsubscribe(); + syncHandleRef.current = null; + } + if (liveQueryRef.current) { + liveQueryRef.current.unsubscribe(); + liveQueryRef.current = null; + } + }; + }, [threadId, electricClient, updateReactQueryCache]); +} diff --git a/surfsense_web/hooks/use-messages-electric.ts b/surfsense_web/hooks/use-messages-electric.ts new file mode 100644 index 000000000..e8c82e92b --- /dev/null +++ b/surfsense_web/hooks/use-messages-electric.ts @@ -0,0 +1,154 @@ +"use client"; + +import { useCallback, useEffect, useRef } from "react"; +import type { RawMessage } from "@/contracts/types/chat-messages.types"; +import type { SyncHandle } from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; + +/** + * Syncs chat messages for a thread via Electric SQL. + * Calls onMessagesUpdate when messages change. + */ +export function useMessagesElectric( + threadId: number | null, + onMessagesUpdate: (messages: RawMessage[]) => void +) { + const electricClient = useElectricClient(); + + const syncHandleRef = useRef(null); + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); + const syncKeyRef = useRef(null); + const onMessagesUpdateRef = useRef(onMessagesUpdate); + + useEffect(() => { + onMessagesUpdateRef.current = onMessagesUpdate; + }, [onMessagesUpdate]); + + const handleMessagesUpdate = useCallback((rows: RawMessage[]) => { + onMessagesUpdateRef.current(rows); + }, []); + + useEffect(() => { + if (!threadId || !electricClient) { + return; + } + + const syncKey = `messages_${threadId}`; + if (syncKeyRef.current === syncKey) { + return; + } + + const client = electricClient; + let mounted = true; + syncKeyRef.current = syncKey; + + async function startSync() { + try { + const handle = await client.syncShape({ + table: "new_chat_messages", + where: `thread_id = ${threadId}`, + columns: ["id", "thread_id", "role", "content", "author_id", "created_at"], + primaryKey: ["id"], + }); + + if (!handle.isUpToDate && handle.initialSyncPromise) { + try { + await Promise.race([ + handle.initialSyncPromise, + new Promise((resolve) => setTimeout(resolve, 3000)), + ]); + } catch { + // Timeout + } + } + + if (!mounted) { + handle.unsubscribe(); + return; + } + + syncHandleRef.current = handle; + await fetchMessages(); + await setupLiveQuery(); + } catch { + // Sync failed + } + } + + async function fetchMessages() { + try { + const result = await client.db.query( + `SELECT id, thread_id, role, content, author_id, created_at + FROM new_chat_messages + WHERE thread_id = $1 + ORDER BY created_at ASC`, + [threadId] + ); + + if (mounted && result.rows) { + handleMessagesUpdate(result.rows); + } + } catch { + // Query failed + } + } + + async function setupLiveQuery() { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = client.db as any; + + if (db.live?.query && typeof db.live.query === "function") { + const liveQuery = await db.live.query( + `SELECT id, thread_id, role, content, author_id, created_at + FROM new_chat_messages + WHERE thread_id = $1 + ORDER BY created_at ASC`, + [threadId] + ); + + if (!mounted) { + liveQuery.unsubscribe?.(); + return; + } + + if (liveQuery.initialResults?.rows) { + handleMessagesUpdate(liveQuery.initialResults.rows); + } else if (liveQuery.rows) { + handleMessagesUpdate(liveQuery.rows); + } + + if (typeof liveQuery.subscribe === "function") { + liveQuery.subscribe((result: { rows: RawMessage[] }) => { + if (mounted && result.rows) { + handleMessagesUpdate(result.rows); + } + }); + } + + if (typeof liveQuery.unsubscribe === "function") { + liveQueryRef.current = liveQuery; + } + } + } catch { + // Live query failed + } + } + + startSync(); + + return () => { + mounted = false; + syncKeyRef.current = null; + + if (syncHandleRef.current) { + syncHandleRef.current.unsubscribe(); + syncHandleRef.current = null; + } + if (liveQueryRef.current) { + liveQueryRef.current.unsubscribe(); + liveQueryRef.current = null; + } + }; + }, [threadId, electricClient, handleMessagesUpdate]); +} diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index 6a9d87b88..148da58ec 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -229,7 +229,6 @@ export async function initElectric(userId: string): Promise { CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type); `); - // Create the chat_comment_mentions table schema in PGlite await db.exec(` CREATE TABLE IF NOT EXISTS chat_comment_mentions ( id INTEGER PRIMARY KEY, @@ -242,6 +241,39 @@ export async function initElectric(userId: string): Promise { CREATE INDEX IF NOT EXISTS idx_chat_comment_mentions_comment_id ON chat_comment_mentions(comment_id); `); + // Create chat_comments table for live comment sync + await db.exec(` + CREATE TABLE IF NOT EXISTS chat_comments ( + id INTEGER PRIMARY KEY, + message_id INTEGER NOT NULL, + thread_id INTEGER NOT NULL, + parent_id INTEGER, + author_id TEXT, + content TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE INDEX IF NOT EXISTS idx_chat_comments_thread_id ON chat_comments(thread_id); + CREATE INDEX IF NOT EXISTS idx_chat_comments_message_id ON chat_comments(message_id); + CREATE INDEX IF NOT EXISTS idx_chat_comments_parent_id ON chat_comments(parent_id); + `); + + // Create new_chat_messages table for live message sync + await db.exec(` + CREATE TABLE IF NOT EXISTS new_chat_messages ( + id INTEGER PRIMARY KEY, + thread_id INTEGER NOT NULL, + role TEXT NOT NULL, + content JSONB NOT NULL, + author_id TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE INDEX IF NOT EXISTS idx_new_chat_messages_thread_id ON new_chat_messages(thread_id); + CREATE INDEX IF NOT EXISTS idx_new_chat_messages_created_at ON new_chat_messages(created_at); + `); + const electricUrl = getElectricUrl(); // STEP 4: Create the client wrapper