refactor(gateway): rename persistence models to external chat

This commit is contained in:
Anish Sarkar 2026-05-28 04:37:27 +05:30
parent f2d82234d4
commit a57b741d5e
8 changed files with 274 additions and 323 deletions

View file

@ -1,20 +1,21 @@
"""add gateway tables for Telegram messaging gateway
"""add external chat surface tables
Revision ID: 144
Revises: 143
Create Date: 2026-05-27
Adds the lean v6 gateway schema:
Adds the lean external chat surface schema:
* gateway_platform_accounts
* gateway_conversation_bindings
* gateway_inbound_events
* external_chat_accounts
* external_chat_bindings
* external_chat_inbound_events
The gateway stores Telegram-originated conversations in the existing chat
tables but keeps them out of UI replication. This migration adds ``source`` to
``new_chat_messages`` as a denormalized Zero publication boundary and publishes
only ``source = 'web'`` rows. Gateway control-plane tables are served through
REST in v1, so they are intentionally not added to ``zero_publication``.
External chat surfaces store Telegram-originated conversations in the existing
chat tables. This migration adds ``source`` to ``new_chat_threads`` and
``new_chat_messages`` as UI metadata while publishing all chat-message sources
through Zero so a future SurfSense UI layer can render external chats. External
chat adapter tables are served through REST in v1, so they are intentionally not
added to ``zero_publication``.
"""
from __future__ import annotations
@ -146,7 +147,7 @@ def _build_set_table_ddl(
f"documents ({_cols(doc_cols)}), "
f"folders, "
f"search_source_connectors, "
f"new_chat_messages WHERE (source = 'web'), "
f"new_chat_messages, "
f"chat_comments, "
f"chat_session_state, "
f'"user" ({_cols(user_cols)})'
@ -161,53 +162,41 @@ def _create_enum(name: str, values: tuple[str, ...]) -> postgresql.ENUM:
def upgrade() -> None:
conn = op.get_bind()
gateway_platform_enum = _create_enum(
"gateway_platform", ("telegram", "whatsapp", "signal")
external_chat_platform_enum = _create_enum(
"external_chat_platform", ("telegram", "whatsapp", "signal")
)
gateway_account_mode_enum = _create_enum(
"gateway_account_mode", ("cloud_shared", "self_host_byo")
external_chat_account_mode_enum = _create_enum(
"external_chat_account_mode", ("cloud_shared", "self_host_byo")
)
gateway_health_status_enum = _create_enum(
"gateway_health_status", ("unknown", "ok", "failing")
external_chat_health_status_enum = _create_enum(
"external_chat_health_status", ("unknown", "ok", "failing")
)
gateway_binding_state_enum = _create_enum(
"gateway_binding_state", ("pending", "bound", "revoked", "suspended")
external_chat_binding_state_enum = _create_enum(
"external_chat_binding_state", ("pending", "bound", "revoked", "suspended")
)
gateway_peer_kind_enum = _create_enum(
"gateway_peer_kind", ("direct", "group", "channel", "unknown")
external_chat_peer_kind_enum = _create_enum(
"external_chat_peer_kind", ("direct", "group", "channel", "unknown")
)
gateway_session_scope_enum = _create_enum(
"gateway_session_scope",
("per_binding", "per_user_search_space", "ephemeral"),
external_chat_event_kind_enum = _create_enum(
"external_chat_event_kind", ("message", "edited_message", "callback_query", "other")
)
gateway_dm_policy_enum = _create_enum("gateway_dm_policy", ("enabled", "disabled"))
gateway_group_policy_enum = _create_enum(
"gateway_group_policy", ("disabled", "allowlist", "mention_required")
)
gateway_event_kind_enum = _create_enum(
"gateway_event_kind", ("message", "edited_message", "callback_query", "other")
)
gateway_event_status_enum = _create_enum(
"gateway_event_status",
external_chat_event_status_enum = _create_enum(
"external_chat_event_status",
("received", "processing", "processed", "ignored", "failed"),
)
if not _table_exists(conn, "gateway_platform_accounts"):
if not _table_exists(conn, "external_chat_accounts"):
op.create_table(
"gateway_platform_accounts",
"external_chat_accounts",
sa.Column("id", sa.BigInteger(), primary_key=True),
sa.Column("platform", gateway_platform_enum, nullable=False),
sa.Column("mode", gateway_account_mode_enum, nullable=False),
sa.Column("platform", external_chat_platform_enum, nullable=False),
sa.Column("mode", external_chat_account_mode_enum, nullable=False),
sa.Column("owner_user_id", postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("owner_search_space_id", sa.Integer(), nullable=True),
sa.Column("is_system_account", sa.Boolean(), nullable=False, server_default="false"),
sa.Column("encrypted_credentials", sa.Text(), nullable=True),
sa.Column(
"account_metadata",
postgresql.JSONB(astext_type=sa.Text()),
nullable=False,
server_default=sa.text("'{}'::jsonb"),
),
sa.Column("bot_username", sa.String(255), nullable=True),
sa.Column("webhook_secret", sa.String(64), nullable=True),
sa.Column(
"cursor_state",
postgresql.JSONB(astext_type=sa.Text()),
@ -216,7 +205,7 @@ def upgrade() -> None:
),
sa.Column(
"health_status",
gateway_health_status_enum,
external_chat_health_status_enum,
nullable=False,
server_default="unknown",
),
@ -238,7 +227,7 @@ def upgrade() -> None:
sa.CheckConstraint(
"(is_system_account = true AND owner_user_id IS NULL) OR "
"(is_system_account = false AND owner_user_id IS NOT NULL)",
name="ck_gateway_accounts_owner_shape",
name="ck_external_chat_accounts_owner_shape",
),
sa.ForeignKeyConstraint(["owner_user_id"], ["user.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(
@ -246,32 +235,40 @@ def upgrade() -> None:
),
)
op.create_index(
"uq_gateway_accounts_owner_platform",
"gateway_platform_accounts",
"uq_external_chat_accounts_owner_platform",
"external_chat_accounts",
["owner_user_id", "platform"],
unique=True,
postgresql_where=sa.text("is_system_account = false"),
if_not_exists=True,
)
op.create_index(
"uq_gateway_accounts_system_platform",
"gateway_platform_accounts",
"uq_external_chat_accounts_system_platform",
"external_chat_accounts",
["platform"],
unique=True,
postgresql_where=sa.text("is_system_account = true"),
if_not_exists=True,
)
op.create_index(
"uq_external_chat_accounts_webhook_secret",
"external_chat_accounts",
["webhook_secret"],
unique=True,
postgresql_where=sa.text("webhook_secret IS NOT NULL"),
if_not_exists=True,
)
if not _table_exists(conn, "gateway_conversation_bindings"):
if not _table_exists(conn, "external_chat_bindings"):
op.create_table(
"gateway_conversation_bindings",
"external_chat_bindings",
sa.Column("id", sa.BigInteger(), primary_key=True),
sa.Column("account_id", sa.BigInteger(), nullable=False),
sa.Column("user_id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("search_space_id", sa.Integer(), nullable=False),
sa.Column(
"state",
gateway_binding_state_enum,
external_chat_binding_state_enum,
nullable=False,
server_default="pending",
),
@ -280,39 +277,25 @@ def upgrade() -> None:
sa.Column("external_peer_id", sa.Text(), nullable=True),
sa.Column(
"external_peer_kind",
gateway_peer_kind_enum,
external_chat_peer_kind_enum,
nullable=False,
server_default="unknown",
),
sa.Column("external_thread_id", sa.Text(), nullable=True),
sa.Column(
"external_thread_id",
sa.Text(),
nullable=True,
comment="Reserved for Telegram message_thread_id when group/forum support lands.",
),
sa.Column("external_display_name", sa.Text(), nullable=True),
sa.Column("external_username", sa.Text(), nullable=True),
sa.Column("external_pii_hashes", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column(
"external_metadata",
postgresql.JSONB(astext_type=sa.Text()),
nullable=False,
server_default=sa.text("'{}'::jsonb"),
),
sa.Column("active_thread_id", sa.Integer(), nullable=True),
sa.Column(
"session_scope",
gateway_session_scope_enum,
nullable=False,
server_default="per_binding",
),
sa.Column(
"dm_policy",
gateway_dm_policy_enum,
nullable=False,
server_default="enabled",
),
sa.Column(
"group_policy",
gateway_group_policy_enum,
nullable=False,
server_default="disabled",
),
sa.Column("new_chat_thread_id", sa.Integer(), nullable=True),
sa.Column("revoked_at", sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column("suspended_at", sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column("suspended_reason", sa.Text(), nullable=True),
@ -329,17 +312,17 @@ def upgrade() -> None:
server_default=sa.text("(now() AT TIME ZONE 'utc')"),
),
sa.ForeignKeyConstraint(
["account_id"], ["gateway_platform_accounts.id"], ondelete="CASCADE"
["account_id"], ["external_chat_accounts.id"], ondelete="CASCADE"
),
sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["search_space_id"], ["searchspaces.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(
["active_thread_id"], ["new_chat_threads.id"], ondelete="SET NULL"
["new_chat_thread_id"], ["new_chat_threads.id"], ondelete="SET NULL"
),
)
op.create_index(
"uq_gateway_bindings_account_peer_active",
"gateway_conversation_bindings",
"uq_external_chat_bindings_account_peer_active",
"external_chat_bindings",
["account_id", "external_peer_id"],
unique=True,
postgresql_where=sa.text(
@ -348,51 +331,46 @@ def upgrade() -> None:
if_not_exists=True,
)
op.create_index(
"uq_gateway_bindings_pairing_code_pending",
"gateway_conversation_bindings",
"uq_external_chat_bindings_pairing_code_pending",
"external_chat_bindings",
["pairing_code"],
unique=True,
postgresql_where=sa.text("state = 'pending'"),
if_not_exists=True,
)
op.create_index(
"ix_gateway_bindings_user_state",
"gateway_conversation_bindings",
"ix_external_chat_bindings_user_state",
"external_chat_bindings",
["user_id", "state"],
if_not_exists=True,
)
op.create_index(
"ix_gateway_bindings_search_space_state",
"gateway_conversation_bindings",
"ix_external_chat_bindings_search_space_state",
"external_chat_bindings",
["search_space_id", "state"],
if_not_exists=True,
)
if not _table_exists(conn, "gateway_inbound_events"):
if not _table_exists(conn, "external_chat_inbound_events"):
op.create_table(
"gateway_inbound_events",
"external_chat_inbound_events",
sa.Column("id", sa.BigInteger(), primary_key=True),
sa.Column("account_id", sa.BigInteger(), nullable=False),
sa.Column("binding_id", sa.BigInteger(), nullable=True),
sa.Column("platform", gateway_platform_enum, nullable=False),
sa.Column("external_chat_binding_id", sa.BigInteger(), nullable=True),
sa.Column("platform", external_chat_platform_enum, nullable=False),
sa.Column("event_dedupe_key", sa.Text(), nullable=False),
sa.Column("external_event_id", sa.Text(), nullable=True),
sa.Column("external_message_id", sa.Text(), nullable=True),
sa.Column("event_kind", gateway_event_kind_enum, nullable=False),
sa.Column("event_kind", external_chat_event_kind_enum, nullable=False),
sa.Column(
"raw_payload",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
sa.Column(
"processing_metadata",
postgresql.JSONB(astext_type=sa.Text()),
nullable=False,
server_default=sa.text("'{}'::jsonb"),
),
sa.Column("request_id", sa.String(64), nullable=True),
sa.Column(
"status",
gateway_event_status_enum,
external_chat_event_status_enum,
nullable=False,
server_default="received",
),
@ -412,27 +390,34 @@ def upgrade() -> None:
server_default=sa.text("(now() AT TIME ZONE 'utc')"),
),
sa.ForeignKeyConstraint(
["account_id"], ["gateway_platform_accounts.id"], ondelete="CASCADE"
["account_id"], ["external_chat_accounts.id"], ondelete="CASCADE"
),
sa.ForeignKeyConstraint(
["binding_id"], ["gateway_conversation_bindings.id"], ondelete="SET NULL"
["external_chat_binding_id"], ["external_chat_bindings.id"], ondelete="SET NULL"
),
sa.UniqueConstraint(
"account_id",
"event_dedupe_key",
name="uq_gateway_inbound_account_dedupe_key",
name="uq_external_chat_inbound_account_dedupe_key",
),
)
op.create_index(
"ix_gateway_inbound_status_received_at",
"gateway_inbound_events",
"ix_external_chat_inbound_status_received_at",
"external_chat_inbound_events",
["status", "received_at"],
if_not_exists=True,
)
op.create_index(
"ix_gateway_inbound_binding_received_at",
"gateway_inbound_events",
["binding_id", "received_at"],
"ix_external_chat_inbound_binding_received_at",
"external_chat_inbound_events",
["external_chat_binding_id", "received_at"],
if_not_exists=True,
)
op.create_index(
"ix_external_chat_inbound_request_id",
"external_chat_inbound_events",
["request_id"],
postgresql_where=sa.text("request_id IS NOT NULL"),
if_not_exists=True,
)
@ -442,27 +427,27 @@ def upgrade() -> None:
sa.Column("source", sa.Text(), nullable=False, server_default="web"),
)
op.alter_column("new_chat_threads", "source", type_=sa.Text())
if not _column_exists(conn, "new_chat_threads", "binding_id"):
if not _column_exists(conn, "new_chat_threads", "external_chat_binding_id"):
op.add_column(
"new_chat_threads",
sa.Column("binding_id", sa.BigInteger(), nullable=True),
sa.Column("external_chat_binding_id", sa.BigInteger(), nullable=True),
)
if not _constraint_exists(
conn, "new_chat_threads", "fk_new_chat_threads_gateway_binding_id"
conn, "new_chat_threads", "fk_new_chat_threads_external_chat_external_chat_binding_id"
):
op.create_foreign_key(
"fk_new_chat_threads_gateway_binding_id",
"fk_new_chat_threads_external_chat_external_chat_binding_id",
"new_chat_threads",
"gateway_conversation_bindings",
["binding_id"],
"external_chat_bindings",
["external_chat_binding_id"],
["id"],
ondelete="SET NULL",
)
op.create_index("ix_new_chat_threads_source", "new_chat_threads", ["source"], if_not_exists=True)
op.create_index(
"ix_new_chat_threads_binding_id",
"ix_new_chat_threads_external_chat_binding_id",
"new_chat_threads",
["binding_id"],
["external_chat_binding_id"],
if_not_exists=True,
)
@ -510,7 +495,9 @@ def upgrade() -> None:
tx = conn.begin_nested() if conn.in_transaction() else conn.begin()
with tx:
conn.execute(
sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'pre-144-gateway'")
sa.text(
f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'pre-144-external-chat'"
)
)
conn.execute(
sa.text(
@ -521,7 +508,9 @@ def upgrade() -> None:
)
)
conn.execute(
sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'post-144-gateway'")
sa.text(
f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'post-144-external-chat'"
)
)
@ -565,59 +554,58 @@ def downgrade() -> None:
_drop_column_if_exists("new_chat_messages", "platform_metadata")
_drop_column_if_exists("new_chat_messages", "source")
_drop_index_if_exists("ix_new_chat_threads_binding_id", "new_chat_threads")
_drop_index_if_exists("ix_new_chat_threads_external_chat_binding_id", "new_chat_threads")
_drop_index_if_exists("ix_new_chat_threads_source", "new_chat_threads")
if _constraint_exists(
conn, "new_chat_threads", "fk_new_chat_threads_gateway_binding_id"
conn, "new_chat_threads", "fk_new_chat_threads_external_chat_external_chat_binding_id"
):
op.drop_constraint(
"fk_new_chat_threads_gateway_binding_id",
"fk_new_chat_threads_external_chat_external_chat_binding_id",
"new_chat_threads",
type_="foreignkey",
)
_drop_column_if_exists("new_chat_threads", "binding_id")
_drop_column_if_exists("new_chat_threads", "external_chat_binding_id")
_drop_column_if_exists("new_chat_threads", "source")
_drop_index_if_exists(
"ix_gateway_inbound_binding_received_at", "gateway_inbound_events"
"ix_external_chat_inbound_binding_received_at", "external_chat_inbound_events"
)
_drop_index_if_exists("ix_gateway_inbound_status_received_at", "gateway_inbound_events")
if _table_exists(conn, "gateway_inbound_events"):
op.drop_table("gateway_inbound_events")
_drop_index_if_exists("ix_external_chat_inbound_request_id", "external_chat_inbound_events")
_drop_index_if_exists("ix_external_chat_inbound_status_received_at", "external_chat_inbound_events")
if _table_exists(conn, "external_chat_inbound_events"):
op.drop_table("external_chat_inbound_events")
_drop_index_if_exists(
"ix_gateway_bindings_search_space_state",
"gateway_conversation_bindings",
"ix_external_chat_bindings_search_space_state",
"external_chat_bindings",
)
_drop_index_if_exists(
"ix_gateway_bindings_user_state", "gateway_conversation_bindings"
"ix_external_chat_bindings_user_state", "external_chat_bindings"
)
_drop_index_if_exists(
"uq_gateway_bindings_pairing_code_pending",
"gateway_conversation_bindings",
"uq_external_chat_bindings_pairing_code_pending",
"external_chat_bindings",
)
_drop_index_if_exists(
"uq_gateway_bindings_account_peer_active",
"gateway_conversation_bindings",
"uq_external_chat_bindings_account_peer_active",
"external_chat_bindings",
)
if _table_exists(conn, "gateway_conversation_bindings"):
op.drop_table("gateway_conversation_bindings")
if _table_exists(conn, "external_chat_bindings"):
op.drop_table("external_chat_bindings")
_drop_index_if_exists("uq_gateway_accounts_system_platform", "gateway_platform_accounts")
_drop_index_if_exists("uq_gateway_accounts_owner_platform", "gateway_platform_accounts")
if _table_exists(conn, "gateway_platform_accounts"):
op.drop_table("gateway_platform_accounts")
_drop_index_if_exists("uq_external_chat_accounts_system_platform", "external_chat_accounts")
_drop_index_if_exists("uq_external_chat_accounts_owner_platform", "external_chat_accounts")
_drop_index_if_exists("uq_external_chat_accounts_webhook_secret", "external_chat_accounts")
if _table_exists(conn, "external_chat_accounts"):
op.drop_table("external_chat_accounts")
for enum_name in (
"gateway_event_status",
"gateway_event_kind",
"gateway_group_policy",
"gateway_dm_policy",
"gateway_session_scope",
"gateway_peer_kind",
"gateway_binding_state",
"gateway_health_status",
"gateway_account_mode",
"gateway_platform",
"external_chat_event_status",
"external_chat_event_kind",
"external_chat_peer_kind",
"external_chat_binding_state",
"external_chat_health_status",
"external_chat_account_mode",
"external_chat_platform",
):
postgresql.ENUM(name=enum_name).drop(conn, checkfirst=True)