diff --git a/surfsense_backend/alembic/versions/144_add_gateway_tables.py b/surfsense_backend/alembic/versions/144_add_gateway_tables.py index 011333d69..35eb662c8 100644 --- a/surfsense_backend/alembic/versions/144_add_gateway_tables.py +++ b/surfsense_backend/alembic/versions/144_add_gateway_tables.py @@ -1,20 +1,21 @@ -"""add gateway tables for Telegram messaging gateway +"""add external chat surface tables Revision ID: 144 Revises: 143 Create Date: 2026-05-27 -Adds the lean v6 gateway schema: +Adds the lean external chat surface schema: -* gateway_platform_accounts -* gateway_conversation_bindings -* gateway_inbound_events +* external_chat_accounts +* external_chat_bindings +* external_chat_inbound_events -The gateway stores Telegram-originated conversations in the existing chat -tables but keeps them out of UI replication. This migration adds ``source`` to -``new_chat_messages`` as a denormalized Zero publication boundary and publishes -only ``source = 'web'`` rows. Gateway control-plane tables are served through -REST in v1, so they are intentionally not added to ``zero_publication``. +External chat surfaces store Telegram-originated conversations in the existing +chat tables. This migration adds ``source`` to ``new_chat_threads`` and +``new_chat_messages`` as UI metadata while publishing all chat-message sources +through Zero so a future SurfSense UI layer can render external chats. External +chat adapter tables are served through REST in v1, so they are intentionally not +added to ``zero_publication``. """ from __future__ import annotations @@ -146,7 +147,7 @@ def _build_set_table_ddl( f"documents ({_cols(doc_cols)}), " f"folders, " f"search_source_connectors, " - f"new_chat_messages WHERE (source = 'web'), " + f"new_chat_messages, " f"chat_comments, " f"chat_session_state, " f'"user" ({_cols(user_cols)})' @@ -161,53 +162,41 @@ def _create_enum(name: str, values: tuple[str, ...]) -> postgresql.ENUM: def upgrade() -> None: conn = op.get_bind() - gateway_platform_enum = _create_enum( - "gateway_platform", ("telegram", "whatsapp", "signal") + external_chat_platform_enum = _create_enum( + "external_chat_platform", ("telegram", "whatsapp", "signal") ) - gateway_account_mode_enum = _create_enum( - "gateway_account_mode", ("cloud_shared", "self_host_byo") + external_chat_account_mode_enum = _create_enum( + "external_chat_account_mode", ("cloud_shared", "self_host_byo") ) - gateway_health_status_enum = _create_enum( - "gateway_health_status", ("unknown", "ok", "failing") + external_chat_health_status_enum = _create_enum( + "external_chat_health_status", ("unknown", "ok", "failing") ) - gateway_binding_state_enum = _create_enum( - "gateway_binding_state", ("pending", "bound", "revoked", "suspended") + external_chat_binding_state_enum = _create_enum( + "external_chat_binding_state", ("pending", "bound", "revoked", "suspended") ) - gateway_peer_kind_enum = _create_enum( - "gateway_peer_kind", ("direct", "group", "channel", "unknown") + external_chat_peer_kind_enum = _create_enum( + "external_chat_peer_kind", ("direct", "group", "channel", "unknown") ) - gateway_session_scope_enum = _create_enum( - "gateway_session_scope", - ("per_binding", "per_user_search_space", "ephemeral"), + external_chat_event_kind_enum = _create_enum( + "external_chat_event_kind", ("message", "edited_message", "callback_query", "other") ) - gateway_dm_policy_enum = _create_enum("gateway_dm_policy", ("enabled", "disabled")) - gateway_group_policy_enum = _create_enum( - "gateway_group_policy", ("disabled", "allowlist", "mention_required") - ) - gateway_event_kind_enum = _create_enum( - "gateway_event_kind", ("message", "edited_message", "callback_query", "other") - ) - gateway_event_status_enum = _create_enum( - "gateway_event_status", + external_chat_event_status_enum = _create_enum( + "external_chat_event_status", ("received", "processing", "processed", "ignored", "failed"), ) - if not _table_exists(conn, "gateway_platform_accounts"): + if not _table_exists(conn, "external_chat_accounts"): op.create_table( - "gateway_platform_accounts", + "external_chat_accounts", sa.Column("id", sa.BigInteger(), primary_key=True), - sa.Column("platform", gateway_platform_enum, nullable=False), - sa.Column("mode", gateway_account_mode_enum, nullable=False), + sa.Column("platform", external_chat_platform_enum, nullable=False), + sa.Column("mode", external_chat_account_mode_enum, nullable=False), sa.Column("owner_user_id", postgresql.UUID(as_uuid=True), nullable=True), sa.Column("owner_search_space_id", sa.Integer(), nullable=True), sa.Column("is_system_account", sa.Boolean(), nullable=False, server_default="false"), sa.Column("encrypted_credentials", sa.Text(), nullable=True), - sa.Column( - "account_metadata", - postgresql.JSONB(astext_type=sa.Text()), - nullable=False, - server_default=sa.text("'{}'::jsonb"), - ), + sa.Column("bot_username", sa.String(255), nullable=True), + sa.Column("webhook_secret", sa.String(64), nullable=True), sa.Column( "cursor_state", postgresql.JSONB(astext_type=sa.Text()), @@ -216,7 +205,7 @@ def upgrade() -> None: ), sa.Column( "health_status", - gateway_health_status_enum, + external_chat_health_status_enum, nullable=False, server_default="unknown", ), @@ -238,7 +227,7 @@ def upgrade() -> None: sa.CheckConstraint( "(is_system_account = true AND owner_user_id IS NULL) OR " "(is_system_account = false AND owner_user_id IS NOT NULL)", - name="ck_gateway_accounts_owner_shape", + name="ck_external_chat_accounts_owner_shape", ), sa.ForeignKeyConstraint(["owner_user_id"], ["user.id"], ondelete="CASCADE"), sa.ForeignKeyConstraint( @@ -246,32 +235,40 @@ def upgrade() -> None: ), ) op.create_index( - "uq_gateway_accounts_owner_platform", - "gateway_platform_accounts", + "uq_external_chat_accounts_owner_platform", + "external_chat_accounts", ["owner_user_id", "platform"], unique=True, postgresql_where=sa.text("is_system_account = false"), if_not_exists=True, ) op.create_index( - "uq_gateway_accounts_system_platform", - "gateway_platform_accounts", + "uq_external_chat_accounts_system_platform", + "external_chat_accounts", ["platform"], unique=True, postgresql_where=sa.text("is_system_account = true"), if_not_exists=True, ) + op.create_index( + "uq_external_chat_accounts_webhook_secret", + "external_chat_accounts", + ["webhook_secret"], + unique=True, + postgresql_where=sa.text("webhook_secret IS NOT NULL"), + if_not_exists=True, + ) - if not _table_exists(conn, "gateway_conversation_bindings"): + if not _table_exists(conn, "external_chat_bindings"): op.create_table( - "gateway_conversation_bindings", + "external_chat_bindings", sa.Column("id", sa.BigInteger(), primary_key=True), sa.Column("account_id", sa.BigInteger(), nullable=False), sa.Column("user_id", postgresql.UUID(as_uuid=True), nullable=False), sa.Column("search_space_id", sa.Integer(), nullable=False), sa.Column( "state", - gateway_binding_state_enum, + external_chat_binding_state_enum, nullable=False, server_default="pending", ), @@ -280,39 +277,25 @@ def upgrade() -> None: sa.Column("external_peer_id", sa.Text(), nullable=True), sa.Column( "external_peer_kind", - gateway_peer_kind_enum, + external_chat_peer_kind_enum, nullable=False, server_default="unknown", ), - sa.Column("external_thread_id", sa.Text(), nullable=True), + sa.Column( + "external_thread_id", + sa.Text(), + nullable=True, + comment="Reserved for Telegram message_thread_id when group/forum support lands.", + ), sa.Column("external_display_name", sa.Text(), nullable=True), sa.Column("external_username", sa.Text(), nullable=True), - sa.Column("external_pii_hashes", postgresql.JSONB(astext_type=sa.Text()), nullable=True), sa.Column( "external_metadata", postgresql.JSONB(astext_type=sa.Text()), nullable=False, server_default=sa.text("'{}'::jsonb"), ), - sa.Column("active_thread_id", sa.Integer(), nullable=True), - sa.Column( - "session_scope", - gateway_session_scope_enum, - nullable=False, - server_default="per_binding", - ), - sa.Column( - "dm_policy", - gateway_dm_policy_enum, - nullable=False, - server_default="enabled", - ), - sa.Column( - "group_policy", - gateway_group_policy_enum, - nullable=False, - server_default="disabled", - ), + sa.Column("new_chat_thread_id", sa.Integer(), nullable=True), sa.Column("revoked_at", sa.TIMESTAMP(timezone=True), nullable=True), sa.Column("suspended_at", sa.TIMESTAMP(timezone=True), nullable=True), sa.Column("suspended_reason", sa.Text(), nullable=True), @@ -329,17 +312,17 @@ def upgrade() -> None: server_default=sa.text("(now() AT TIME ZONE 'utc')"), ), sa.ForeignKeyConstraint( - ["account_id"], ["gateway_platform_accounts.id"], ondelete="CASCADE" + ["account_id"], ["external_chat_accounts.id"], ondelete="CASCADE" ), sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"), sa.ForeignKeyConstraint(["search_space_id"], ["searchspaces.id"], ondelete="CASCADE"), sa.ForeignKeyConstraint( - ["active_thread_id"], ["new_chat_threads.id"], ondelete="SET NULL" + ["new_chat_thread_id"], ["new_chat_threads.id"], ondelete="SET NULL" ), ) op.create_index( - "uq_gateway_bindings_account_peer_active", - "gateway_conversation_bindings", + "uq_external_chat_bindings_account_peer_active", + "external_chat_bindings", ["account_id", "external_peer_id"], unique=True, postgresql_where=sa.text( @@ -348,51 +331,46 @@ def upgrade() -> None: if_not_exists=True, ) op.create_index( - "uq_gateway_bindings_pairing_code_pending", - "gateway_conversation_bindings", + "uq_external_chat_bindings_pairing_code_pending", + "external_chat_bindings", ["pairing_code"], unique=True, postgresql_where=sa.text("state = 'pending'"), if_not_exists=True, ) op.create_index( - "ix_gateway_bindings_user_state", - "gateway_conversation_bindings", + "ix_external_chat_bindings_user_state", + "external_chat_bindings", ["user_id", "state"], if_not_exists=True, ) op.create_index( - "ix_gateway_bindings_search_space_state", - "gateway_conversation_bindings", + "ix_external_chat_bindings_search_space_state", + "external_chat_bindings", ["search_space_id", "state"], if_not_exists=True, ) - if not _table_exists(conn, "gateway_inbound_events"): + if not _table_exists(conn, "external_chat_inbound_events"): op.create_table( - "gateway_inbound_events", + "external_chat_inbound_events", sa.Column("id", sa.BigInteger(), primary_key=True), sa.Column("account_id", sa.BigInteger(), nullable=False), - sa.Column("binding_id", sa.BigInteger(), nullable=True), - sa.Column("platform", gateway_platform_enum, nullable=False), + sa.Column("external_chat_binding_id", sa.BigInteger(), nullable=True), + sa.Column("platform", external_chat_platform_enum, nullable=False), sa.Column("event_dedupe_key", sa.Text(), nullable=False), sa.Column("external_event_id", sa.Text(), nullable=True), sa.Column("external_message_id", sa.Text(), nullable=True), - sa.Column("event_kind", gateway_event_kind_enum, nullable=False), + sa.Column("event_kind", external_chat_event_kind_enum, nullable=False), sa.Column( "raw_payload", postgresql.JSONB(astext_type=sa.Text()), nullable=True, ), - sa.Column( - "processing_metadata", - postgresql.JSONB(astext_type=sa.Text()), - nullable=False, - server_default=sa.text("'{}'::jsonb"), - ), + sa.Column("request_id", sa.String(64), nullable=True), sa.Column( "status", - gateway_event_status_enum, + external_chat_event_status_enum, nullable=False, server_default="received", ), @@ -412,27 +390,34 @@ def upgrade() -> None: server_default=sa.text("(now() AT TIME ZONE 'utc')"), ), sa.ForeignKeyConstraint( - ["account_id"], ["gateway_platform_accounts.id"], ondelete="CASCADE" + ["account_id"], ["external_chat_accounts.id"], ondelete="CASCADE" ), sa.ForeignKeyConstraint( - ["binding_id"], ["gateway_conversation_bindings.id"], ondelete="SET NULL" + ["external_chat_binding_id"], ["external_chat_bindings.id"], ondelete="SET NULL" ), sa.UniqueConstraint( "account_id", "event_dedupe_key", - name="uq_gateway_inbound_account_dedupe_key", + name="uq_external_chat_inbound_account_dedupe_key", ), ) op.create_index( - "ix_gateway_inbound_status_received_at", - "gateway_inbound_events", + "ix_external_chat_inbound_status_received_at", + "external_chat_inbound_events", ["status", "received_at"], if_not_exists=True, ) op.create_index( - "ix_gateway_inbound_binding_received_at", - "gateway_inbound_events", - ["binding_id", "received_at"], + "ix_external_chat_inbound_binding_received_at", + "external_chat_inbound_events", + ["external_chat_binding_id", "received_at"], + if_not_exists=True, + ) + op.create_index( + "ix_external_chat_inbound_request_id", + "external_chat_inbound_events", + ["request_id"], + postgresql_where=sa.text("request_id IS NOT NULL"), if_not_exists=True, ) @@ -442,27 +427,27 @@ def upgrade() -> None: sa.Column("source", sa.Text(), nullable=False, server_default="web"), ) op.alter_column("new_chat_threads", "source", type_=sa.Text()) - if not _column_exists(conn, "new_chat_threads", "binding_id"): + if not _column_exists(conn, "new_chat_threads", "external_chat_binding_id"): op.add_column( "new_chat_threads", - sa.Column("binding_id", sa.BigInteger(), nullable=True), + sa.Column("external_chat_binding_id", sa.BigInteger(), nullable=True), ) if not _constraint_exists( - conn, "new_chat_threads", "fk_new_chat_threads_gateway_binding_id" + conn, "new_chat_threads", "fk_new_chat_threads_external_chat_external_chat_binding_id" ): op.create_foreign_key( - "fk_new_chat_threads_gateway_binding_id", + "fk_new_chat_threads_external_chat_external_chat_binding_id", "new_chat_threads", - "gateway_conversation_bindings", - ["binding_id"], + "external_chat_bindings", + ["external_chat_binding_id"], ["id"], ondelete="SET NULL", ) op.create_index("ix_new_chat_threads_source", "new_chat_threads", ["source"], if_not_exists=True) op.create_index( - "ix_new_chat_threads_binding_id", + "ix_new_chat_threads_external_chat_binding_id", "new_chat_threads", - ["binding_id"], + ["external_chat_binding_id"], if_not_exists=True, ) @@ -510,7 +495,9 @@ def upgrade() -> None: tx = conn.begin_nested() if conn.in_transaction() else conn.begin() with tx: conn.execute( - sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'pre-144-gateway'") + sa.text( + f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'pre-144-external-chat'" + ) ) conn.execute( sa.text( @@ -521,7 +508,9 @@ def upgrade() -> None: ) ) conn.execute( - sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'post-144-gateway'") + sa.text( + f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'post-144-external-chat'" + ) ) @@ -565,59 +554,58 @@ def downgrade() -> None: _drop_column_if_exists("new_chat_messages", "platform_metadata") _drop_column_if_exists("new_chat_messages", "source") - _drop_index_if_exists("ix_new_chat_threads_binding_id", "new_chat_threads") + _drop_index_if_exists("ix_new_chat_threads_external_chat_binding_id", "new_chat_threads") _drop_index_if_exists("ix_new_chat_threads_source", "new_chat_threads") if _constraint_exists( - conn, "new_chat_threads", "fk_new_chat_threads_gateway_binding_id" + conn, "new_chat_threads", "fk_new_chat_threads_external_chat_external_chat_binding_id" ): op.drop_constraint( - "fk_new_chat_threads_gateway_binding_id", + "fk_new_chat_threads_external_chat_external_chat_binding_id", "new_chat_threads", type_="foreignkey", ) - _drop_column_if_exists("new_chat_threads", "binding_id") + _drop_column_if_exists("new_chat_threads", "external_chat_binding_id") _drop_column_if_exists("new_chat_threads", "source") _drop_index_if_exists( - "ix_gateway_inbound_binding_received_at", "gateway_inbound_events" + "ix_external_chat_inbound_binding_received_at", "external_chat_inbound_events" ) - _drop_index_if_exists("ix_gateway_inbound_status_received_at", "gateway_inbound_events") - if _table_exists(conn, "gateway_inbound_events"): - op.drop_table("gateway_inbound_events") + _drop_index_if_exists("ix_external_chat_inbound_request_id", "external_chat_inbound_events") + _drop_index_if_exists("ix_external_chat_inbound_status_received_at", "external_chat_inbound_events") + if _table_exists(conn, "external_chat_inbound_events"): + op.drop_table("external_chat_inbound_events") _drop_index_if_exists( - "ix_gateway_bindings_search_space_state", - "gateway_conversation_bindings", + "ix_external_chat_bindings_search_space_state", + "external_chat_bindings", ) _drop_index_if_exists( - "ix_gateway_bindings_user_state", "gateway_conversation_bindings" + "ix_external_chat_bindings_user_state", "external_chat_bindings" ) _drop_index_if_exists( - "uq_gateway_bindings_pairing_code_pending", - "gateway_conversation_bindings", + "uq_external_chat_bindings_pairing_code_pending", + "external_chat_bindings", ) _drop_index_if_exists( - "uq_gateway_bindings_account_peer_active", - "gateway_conversation_bindings", + "uq_external_chat_bindings_account_peer_active", + "external_chat_bindings", ) - if _table_exists(conn, "gateway_conversation_bindings"): - op.drop_table("gateway_conversation_bindings") + if _table_exists(conn, "external_chat_bindings"): + op.drop_table("external_chat_bindings") - _drop_index_if_exists("uq_gateway_accounts_system_platform", "gateway_platform_accounts") - _drop_index_if_exists("uq_gateway_accounts_owner_platform", "gateway_platform_accounts") - if _table_exists(conn, "gateway_platform_accounts"): - op.drop_table("gateway_platform_accounts") + _drop_index_if_exists("uq_external_chat_accounts_system_platform", "external_chat_accounts") + _drop_index_if_exists("uq_external_chat_accounts_owner_platform", "external_chat_accounts") + _drop_index_if_exists("uq_external_chat_accounts_webhook_secret", "external_chat_accounts") + if _table_exists(conn, "external_chat_accounts"): + op.drop_table("external_chat_accounts") for enum_name in ( - "gateway_event_status", - "gateway_event_kind", - "gateway_group_policy", - "gateway_dm_policy", - "gateway_session_scope", - "gateway_peer_kind", - "gateway_binding_state", - "gateway_health_status", - "gateway_account_mode", - "gateway_platform", + "external_chat_event_status", + "external_chat_event_kind", + "external_chat_peer_kind", + "external_chat_binding_state", + "external_chat_health_status", + "external_chat_account_mode", + "external_chat_platform", ): postgresql.ENUM(name=enum_name).drop(conn, checkfirst=True) diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 82b641ca6..de7792627 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -574,62 +574,45 @@ class ChatVisibility(StrEnum): # PUBLIC = "PUBLIC" # Reserved for future implementation -class GatewayPlatform(StrEnum): +class ExternalChatPlatform(StrEnum): TELEGRAM = "telegram" WHATSAPP = "whatsapp" SIGNAL = "signal" -class GatewayAccountMode(StrEnum): +class ExternalChatAccountMode(StrEnum): CLOUD_SHARED = "cloud_shared" SELF_HOST_BYO = "self_host_byo" -class GatewayHealthStatus(StrEnum): +class ExternalChatHealthStatus(StrEnum): UNKNOWN = "unknown" OK = "ok" FAILING = "failing" -class GatewayBindingState(StrEnum): +class ExternalChatBindingState(StrEnum): PENDING = "pending" BOUND = "bound" REVOKED = "revoked" SUSPENDED = "suspended" -class GatewayPeerKind(StrEnum): +class ExternalChatPeerKind(StrEnum): DIRECT = "direct" GROUP = "group" CHANNEL = "channel" UNKNOWN = "unknown" -class GatewaySessionScope(StrEnum): - PER_BINDING = "per_binding" - PER_USER_SEARCH_SPACE = "per_user_search_space" - EPHEMERAL = "ephemeral" - - -class GatewayDmPolicy(StrEnum): - ENABLED = "enabled" - DISABLED = "disabled" - - -class GatewayGroupPolicy(StrEnum): - DISABLED = "disabled" - ALLOWLIST = "allowlist" - MENTION_REQUIRED = "mention_required" - - -class GatewayEventKind(StrEnum): +class ExternalChatEventKind(StrEnum): MESSAGE = "message" EDITED_MESSAGE = "edited_message" CALLBACK_QUERY = "callback_query" OTHER = "other" -class GatewayEventStatus(StrEnum): +class ExternalChatEventStatus(StrEnum): RECEIVED = "received" PROCESSING = "processing" PROCESSED = "processed" @@ -713,12 +696,12 @@ class NewChatThread(BaseModel, TimestampMixin): # agent_llm_id changes). Unindexed: all reads are by primary key. pinned_llm_config_id = Column(Integer, nullable=True) - # Gateway-originated threads are persisted for the agent, but the UI Zero - # publication only exposes ``source='web'`` rows. + # Surface metadata for web and external chat threads. Zero publishes all + # chat-message sources; the UI can decide which surfaces to render. source = Column(Text, nullable=False, default="web", server_default="web") - binding_id = Column( + external_chat_binding_id = Column( BigInteger, - ForeignKey("gateway_conversation_bindings.id", ondelete="SET NULL"), + ForeignKey("external_chat_bindings.id", ondelete="SET NULL"), nullable=True, index=True, ) @@ -743,9 +726,9 @@ class NewChatThread(BaseModel, TimestampMixin): back_populates="thread", cascade="all, delete-orphan", ) - gateway_binding = relationship( - "GatewayConversationBinding", - foreign_keys=[binding_id], + external_chat_binding = relationship( + "ExternalChatBinding", + foreign_keys=[external_chat_binding_id], back_populates="threads", ) @@ -822,23 +805,23 @@ class NewChatMessage(BaseModel, TimestampMixin): ) -class GatewayPlatformAccount(Base, TimestampMixin): - __tablename__ = "gateway_platform_accounts" +class ExternalChatAccount(Base, TimestampMixin): + __tablename__ = "external_chat_accounts" __allow_unmapped__ = True id = Column(BigInteger, primary_key=True, index=True) platform = Column( SQLAlchemyEnum( - GatewayPlatform, - name="gateway_platform", + ExternalChatPlatform, + name="external_chat_platform", values_callable=_enum_values, ), nullable=False, ) mode = Column( SQLAlchemyEnum( - GatewayAccountMode, - name="gateway_account_mode", + ExternalChatAccountMode, + name="external_chat_account_mode", values_callable=_enum_values, ), nullable=False, @@ -851,17 +834,18 @@ class GatewayPlatformAccount(Base, TimestampMixin): ) is_system_account = Column(Boolean, nullable=False, default=False, server_default="false") encrypted_credentials = Column(Text, nullable=True) - account_metadata = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb")) + bot_username = Column(String(255), nullable=True) + webhook_secret = Column(String(64), nullable=True) cursor_state = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb")) health_status = Column( SQLAlchemyEnum( - GatewayHealthStatus, - name="gateway_health_status", + ExternalChatHealthStatus, + name="external_chat_health_status", values_callable=_enum_values, ), nullable=False, - default=GatewayHealthStatus.UNKNOWN, - server_default=GatewayHealthStatus.UNKNOWN.value, + default=ExternalChatHealthStatus.UNKNOWN, + server_default=ExternalChatHealthStatus.UNKNOWN.value, ) last_health_check_at = Column(TIMESTAMP(timezone=True), nullable=True) suspended_at = Column(TIMESTAMP(timezone=True), nullable=True) @@ -877,12 +861,12 @@ class GatewayPlatformAccount(Base, TimestampMixin): owner = relationship("User", foreign_keys=[owner_user_id]) owner_search_space = relationship("SearchSpace", foreign_keys=[owner_search_space_id]) bindings = relationship( - "GatewayConversationBinding", + "ExternalChatBinding", back_populates="account", cascade="all, delete-orphan", ) inbound_events = relationship( - "GatewayInboundEvent", + "ExternalChatInboundEvent", back_populates="account", cascade="all, delete-orphan", ) @@ -891,32 +875,38 @@ class GatewayPlatformAccount(Base, TimestampMixin): CheckConstraint( "(is_system_account = true AND owner_user_id IS NULL) OR " "(is_system_account = false AND owner_user_id IS NOT NULL)", - name="ck_gateway_accounts_owner_shape", + name="ck_external_chat_accounts_owner_shape", ), Index( - "uq_gateway_accounts_owner_platform", + "uq_external_chat_accounts_owner_platform", "owner_user_id", "platform", unique=True, postgresql_where=text("is_system_account = false"), ), Index( - "uq_gateway_accounts_system_platform", + "uq_external_chat_accounts_system_platform", "platform", unique=True, postgresql_where=text("is_system_account = true"), ), + Index( + "uq_external_chat_accounts_webhook_secret", + "webhook_secret", + unique=True, + postgresql_where=text("webhook_secret IS NOT NULL"), + ), ) -class GatewayConversationBinding(Base, TimestampMixin): - __tablename__ = "gateway_conversation_bindings" +class ExternalChatBinding(Base, TimestampMixin): + __tablename__ = "external_chat_bindings" __allow_unmapped__ = True id = Column(BigInteger, primary_key=True, index=True) account_id = Column( BigInteger, - ForeignKey("gateway_platform_accounts.id", ondelete="CASCADE"), + ForeignKey("external_chat_accounts.id", ondelete="CASCADE"), nullable=False, index=True, ) @@ -928,68 +918,37 @@ class GatewayConversationBinding(Base, TimestampMixin): ) state = Column( SQLAlchemyEnum( - GatewayBindingState, - name="gateway_binding_state", + ExternalChatBindingState, + name="external_chat_binding_state", values_callable=_enum_values, ), nullable=False, - default=GatewayBindingState.PENDING, - server_default=GatewayBindingState.PENDING.value, + default=ExternalChatBindingState.PENDING, + server_default=ExternalChatBindingState.PENDING.value, ) pairing_code = Column(Text, nullable=True) pairing_code_expires_at = Column(TIMESTAMP(timezone=True), nullable=True) external_peer_id = Column(Text, nullable=True) external_peer_kind = Column( SQLAlchemyEnum( - GatewayPeerKind, - name="gateway_peer_kind", + ExternalChatPeerKind, + name="external_chat_peer_kind", values_callable=_enum_values, ), nullable=False, - default=GatewayPeerKind.UNKNOWN, - server_default=GatewayPeerKind.UNKNOWN.value, + default=ExternalChatPeerKind.UNKNOWN, + server_default=ExternalChatPeerKind.UNKNOWN.value, ) external_thread_id = Column(Text, nullable=True) external_display_name = Column(Text, nullable=True) external_username = Column(Text, nullable=True) - external_pii_hashes = Column(JSONB, nullable=True) external_metadata = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb")) - active_thread_id = Column( + new_chat_thread_id = Column( Integer, ForeignKey("new_chat_threads.id", ondelete="SET NULL"), nullable=True, index=True, ) - session_scope = Column( - SQLAlchemyEnum( - GatewaySessionScope, - name="gateway_session_scope", - values_callable=_enum_values, - ), - nullable=False, - default=GatewaySessionScope.PER_BINDING, - server_default=GatewaySessionScope.PER_BINDING.value, - ) - dm_policy = Column( - SQLAlchemyEnum( - GatewayDmPolicy, - name="gateway_dm_policy", - values_callable=_enum_values, - ), - nullable=False, - default=GatewayDmPolicy.ENABLED, - server_default=GatewayDmPolicy.ENABLED.value, - ) - group_policy = Column( - SQLAlchemyEnum( - GatewayGroupPolicy, - name="gateway_group_policy", - values_callable=_enum_values, - ), - nullable=False, - default=GatewayGroupPolicy.DISABLED, - server_default=GatewayGroupPolicy.DISABLED.value, - ) revoked_at = Column(TIMESTAMP(timezone=True), nullable=True) suspended_at = Column(TIMESTAMP(timezone=True), nullable=True) suspended_reason = Column(Text, nullable=True) @@ -1001,24 +960,24 @@ class GatewayConversationBinding(Base, TimestampMixin): server_default=text("(now() AT TIME ZONE 'utc')"), ) - account = relationship("GatewayPlatformAccount", back_populates="bindings") + account = relationship("ExternalChatAccount", back_populates="bindings") user = relationship("User", foreign_keys=[user_id]) search_space = relationship("SearchSpace", foreign_keys=[search_space_id]) - active_thread = relationship("NewChatThread", foreign_keys=[active_thread_id]) + new_chat_thread = relationship("NewChatThread", foreign_keys=[new_chat_thread_id]) threads = relationship( "NewChatThread", - back_populates="gateway_binding", - foreign_keys="NewChatThread.binding_id", + back_populates="external_chat_binding", + foreign_keys="NewChatThread.external_chat_binding_id", ) inbound_events = relationship( - "GatewayInboundEvent", + "ExternalChatInboundEvent", back_populates="binding", - foreign_keys="GatewayInboundEvent.binding_id", + foreign_keys="ExternalChatInboundEvent.external_chat_binding_id", ) __table_args__ = ( Index( - "uq_gateway_bindings_account_peer_active", + "uq_external_chat_bindings_account_peer_active", "account_id", "external_peer_id", unique=True, @@ -1027,37 +986,37 @@ class GatewayConversationBinding(Base, TimestampMixin): ), ), Index( - "uq_gateway_bindings_pairing_code_pending", + "uq_external_chat_bindings_pairing_code_pending", "pairing_code", unique=True, postgresql_where=text("state = 'pending'"), ), - Index("ix_gateway_bindings_user_state", "user_id", "state"), - Index("ix_gateway_bindings_search_space_state", "search_space_id", "state"), + Index("ix_external_chat_bindings_user_state", "user_id", "state"), + Index("ix_external_chat_bindings_search_space_state", "search_space_id", "state"), ) -class GatewayInboundEvent(Base, TimestampMixin): - __tablename__ = "gateway_inbound_events" +class ExternalChatInboundEvent(Base, TimestampMixin): + __tablename__ = "external_chat_inbound_events" __allow_unmapped__ = True id = Column(BigInteger, primary_key=True, index=True) account_id = Column( BigInteger, - ForeignKey("gateway_platform_accounts.id", ondelete="CASCADE"), + ForeignKey("external_chat_accounts.id", ondelete="CASCADE"), nullable=False, index=True, ) - binding_id = Column( + external_chat_binding_id = Column( BigInteger, - ForeignKey("gateway_conversation_bindings.id", ondelete="SET NULL"), + ForeignKey("external_chat_bindings.id", ondelete="SET NULL"), nullable=True, index=True, ) platform = Column( SQLAlchemyEnum( - GatewayPlatform, - name="gateway_platform", + ExternalChatPlatform, + name="external_chat_platform", values_callable=_enum_values, ), nullable=False, @@ -1067,28 +1026,23 @@ class GatewayInboundEvent(Base, TimestampMixin): external_message_id = Column(Text, nullable=True) event_kind = Column( SQLAlchemyEnum( - GatewayEventKind, - name="gateway_event_kind", + ExternalChatEventKind, + name="external_chat_event_kind", values_callable=_enum_values, ), nullable=False, ) raw_payload = Column(JSONB, nullable=True) - processing_metadata = Column( - JSONB, - nullable=False, - default=dict, - server_default=text("'{}'::jsonb"), - ) + request_id = Column(String(64), nullable=True) status = Column( SQLAlchemyEnum( - GatewayEventStatus, - name="gateway_event_status", + ExternalChatEventStatus, + name="external_chat_event_status", values_callable=_enum_values, ), nullable=False, - default=GatewayEventStatus.RECEIVED, - server_default=GatewayEventStatus.RECEIVED.value, + default=ExternalChatEventStatus.RECEIVED, + server_default=ExternalChatEventStatus.RECEIVED.value, ) attempt_count = Column(Integer, nullable=False, default=0, server_default="0") last_error = Column(Text, nullable=True) @@ -1100,17 +1054,26 @@ class GatewayInboundEvent(Base, TimestampMixin): ) processed_at = Column(TIMESTAMP(timezone=True), nullable=True) - account = relationship("GatewayPlatformAccount", back_populates="inbound_events") - binding = relationship("GatewayConversationBinding", back_populates="inbound_events") + account = relationship("ExternalChatAccount", back_populates="inbound_events") + binding = relationship("ExternalChatBinding", back_populates="inbound_events") __table_args__ = ( UniqueConstraint( "account_id", "event_dedupe_key", - name="uq_gateway_inbound_account_dedupe_key", + name="uq_external_chat_inbound_account_dedupe_key", + ), + Index("ix_external_chat_inbound_status_received_at", "status", "received_at"), + Index( + "ix_external_chat_inbound_binding_received_at", + "external_chat_binding_id", + "received_at", + ), + Index( + "ix_external_chat_inbound_request_id", + "request_id", + postgresql_where=text("request_id IS NOT NULL"), ), - Index("ix_gateway_inbound_status_received_at", "status", "received_at"), - Index("ix_gateway_inbound_binding_received_at", "binding_id", "received_at"), ) diff --git a/surfsense_backend/app/gateway/accounts.py b/surfsense_backend/app/gateway/accounts.py index 727d616c1..3e0d86e46 100644 --- a/surfsense_backend/app/gateway/accounts.py +++ b/surfsense_backend/app/gateway/accounts.py @@ -1,4 +1,4 @@ -"""Gateway account helpers.""" +"""External chat account helpers.""" from __future__ import annotations @@ -7,16 +7,16 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.db import ( - GatewayAccountMode, - GatewayHealthStatus, - GatewayPlatform, - GatewayPlatformAccount, + ExternalChatAccountMode, + ExternalChatHealthStatus, + ExternalChatPlatform, + ExternalChatAccount, ) from app.utils.oauth_security import TokenEncryption -def account_token(account: GatewayPlatformAccount) -> str | None: - if account.is_system_account and account.platform == GatewayPlatform.TELEGRAM: +def account_token(account: ExternalChatAccount) -> str | None: + if account.is_system_account and account.platform == ExternalChatPlatform.TELEGRAM: return config.TELEGRAM_SHARED_BOT_TOKEN if not account.encrypted_credentials: return None @@ -27,26 +27,24 @@ def account_token(account: GatewayPlatformAccount) -> str | None: async def get_or_create_system_telegram_account( session: AsyncSession, -) -> GatewayPlatformAccount: +) -> ExternalChatAccount: result = await session.execute( - select(GatewayPlatformAccount).where( - GatewayPlatformAccount.platform == GatewayPlatform.TELEGRAM, - GatewayPlatformAccount.is_system_account.is_(True), + select(ExternalChatAccount).where( + ExternalChatAccount.platform == ExternalChatPlatform.TELEGRAM, + ExternalChatAccount.is_system_account.is_(True), ) ) account = result.scalars().first() if account is not None: return account - account = GatewayPlatformAccount( - platform=GatewayPlatform.TELEGRAM, - mode=GatewayAccountMode.CLOUD_SHARED, + account = ExternalChatAccount( + platform=ExternalChatPlatform.TELEGRAM, + mode=ExternalChatAccountMode.CLOUD_SHARED, is_system_account=True, - account_metadata={ - "bot_username": config.TELEGRAM_SHARED_BOT_USERNAME, - "webhook_secret": config.TELEGRAM_WEBHOOK_SECRET, - }, + bot_username=config.TELEGRAM_SHARED_BOT_USERNAME, + webhook_secret=config.TELEGRAM_WEBHOOK_SECRET, cursor_state={}, - health_status=GatewayHealthStatus.UNKNOWN, + health_status=ExternalChatHealthStatus.UNKNOWN, ) session.add(account) await session.flush() diff --git a/surfsense_backend/app/gateway/auth_invariant.py b/surfsense_backend/app/gateway/auth_invariant.py index 414c69c5c..fba38f64e 100644 --- a/surfsense_backend/app/gateway/auth_invariant.py +++ b/surfsense_backend/app/gateway/auth_invariant.py @@ -1,11 +1,11 @@ -"""Authorization invariants for gateway-routed turns.""" +"""Authorization invariants for external-chat-routed turns.""" from __future__ import annotations from fastapi import HTTPException from sqlalchemy.ext.asyncio import AsyncSession -from app.db import GatewayConversationBinding, Permission, User +from app.db import ExternalChatBinding, Permission, User from app.gateway.bindings import suspend_binding from app.observability.metrics import record_gateway_auth_invariant_failure from app.utils.rbac import check_permission, check_search_space_access @@ -19,7 +19,7 @@ class GatewaySuspendedError(RuntimeError): async def _fail( session: AsyncSession, - binding: GatewayConversationBinding, + binding: ExternalChatBinding, reason: str, ) -> None: suspend_binding(binding, reason) @@ -30,7 +30,7 @@ async def _fail( async def assert_authorization_invariant( session: AsyncSession, - binding: GatewayConversationBinding, + binding: ExternalChatBinding, ) -> User: if binding.state != "bound": await _fail(session, binding, "binding_not_bound") @@ -46,7 +46,7 @@ async def assert_authorization_invariant( user, binding.search_space_id, Permission.CHATS_CREATE.value, - "Gateway owner no longer has permission to chat in this search space", + "External chat owner no longer has permission to chat in this search space", ) except HTTPException as exc: await _fail(session, binding, f"rbac_{exc.status_code}") diff --git a/surfsense_backend/app/gateway/bindings.py b/surfsense_backend/app/gateway/bindings.py index 6f2b641f7..e7205c5f1 100644 --- a/surfsense_backend/app/gateway/bindings.py +++ b/surfsense_backend/app/gateway/bindings.py @@ -1,4 +1,4 @@ -"""Gateway binding helpers.""" +"""External chat binding helpers.""" from __future__ import annotations @@ -9,19 +9,19 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db import ( ChatVisibility, - GatewayBindingState, - GatewayConversationBinding, + ExternalChatBindingState, + ExternalChatBinding, NewChatThread, ) async def get_or_create_thread_for_binding( session: AsyncSession, - binding: GatewayConversationBinding, + binding: ExternalChatBinding, ) -> NewChatThread: - if binding.active_thread_id is not None: + if binding.new_chat_thread_id is not None: result = await session.execute( - select(NewChatThread).where(NewChatThread.id == binding.active_thread_id) + select(NewChatThread).where(NewChatThread.id == binding.new_chat_thread_id) ) thread = result.scalars().first() if thread is not None and not thread.archived: @@ -33,30 +33,30 @@ async def get_or_create_thread_for_binding( created_by_id=binding.user_id, visibility=ChatVisibility.PRIVATE, source="telegram", - binding_id=binding.id, + external_chat_binding_id=binding.id, ) session.add(thread) await session.flush() - binding.active_thread_id = thread.id + binding.new_chat_thread_id = thread.id return thread -def suspend_binding(binding: GatewayConversationBinding, reason: str) -> None: +def suspend_binding(binding: ExternalChatBinding, reason: str) -> None: now = datetime.now(UTC) - binding.state = GatewayBindingState.SUSPENDED + binding.state = ExternalChatBindingState.SUSPENDED binding.suspended_at = now binding.suspended_reason = reason -def revoke_binding(binding: GatewayConversationBinding) -> None: +def revoke_binding(binding: ExternalChatBinding) -> None: now = datetime.now(UTC) - binding.state = GatewayBindingState.REVOKED + binding.state = ExternalChatBindingState.REVOKED binding.revoked_at = now - binding.active_thread_id = None + binding.new_chat_thread_id = None -def resume_binding(binding: GatewayConversationBinding) -> None: - binding.state = GatewayBindingState.BOUND +def resume_binding(binding: ExternalChatBinding) -> None: + binding.state = ExternalChatBindingState.BOUND binding.suspended_at = None binding.suspended_reason = None diff --git a/surfsense_backend/app/gateway/inbox.py b/surfsense_backend/app/gateway/inbox.py index c98ee5977..9bc660b9d 100644 --- a/surfsense_backend/app/gateway/inbox.py +++ b/surfsense_backend/app/gateway/inbox.py @@ -5,7 +5,7 @@ from __future__ import annotations from sqlalchemy.dialects.postgresql import insert from sqlalchemy.ext.asyncio import AsyncSession -from app.db import GatewayInboundEvent, GatewayPlatform +from app.db import ExternalChatInboundEvent, ExternalChatPlatform def telegram_event_dedupe_key(update_id: int | str) -> str: @@ -16,15 +16,16 @@ async def persist_inbound_event( session: AsyncSession, *, account_id: int, - platform: GatewayPlatform, + platform: ExternalChatPlatform, event_dedupe_key: str, event_kind: str, raw_payload: dict, external_event_id: str | None = None, external_message_id: str | None = None, + request_id: str | None = None, ) -> int | None: stmt = ( - insert(GatewayInboundEvent) + insert(ExternalChatInboundEvent) .values( account_id=account_id, platform=platform, @@ -33,11 +34,12 @@ async def persist_inbound_event( external_message_id=external_message_id, event_kind=event_kind, raw_payload=raw_payload, + request_id=request_id, ) .on_conflict_do_nothing( index_elements=["account_id", "event_dedupe_key"], ) - .returning(GatewayInboundEvent.id) + .returning(ExternalChatInboundEvent.id) ) result = await session.execute(stmt) return result.scalar_one_or_none() diff --git a/surfsense_backend/app/gateway/pairing.py b/surfsense_backend/app/gateway/pairing.py index 55232022e..7818bed12 100644 --- a/surfsense_backend/app/gateway/pairing.py +++ b/surfsense_backend/app/gateway/pairing.py @@ -1,4 +1,4 @@ -"""Pairing code lifecycle for gateway bindings.""" +"""Pairing code lifecycle for external chat bindings.""" from __future__ import annotations @@ -8,7 +8,7 @@ from datetime import UTC, datetime, timedelta from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from app.db import GatewayBindingState, GatewayConversationBinding +from app.db import ExternalChatBindingState, ExternalChatBinding PAIRING_CODE_TTL = timedelta(minutes=10) @@ -30,19 +30,19 @@ async def redeem_pairing_code( external_display_name: str | None, external_username: str | None, external_metadata: dict | None = None, -) -> GatewayConversationBinding | None: +) -> ExternalChatBinding | None: result = await session.execute( - select(GatewayConversationBinding).where( - GatewayConversationBinding.pairing_code == code, - GatewayConversationBinding.state == GatewayBindingState.PENDING, - GatewayConversationBinding.pairing_code_expires_at > datetime.now(UTC), + select(ExternalChatBinding).where( + ExternalChatBinding.pairing_code == code, + ExternalChatBinding.state == ExternalChatBindingState.PENDING, + ExternalChatBinding.pairing_code_expires_at > datetime.now(UTC), ) ) binding = result.scalars().first() if binding is None: return None - binding.state = GatewayBindingState.BOUND + binding.state = ExternalChatBindingState.BOUND binding.pairing_code = None binding.pairing_code_expires_at = None binding.external_peer_id = external_peer_id diff --git a/surfsense_backend/tests/unit/gateway/test_pairing.py b/surfsense_backend/tests/unit/gateway/test_pairing.py index c50bd6b7c..facf908cd 100644 --- a/surfsense_backend/tests/unit/gateway/test_pairing.py +++ b/surfsense_backend/tests/unit/gateway/test_pairing.py @@ -2,7 +2,7 @@ from datetime import UTC, datetime, timedelta import pytest -from app.db import GatewayBindingState +from app.db import ExternalChatBindingState from app.gateway.pairing import generate_pairing_code, redeem_pairing_code @@ -16,7 +16,7 @@ def test_generate_pairing_code_is_short_display_token(): @pytest.mark.asyncio async def test_redeem_pairing_code_binds_pending_row(mocker): binding = mocker.Mock() - binding.state = GatewayBindingState.PENDING + binding.state = ExternalChatBindingState.PENDING binding.pairing_code_expires_at = datetime.now(UTC) + timedelta(minutes=1) scalars = mocker.Mock() scalars.first.return_value = binding @@ -35,7 +35,7 @@ async def test_redeem_pairing_code_binds_pending_row(mocker): ) assert redeemed is binding - assert binding.state == GatewayBindingState.BOUND + assert binding.state == ExternalChatBindingState.BOUND assert binding.external_peer_id == "telegram:123" assert binding.pairing_code is None