diff --git a/surfsense_backend/alembic/versions/144_add_gateway_tables.py b/surfsense_backend/alembic/versions/144_add_gateway_tables.py new file mode 100644 index 000000000..011333d69 --- /dev/null +++ b/surfsense_backend/alembic/versions/144_add_gateway_tables.py @@ -0,0 +1,623 @@ +"""add gateway tables for Telegram messaging gateway + +Revision ID: 144 +Revises: 143 +Create Date: 2026-05-27 + +Adds the lean v6 gateway schema: + +* gateway_platform_accounts +* gateway_conversation_bindings +* gateway_inbound_events + +The gateway stores Telegram-originated conversations in the existing chat +tables but keeps them out of UI replication. This migration adds ``source`` to +``new_chat_messages`` as a denormalized Zero publication boundary and publishes +only ``source = 'web'`` rows. Gateway control-plane tables are served through +REST in v1, so they are intentionally not added to ``zero_publication``. +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +revision: str = "144" +down_revision: str | None = "143" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +PUBLICATION_NAME = "zero_publication" + +DOCUMENT_COLS = [ + "id", + "title", + "document_type", + "search_space_id", + "folder_id", + "created_by_id", + "status", + "created_at", + "updated_at", +] + +USER_COLS = [ + "id", + "pages_limit", + "pages_used", + "premium_credit_micros_limit", + "premium_credit_micros_used", +] + +def _has_zero_version(conn, table: str) -> bool: + return ( + conn.execute( + sa.text( + "SELECT 1 FROM information_schema.columns " + "WHERE table_name = :tbl AND column_name = '_0_version'" + ), + {"tbl": table}, + ).fetchone() + is not None + ) + + +def _cols(columns: list[str]) -> str: + return ", ".join(columns) + + +def _table_exists(conn, table: str) -> bool: + return ( + conn.execute( + sa.text( + "SELECT 1 FROM information_schema.tables " + "WHERE table_schema = current_schema() AND table_name = :tbl" + ), + {"tbl": table}, + ).fetchone() + is not None + ) + + +def _column_exists(conn, table: str, column: str) -> bool: + return ( + conn.execute( + sa.text( + "SELECT 1 FROM information_schema.columns " + "WHERE table_schema = current_schema() " + "AND table_name = :tbl AND column_name = :col" + ), + {"tbl": table, "col": column}, + ).fetchone() + is not None + ) + + +def _index_exists(conn, index_name: str) -> bool: + return ( + conn.execute( + sa.text( + "SELECT 1 FROM pg_indexes " + "WHERE schemaname = current_schema() AND indexname = :name" + ), + {"name": index_name}, + ).fetchone() + is not None + ) + + +def _constraint_exists(conn, table: str, constraint_name: str) -> bool: + return ( + conn.execute( + sa.text( + "SELECT 1 FROM information_schema.table_constraints " + "WHERE table_schema = current_schema() " + "AND table_name = :tbl AND constraint_name = :name" + ), + {"tbl": table, "name": constraint_name}, + ).fetchone() + is not None + ) + + +def _drop_index_if_exists(index_name: str, table_name: str) -> None: + if _index_exists(op.get_bind(), index_name): + op.drop_index(index_name, table_name=table_name) + + +def _drop_column_if_exists(table_name: str, column_name: str) -> None: + if _column_exists(op.get_bind(), table_name, column_name): + op.drop_column(table_name, column_name) + + +def _build_set_table_ddl( + *, documents_has_zero_ver: bool, user_has_zero_ver: bool +) -> str: + doc_cols = DOCUMENT_COLS + (['"_0_version"'] if documents_has_zero_ver else []) + user_cols = USER_COLS + (['"_0_version"'] if user_has_zero_ver else []) + + return ( + f"ALTER PUBLICATION {PUBLICATION_NAME} SET TABLE " + f"notifications, " + f"documents ({_cols(doc_cols)}), " + f"folders, " + f"search_source_connectors, " + f"new_chat_messages WHERE (source = 'web'), " + f"chat_comments, " + f"chat_session_state, " + f'"user" ({_cols(user_cols)})' + ) + + +def _create_enum(name: str, values: tuple[str, ...]) -> postgresql.ENUM: + enum = postgresql.ENUM(*values, name=name) + enum.create(op.get_bind(), checkfirst=True) + return postgresql.ENUM(*values, name=name, create_type=False) + + +def upgrade() -> None: + conn = op.get_bind() + gateway_platform_enum = _create_enum( + "gateway_platform", ("telegram", "whatsapp", "signal") + ) + gateway_account_mode_enum = _create_enum( + "gateway_account_mode", ("cloud_shared", "self_host_byo") + ) + gateway_health_status_enum = _create_enum( + "gateway_health_status", ("unknown", "ok", "failing") + ) + gateway_binding_state_enum = _create_enum( + "gateway_binding_state", ("pending", "bound", "revoked", "suspended") + ) + gateway_peer_kind_enum = _create_enum( + "gateway_peer_kind", ("direct", "group", "channel", "unknown") + ) + gateway_session_scope_enum = _create_enum( + "gateway_session_scope", + ("per_binding", "per_user_search_space", "ephemeral"), + ) + gateway_dm_policy_enum = _create_enum("gateway_dm_policy", ("enabled", "disabled")) + gateway_group_policy_enum = _create_enum( + "gateway_group_policy", ("disabled", "allowlist", "mention_required") + ) + gateway_event_kind_enum = _create_enum( + "gateway_event_kind", ("message", "edited_message", "callback_query", "other") + ) + gateway_event_status_enum = _create_enum( + "gateway_event_status", + ("received", "processing", "processed", "ignored", "failed"), + ) + + if not _table_exists(conn, "gateway_platform_accounts"): + op.create_table( + "gateway_platform_accounts", + sa.Column("id", sa.BigInteger(), primary_key=True), + sa.Column("platform", gateway_platform_enum, nullable=False), + sa.Column("mode", gateway_account_mode_enum, nullable=False), + sa.Column("owner_user_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("owner_search_space_id", sa.Integer(), nullable=True), + sa.Column("is_system_account", sa.Boolean(), nullable=False, server_default="false"), + sa.Column("encrypted_credentials", sa.Text(), nullable=True), + sa.Column( + "account_metadata", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default=sa.text("'{}'::jsonb"), + ), + sa.Column( + "cursor_state", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default=sa.text("'{}'::jsonb"), + ), + sa.Column( + "health_status", + gateway_health_status_enum, + nullable=False, + server_default="unknown", + ), + sa.Column("last_health_check_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("suspended_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("suspended_reason", sa.Text(), nullable=True), + sa.Column( + "created_at", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.text("(now() AT TIME ZONE 'utc')"), + ), + sa.Column( + "updated_at", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.text("(now() AT TIME ZONE 'utc')"), + ), + sa.CheckConstraint( + "(is_system_account = true AND owner_user_id IS NULL) OR " + "(is_system_account = false AND owner_user_id IS NOT NULL)", + name="ck_gateway_accounts_owner_shape", + ), + sa.ForeignKeyConstraint(["owner_user_id"], ["user.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint( + ["owner_search_space_id"], ["searchspaces.id"], ondelete="CASCADE" + ), + ) + op.create_index( + "uq_gateway_accounts_owner_platform", + "gateway_platform_accounts", + ["owner_user_id", "platform"], + unique=True, + postgresql_where=sa.text("is_system_account = false"), + if_not_exists=True, + ) + op.create_index( + "uq_gateway_accounts_system_platform", + "gateway_platform_accounts", + ["platform"], + unique=True, + postgresql_where=sa.text("is_system_account = true"), + if_not_exists=True, + ) + + if not _table_exists(conn, "gateway_conversation_bindings"): + op.create_table( + "gateway_conversation_bindings", + sa.Column("id", sa.BigInteger(), primary_key=True), + sa.Column("account_id", sa.BigInteger(), nullable=False), + sa.Column("user_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("search_space_id", sa.Integer(), nullable=False), + sa.Column( + "state", + gateway_binding_state_enum, + nullable=False, + server_default="pending", + ), + sa.Column("pairing_code", sa.Text(), nullable=True), + sa.Column("pairing_code_expires_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("external_peer_id", sa.Text(), nullable=True), + sa.Column( + "external_peer_kind", + gateway_peer_kind_enum, + nullable=False, + server_default="unknown", + ), + sa.Column("external_thread_id", sa.Text(), nullable=True), + sa.Column("external_display_name", sa.Text(), nullable=True), + sa.Column("external_username", sa.Text(), nullable=True), + sa.Column("external_pii_hashes", postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column( + "external_metadata", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default=sa.text("'{}'::jsonb"), + ), + sa.Column("active_thread_id", sa.Integer(), nullable=True), + sa.Column( + "session_scope", + gateway_session_scope_enum, + nullable=False, + server_default="per_binding", + ), + sa.Column( + "dm_policy", + gateway_dm_policy_enum, + nullable=False, + server_default="enabled", + ), + sa.Column( + "group_policy", + gateway_group_policy_enum, + nullable=False, + server_default="disabled", + ), + sa.Column("revoked_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("suspended_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("suspended_reason", sa.Text(), nullable=True), + sa.Column( + "created_at", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.text("(now() AT TIME ZONE 'utc')"), + ), + sa.Column( + "updated_at", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.text("(now() AT TIME ZONE 'utc')"), + ), + sa.ForeignKeyConstraint( + ["account_id"], ["gateway_platform_accounts.id"], ondelete="CASCADE" + ), + sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["search_space_id"], ["searchspaces.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint( + ["active_thread_id"], ["new_chat_threads.id"], ondelete="SET NULL" + ), + ) + op.create_index( + "uq_gateway_bindings_account_peer_active", + "gateway_conversation_bindings", + ["account_id", "external_peer_id"], + unique=True, + postgresql_where=sa.text( + "state IN ('bound', 'suspended') AND external_peer_id IS NOT NULL" + ), + if_not_exists=True, + ) + op.create_index( + "uq_gateway_bindings_pairing_code_pending", + "gateway_conversation_bindings", + ["pairing_code"], + unique=True, + postgresql_where=sa.text("state = 'pending'"), + if_not_exists=True, + ) + op.create_index( + "ix_gateway_bindings_user_state", + "gateway_conversation_bindings", + ["user_id", "state"], + if_not_exists=True, + ) + op.create_index( + "ix_gateway_bindings_search_space_state", + "gateway_conversation_bindings", + ["search_space_id", "state"], + if_not_exists=True, + ) + + if not _table_exists(conn, "gateway_inbound_events"): + op.create_table( + "gateway_inbound_events", + sa.Column("id", sa.BigInteger(), primary_key=True), + sa.Column("account_id", sa.BigInteger(), nullable=False), + sa.Column("binding_id", sa.BigInteger(), nullable=True), + sa.Column("platform", gateway_platform_enum, nullable=False), + sa.Column("event_dedupe_key", sa.Text(), nullable=False), + sa.Column("external_event_id", sa.Text(), nullable=True), + sa.Column("external_message_id", sa.Text(), nullable=True), + sa.Column("event_kind", gateway_event_kind_enum, nullable=False), + sa.Column( + "raw_payload", + postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + ), + sa.Column( + "processing_metadata", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default=sa.text("'{}'::jsonb"), + ), + sa.Column( + "status", + gateway_event_status_enum, + nullable=False, + server_default="received", + ), + sa.Column("attempt_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column("last_error", sa.Text(), nullable=True), + sa.Column( + "received_at", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.text("(now() AT TIME ZONE 'utc')"), + ), + sa.Column("processed_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column( + "created_at", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.text("(now() AT TIME ZONE 'utc')"), + ), + sa.ForeignKeyConstraint( + ["account_id"], ["gateway_platform_accounts.id"], ondelete="CASCADE" + ), + sa.ForeignKeyConstraint( + ["binding_id"], ["gateway_conversation_bindings.id"], ondelete="SET NULL" + ), + sa.UniqueConstraint( + "account_id", + "event_dedupe_key", + name="uq_gateway_inbound_account_dedupe_key", + ), + ) + op.create_index( + "ix_gateway_inbound_status_received_at", + "gateway_inbound_events", + ["status", "received_at"], + if_not_exists=True, + ) + op.create_index( + "ix_gateway_inbound_binding_received_at", + "gateway_inbound_events", + ["binding_id", "received_at"], + if_not_exists=True, + ) + + if not _column_exists(conn, "new_chat_threads", "source"): + op.add_column( + "new_chat_threads", + sa.Column("source", sa.Text(), nullable=False, server_default="web"), + ) + op.alter_column("new_chat_threads", "source", type_=sa.Text()) + if not _column_exists(conn, "new_chat_threads", "binding_id"): + op.add_column( + "new_chat_threads", + sa.Column("binding_id", sa.BigInteger(), nullable=True), + ) + if not _constraint_exists( + conn, "new_chat_threads", "fk_new_chat_threads_gateway_binding_id" + ): + op.create_foreign_key( + "fk_new_chat_threads_gateway_binding_id", + "new_chat_threads", + "gateway_conversation_bindings", + ["binding_id"], + ["id"], + ondelete="SET NULL", + ) + op.create_index("ix_new_chat_threads_source", "new_chat_threads", ["source"], if_not_exists=True) + op.create_index( + "ix_new_chat_threads_binding_id", + "new_chat_threads", + ["binding_id"], + if_not_exists=True, + ) + + if not _column_exists(conn, "new_chat_messages", "source"): + op.add_column( + "new_chat_messages", + sa.Column("source", sa.Text(), nullable=False, server_default="web"), + ) + op.alter_column("new_chat_messages", "source", type_=sa.Text()) + if not _column_exists(conn, "new_chat_messages", "platform_metadata"): + op.add_column( + "new_chat_messages", + sa.Column("platform_metadata", postgresql.JSONB(astext_type=sa.Text()), nullable=True), + ) + op.create_index( + "ix_new_chat_messages_source", + "new_chat_messages", + ["source"], + if_not_exists=True, + ) + op.create_index( + "uq_new_chat_messages_inbound_platform", + "new_chat_messages", + [ + "thread_id", + sa.text("(platform_metadata->>'platform')"), + sa.text("(platform_metadata->>'external_message_id')"), + ], + unique=True, + postgresql_where=sa.text( + "platform_metadata IS NOT NULL " + "AND platform_metadata->>'direction' = 'inbound'" + ), + if_not_exists=True, + ) + op.execute("ALTER TABLE new_chat_messages REPLICA IDENTITY FULL") + + exists = conn.execute( + sa.text("SELECT 1 FROM pg_publication WHERE pubname = :name"), + {"name": PUBLICATION_NAME}, + ).fetchone() + if exists: + documents_has_zero_ver = _has_zero_version(conn, "documents") + user_has_zero_ver = _has_zero_version(conn, "user") + tx = conn.begin_nested() if conn.in_transaction() else conn.begin() + with tx: + conn.execute( + sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'pre-144-gateway'") + ) + conn.execute( + sa.text( + _build_set_table_ddl( + documents_has_zero_ver=documents_has_zero_ver, + user_has_zero_ver=user_has_zero_ver, + ) + ) + ) + conn.execute( + sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'post-144-gateway'") + ) + + +def downgrade() -> None: + conn = op.get_bind() + exists = conn.execute( + sa.text("SELECT 1 FROM pg_publication WHERE pubname = :name"), + {"name": PUBLICATION_NAME}, + ).fetchone() + if exists: + documents_has_zero_ver = _has_zero_version(conn, "documents") + user_has_zero_ver = _has_zero_version(conn, "user") + # Restore the publication shape from migration 143. + doc_cols = DOCUMENT_COLS + (['"_0_version"'] if documents_has_zero_ver else []) + user_cols = USER_COLS + (['"_0_version"'] if user_has_zero_ver else []) + ddl = ( + f"ALTER PUBLICATION {PUBLICATION_NAME} SET TABLE " + f"notifications, " + f"documents ({_cols(doc_cols)}), " + f"folders, " + f"search_source_connectors, " + f"new_chat_messages, " + f"chat_comments, " + f"chat_session_state, " + f'"user" ({_cols(user_cols)})' + ) + tx = conn.begin_nested() if conn.in_transaction() else conn.begin() + with tx: + conn.execute( + sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'pre-144-downgrade'") + ) + conn.execute(sa.text(ddl)) + conn.execute( + sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'post-144-downgrade'") + ) + + if _column_exists(conn, "new_chat_messages", "source"): + op.execute("ALTER TABLE new_chat_messages REPLICA IDENTITY DEFAULT") + _drop_index_if_exists("uq_new_chat_messages_inbound_platform", "new_chat_messages") + _drop_index_if_exists("ix_new_chat_messages_source", "new_chat_messages") + _drop_column_if_exists("new_chat_messages", "platform_metadata") + _drop_column_if_exists("new_chat_messages", "source") + + _drop_index_if_exists("ix_new_chat_threads_binding_id", "new_chat_threads") + _drop_index_if_exists("ix_new_chat_threads_source", "new_chat_threads") + if _constraint_exists( + conn, "new_chat_threads", "fk_new_chat_threads_gateway_binding_id" + ): + op.drop_constraint( + "fk_new_chat_threads_gateway_binding_id", + "new_chat_threads", + type_="foreignkey", + ) + _drop_column_if_exists("new_chat_threads", "binding_id") + _drop_column_if_exists("new_chat_threads", "source") + + _drop_index_if_exists( + "ix_gateway_inbound_binding_received_at", "gateway_inbound_events" + ) + _drop_index_if_exists("ix_gateway_inbound_status_received_at", "gateway_inbound_events") + if _table_exists(conn, "gateway_inbound_events"): + op.drop_table("gateway_inbound_events") + + _drop_index_if_exists( + "ix_gateway_bindings_search_space_state", + "gateway_conversation_bindings", + ) + _drop_index_if_exists( + "ix_gateway_bindings_user_state", "gateway_conversation_bindings" + ) + _drop_index_if_exists( + "uq_gateway_bindings_pairing_code_pending", + "gateway_conversation_bindings", + ) + _drop_index_if_exists( + "uq_gateway_bindings_account_peer_active", + "gateway_conversation_bindings", + ) + if _table_exists(conn, "gateway_conversation_bindings"): + op.drop_table("gateway_conversation_bindings") + + _drop_index_if_exists("uq_gateway_accounts_system_platform", "gateway_platform_accounts") + _drop_index_if_exists("uq_gateway_accounts_owner_platform", "gateway_platform_accounts") + if _table_exists(conn, "gateway_platform_accounts"): + op.drop_table("gateway_platform_accounts") + + for enum_name in ( + "gateway_event_status", + "gateway_event_kind", + "gateway_group_policy", + "gateway_dm_policy", + "gateway_session_scope", + "gateway_peer_kind", + "gateway_binding_state", + "gateway_health_status", + "gateway_account_mode", + "gateway_platform", + ): + postgresql.ENUM(name=enum_name).drop(conn, checkfirst=True) diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 9fc27fb1f..82b641ca6 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -14,6 +14,7 @@ from sqlalchemy import ( TIMESTAMP, BigInteger, Boolean, + CheckConstraint, Column, Enum as SQLAlchemyEnum, ForeignKey, @@ -573,6 +574,73 @@ class ChatVisibility(StrEnum): # PUBLIC = "PUBLIC" # Reserved for future implementation +class GatewayPlatform(StrEnum): + TELEGRAM = "telegram" + WHATSAPP = "whatsapp" + SIGNAL = "signal" + + +class GatewayAccountMode(StrEnum): + CLOUD_SHARED = "cloud_shared" + SELF_HOST_BYO = "self_host_byo" + + +class GatewayHealthStatus(StrEnum): + UNKNOWN = "unknown" + OK = "ok" + FAILING = "failing" + + +class GatewayBindingState(StrEnum): + PENDING = "pending" + BOUND = "bound" + REVOKED = "revoked" + SUSPENDED = "suspended" + + +class GatewayPeerKind(StrEnum): + DIRECT = "direct" + GROUP = "group" + CHANNEL = "channel" + UNKNOWN = "unknown" + + +class GatewaySessionScope(StrEnum): + PER_BINDING = "per_binding" + PER_USER_SEARCH_SPACE = "per_user_search_space" + EPHEMERAL = "ephemeral" + + +class GatewayDmPolicy(StrEnum): + ENABLED = "enabled" + DISABLED = "disabled" + + +class GatewayGroupPolicy(StrEnum): + DISABLED = "disabled" + ALLOWLIST = "allowlist" + MENTION_REQUIRED = "mention_required" + + +class GatewayEventKind(StrEnum): + MESSAGE = "message" + EDITED_MESSAGE = "edited_message" + CALLBACK_QUERY = "callback_query" + OTHER = "other" + + +class GatewayEventStatus(StrEnum): + RECEIVED = "received" + PROCESSING = "processing" + PROCESSED = "processed" + IGNORED = "ignored" + FAILED = "failed" + + +def _enum_values(enum_cls): + return [item.value for item in enum_cls] + + class NewChatThread(BaseModel, TimestampMixin): """ Thread model for the new chat feature using assistant-ui. @@ -645,6 +713,16 @@ class NewChatThread(BaseModel, TimestampMixin): # agent_llm_id changes). Unindexed: all reads are by primary key. pinned_llm_config_id = Column(Integer, nullable=True) + # Gateway-originated threads are persisted for the agent, but the UI Zero + # publication only exposes ``source='web'`` rows. + source = Column(Text, nullable=False, default="web", server_default="web") + binding_id = Column( + BigInteger, + ForeignKey("gateway_conversation_bindings.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + # Relationships search_space = relationship("SearchSpace", back_populates="new_chat_threads") created_by = relationship("User", back_populates="new_chat_threads") @@ -665,6 +743,11 @@ class NewChatThread(BaseModel, TimestampMixin): back_populates="thread", cascade="all, delete-orphan", ) + gateway_binding = relationship( + "GatewayConversationBinding", + foreign_keys=[binding_id], + back_populates="threads", + ) class NewChatMessage(BaseModel, TimestampMixin): @@ -718,6 +801,11 @@ class NewChatMessage(BaseModel, TimestampMixin): # a message back to the LangGraph checkpoint that produced its turn. turn_id = Column(String(64), nullable=True, index=True) + # Mirrors the parent thread source for publication-level filtering. + # This denormalization avoids join-dependent logical replication rules. + source = Column(Text, nullable=False, default="web", server_default="web") + platform_metadata = Column(JSONB, nullable=True) + # Relationships thread = relationship("NewChatThread", back_populates="messages") author = relationship("User") @@ -734,6 +822,298 @@ class NewChatMessage(BaseModel, TimestampMixin): ) +class GatewayPlatformAccount(Base, TimestampMixin): + __tablename__ = "gateway_platform_accounts" + __allow_unmapped__ = True + + id = Column(BigInteger, primary_key=True, index=True) + platform = Column( + SQLAlchemyEnum( + GatewayPlatform, + name="gateway_platform", + values_callable=_enum_values, + ), + nullable=False, + ) + mode = Column( + SQLAlchemyEnum( + GatewayAccountMode, + name="gateway_account_mode", + values_callable=_enum_values, + ), + nullable=False, + ) + owner_user_id = Column( + UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=True + ) + owner_search_space_id = Column( + Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=True + ) + is_system_account = Column(Boolean, nullable=False, default=False, server_default="false") + encrypted_credentials = Column(Text, nullable=True) + account_metadata = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb")) + cursor_state = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb")) + health_status = Column( + SQLAlchemyEnum( + GatewayHealthStatus, + name="gateway_health_status", + values_callable=_enum_values, + ), + nullable=False, + default=GatewayHealthStatus.UNKNOWN, + server_default=GatewayHealthStatus.UNKNOWN.value, + ) + last_health_check_at = Column(TIMESTAMP(timezone=True), nullable=True) + suspended_at = Column(TIMESTAMP(timezone=True), nullable=True) + suspended_reason = Column(Text, nullable=True) + updated_at = Column( + TIMESTAMP(timezone=True), + nullable=False, + default=lambda: datetime.now(UTC), + onupdate=lambda: datetime.now(UTC), + server_default=text("(now() AT TIME ZONE 'utc')"), + ) + + owner = relationship("User", foreign_keys=[owner_user_id]) + owner_search_space = relationship("SearchSpace", foreign_keys=[owner_search_space_id]) + bindings = relationship( + "GatewayConversationBinding", + back_populates="account", + cascade="all, delete-orphan", + ) + inbound_events = relationship( + "GatewayInboundEvent", + back_populates="account", + cascade="all, delete-orphan", + ) + + __table_args__ = ( + CheckConstraint( + "(is_system_account = true AND owner_user_id IS NULL) OR " + "(is_system_account = false AND owner_user_id IS NOT NULL)", + name="ck_gateway_accounts_owner_shape", + ), + Index( + "uq_gateway_accounts_owner_platform", + "owner_user_id", + "platform", + unique=True, + postgresql_where=text("is_system_account = false"), + ), + Index( + "uq_gateway_accounts_system_platform", + "platform", + unique=True, + postgresql_where=text("is_system_account = true"), + ), + ) + + +class GatewayConversationBinding(Base, TimestampMixin): + __tablename__ = "gateway_conversation_bindings" + __allow_unmapped__ = True + + id = Column(BigInteger, primary_key=True, index=True) + account_id = Column( + BigInteger, + ForeignKey("gateway_platform_accounts.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + user_id = Column( + UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False + ) + search_space_id = Column( + Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False + ) + state = Column( + SQLAlchemyEnum( + GatewayBindingState, + name="gateway_binding_state", + values_callable=_enum_values, + ), + nullable=False, + default=GatewayBindingState.PENDING, + server_default=GatewayBindingState.PENDING.value, + ) + pairing_code = Column(Text, nullable=True) + pairing_code_expires_at = Column(TIMESTAMP(timezone=True), nullable=True) + external_peer_id = Column(Text, nullable=True) + external_peer_kind = Column( + SQLAlchemyEnum( + GatewayPeerKind, + name="gateway_peer_kind", + values_callable=_enum_values, + ), + nullable=False, + default=GatewayPeerKind.UNKNOWN, + server_default=GatewayPeerKind.UNKNOWN.value, + ) + external_thread_id = Column(Text, nullable=True) + external_display_name = Column(Text, nullable=True) + external_username = Column(Text, nullable=True) + external_pii_hashes = Column(JSONB, nullable=True) + external_metadata = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb")) + active_thread_id = Column( + Integer, + ForeignKey("new_chat_threads.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + session_scope = Column( + SQLAlchemyEnum( + GatewaySessionScope, + name="gateway_session_scope", + values_callable=_enum_values, + ), + nullable=False, + default=GatewaySessionScope.PER_BINDING, + server_default=GatewaySessionScope.PER_BINDING.value, + ) + dm_policy = Column( + SQLAlchemyEnum( + GatewayDmPolicy, + name="gateway_dm_policy", + values_callable=_enum_values, + ), + nullable=False, + default=GatewayDmPolicy.ENABLED, + server_default=GatewayDmPolicy.ENABLED.value, + ) + group_policy = Column( + SQLAlchemyEnum( + GatewayGroupPolicy, + name="gateway_group_policy", + values_callable=_enum_values, + ), + nullable=False, + default=GatewayGroupPolicy.DISABLED, + server_default=GatewayGroupPolicy.DISABLED.value, + ) + revoked_at = Column(TIMESTAMP(timezone=True), nullable=True) + suspended_at = Column(TIMESTAMP(timezone=True), nullable=True) + suspended_reason = Column(Text, nullable=True) + updated_at = Column( + TIMESTAMP(timezone=True), + nullable=False, + default=lambda: datetime.now(UTC), + onupdate=lambda: datetime.now(UTC), + server_default=text("(now() AT TIME ZONE 'utc')"), + ) + + account = relationship("GatewayPlatformAccount", back_populates="bindings") + user = relationship("User", foreign_keys=[user_id]) + search_space = relationship("SearchSpace", foreign_keys=[search_space_id]) + active_thread = relationship("NewChatThread", foreign_keys=[active_thread_id]) + threads = relationship( + "NewChatThread", + back_populates="gateway_binding", + foreign_keys="NewChatThread.binding_id", + ) + inbound_events = relationship( + "GatewayInboundEvent", + back_populates="binding", + foreign_keys="GatewayInboundEvent.binding_id", + ) + + __table_args__ = ( + Index( + "uq_gateway_bindings_account_peer_active", + "account_id", + "external_peer_id", + unique=True, + postgresql_where=text( + "state IN ('bound', 'suspended') AND external_peer_id IS NOT NULL" + ), + ), + Index( + "uq_gateway_bindings_pairing_code_pending", + "pairing_code", + unique=True, + postgresql_where=text("state = 'pending'"), + ), + Index("ix_gateway_bindings_user_state", "user_id", "state"), + Index("ix_gateway_bindings_search_space_state", "search_space_id", "state"), + ) + + +class GatewayInboundEvent(Base, TimestampMixin): + __tablename__ = "gateway_inbound_events" + __allow_unmapped__ = True + + id = Column(BigInteger, primary_key=True, index=True) + account_id = Column( + BigInteger, + ForeignKey("gateway_platform_accounts.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + binding_id = Column( + BigInteger, + ForeignKey("gateway_conversation_bindings.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + platform = Column( + SQLAlchemyEnum( + GatewayPlatform, + name="gateway_platform", + values_callable=_enum_values, + ), + nullable=False, + ) + event_dedupe_key = Column(Text, nullable=False) + external_event_id = Column(Text, nullable=True) + external_message_id = Column(Text, nullable=True) + event_kind = Column( + SQLAlchemyEnum( + GatewayEventKind, + name="gateway_event_kind", + values_callable=_enum_values, + ), + nullable=False, + ) + raw_payload = Column(JSONB, nullable=True) + processing_metadata = Column( + JSONB, + nullable=False, + default=dict, + server_default=text("'{}'::jsonb"), + ) + status = Column( + SQLAlchemyEnum( + GatewayEventStatus, + name="gateway_event_status", + values_callable=_enum_values, + ), + nullable=False, + default=GatewayEventStatus.RECEIVED, + server_default=GatewayEventStatus.RECEIVED.value, + ) + attempt_count = Column(Integer, nullable=False, default=0, server_default="0") + last_error = Column(Text, nullable=True) + received_at = Column( + TIMESTAMP(timezone=True), + nullable=False, + default=lambda: datetime.now(UTC), + server_default=text("(now() AT TIME ZONE 'utc')"), + ) + processed_at = Column(TIMESTAMP(timezone=True), nullable=True) + + account = relationship("GatewayPlatformAccount", back_populates="inbound_events") + binding = relationship("GatewayConversationBinding", back_populates="inbound_events") + + __table_args__ = ( + UniqueConstraint( + "account_id", + "event_dedupe_key", + name="uq_gateway_inbound_account_dedupe_key", + ), + Index("ix_gateway_inbound_status_received_at", "status", "received_at"), + Index("ix_gateway_inbound_binding_received_at", "binding_id", "received_at"), + ) + + class TokenUsage(BaseModel, TimestampMixin): """ Tracks LLM token consumption per assistant turn. diff --git a/surfsense_web/zero/schema/chat.ts b/surfsense_web/zero/schema/chat.ts index fb3d7651e..8da41ee45 100644 --- a/surfsense_web/zero/schema/chat.ts +++ b/surfsense_web/zero/schema/chat.ts @@ -8,6 +8,8 @@ export const newChatMessageTable = table("new_chat_messages") threadId: number().from("thread_id"), authorId: string().optional().from("author_id"), createdAt: number().from("created_at"), + source: string(), + platformMetadata: json().optional().from("platform_metadata"), // Per-turn correlation id sourced from ``configurable.turn_id`` // at streaming time. Required by the inline Revert button's // (chat_turn_id, tool_name, position) fallback in tool-fallback.tsx