feat(gateway): add messaging gateway persistence schema

This commit is contained in:
Anish Sarkar 2026-05-27 23:34:46 +05:30
parent 69abf0d916
commit 81cf63ac96
3 changed files with 1005 additions and 0 deletions

View file

@ -14,6 +14,7 @@ from sqlalchemy import (
TIMESTAMP,
BigInteger,
Boolean,
CheckConstraint,
Column,
Enum as SQLAlchemyEnum,
ForeignKey,
@ -573,6 +574,73 @@ class ChatVisibility(StrEnum):
# PUBLIC = "PUBLIC" # Reserved for future implementation
class GatewayPlatform(StrEnum):
TELEGRAM = "telegram"
WHATSAPP = "whatsapp"
SIGNAL = "signal"
class GatewayAccountMode(StrEnum):
CLOUD_SHARED = "cloud_shared"
SELF_HOST_BYO = "self_host_byo"
class GatewayHealthStatus(StrEnum):
UNKNOWN = "unknown"
OK = "ok"
FAILING = "failing"
class GatewayBindingState(StrEnum):
PENDING = "pending"
BOUND = "bound"
REVOKED = "revoked"
SUSPENDED = "suspended"
class GatewayPeerKind(StrEnum):
DIRECT = "direct"
GROUP = "group"
CHANNEL = "channel"
UNKNOWN = "unknown"
class GatewaySessionScope(StrEnum):
PER_BINDING = "per_binding"
PER_USER_SEARCH_SPACE = "per_user_search_space"
EPHEMERAL = "ephemeral"
class GatewayDmPolicy(StrEnum):
ENABLED = "enabled"
DISABLED = "disabled"
class GatewayGroupPolicy(StrEnum):
DISABLED = "disabled"
ALLOWLIST = "allowlist"
MENTION_REQUIRED = "mention_required"
class GatewayEventKind(StrEnum):
MESSAGE = "message"
EDITED_MESSAGE = "edited_message"
CALLBACK_QUERY = "callback_query"
OTHER = "other"
class GatewayEventStatus(StrEnum):
RECEIVED = "received"
PROCESSING = "processing"
PROCESSED = "processed"
IGNORED = "ignored"
FAILED = "failed"
def _enum_values(enum_cls):
return [item.value for item in enum_cls]
class NewChatThread(BaseModel, TimestampMixin):
"""
Thread model for the new chat feature using assistant-ui.
@ -645,6 +713,16 @@ class NewChatThread(BaseModel, TimestampMixin):
# agent_llm_id changes). Unindexed: all reads are by primary key.
pinned_llm_config_id = Column(Integer, nullable=True)
# Gateway-originated threads are persisted for the agent, but the UI Zero
# publication only exposes ``source='web'`` rows.
source = Column(Text, nullable=False, default="web", server_default="web")
binding_id = Column(
BigInteger,
ForeignKey("gateway_conversation_bindings.id", ondelete="SET NULL"),
nullable=True,
index=True,
)
# Relationships
search_space = relationship("SearchSpace", back_populates="new_chat_threads")
created_by = relationship("User", back_populates="new_chat_threads")
@ -665,6 +743,11 @@ class NewChatThread(BaseModel, TimestampMixin):
back_populates="thread",
cascade="all, delete-orphan",
)
gateway_binding = relationship(
"GatewayConversationBinding",
foreign_keys=[binding_id],
back_populates="threads",
)
class NewChatMessage(BaseModel, TimestampMixin):
@ -718,6 +801,11 @@ class NewChatMessage(BaseModel, TimestampMixin):
# a message back to the LangGraph checkpoint that produced its turn.
turn_id = Column(String(64), nullable=True, index=True)
# Mirrors the parent thread source for publication-level filtering.
# This denormalization avoids join-dependent logical replication rules.
source = Column(Text, nullable=False, default="web", server_default="web")
platform_metadata = Column(JSONB, nullable=True)
# Relationships
thread = relationship("NewChatThread", back_populates="messages")
author = relationship("User")
@ -734,6 +822,298 @@ class NewChatMessage(BaseModel, TimestampMixin):
)
class GatewayPlatformAccount(Base, TimestampMixin):
__tablename__ = "gateway_platform_accounts"
__allow_unmapped__ = True
id = Column(BigInteger, primary_key=True, index=True)
platform = Column(
SQLAlchemyEnum(
GatewayPlatform,
name="gateway_platform",
values_callable=_enum_values,
),
nullable=False,
)
mode = Column(
SQLAlchemyEnum(
GatewayAccountMode,
name="gateway_account_mode",
values_callable=_enum_values,
),
nullable=False,
)
owner_user_id = Column(
UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=True
)
owner_search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=True
)
is_system_account = Column(Boolean, nullable=False, default=False, server_default="false")
encrypted_credentials = Column(Text, nullable=True)
account_metadata = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb"))
cursor_state = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb"))
health_status = Column(
SQLAlchemyEnum(
GatewayHealthStatus,
name="gateway_health_status",
values_callable=_enum_values,
),
nullable=False,
default=GatewayHealthStatus.UNKNOWN,
server_default=GatewayHealthStatus.UNKNOWN.value,
)
last_health_check_at = Column(TIMESTAMP(timezone=True), nullable=True)
suspended_at = Column(TIMESTAMP(timezone=True), nullable=True)
suspended_reason = Column(Text, nullable=True)
updated_at = Column(
TIMESTAMP(timezone=True),
nullable=False,
default=lambda: datetime.now(UTC),
onupdate=lambda: datetime.now(UTC),
server_default=text("(now() AT TIME ZONE 'utc')"),
)
owner = relationship("User", foreign_keys=[owner_user_id])
owner_search_space = relationship("SearchSpace", foreign_keys=[owner_search_space_id])
bindings = relationship(
"GatewayConversationBinding",
back_populates="account",
cascade="all, delete-orphan",
)
inbound_events = relationship(
"GatewayInboundEvent",
back_populates="account",
cascade="all, delete-orphan",
)
__table_args__ = (
CheckConstraint(
"(is_system_account = true AND owner_user_id IS NULL) OR "
"(is_system_account = false AND owner_user_id IS NOT NULL)",
name="ck_gateway_accounts_owner_shape",
),
Index(
"uq_gateway_accounts_owner_platform",
"owner_user_id",
"platform",
unique=True,
postgresql_where=text("is_system_account = false"),
),
Index(
"uq_gateway_accounts_system_platform",
"platform",
unique=True,
postgresql_where=text("is_system_account = true"),
),
)
class GatewayConversationBinding(Base, TimestampMixin):
__tablename__ = "gateway_conversation_bindings"
__allow_unmapped__ = True
id = Column(BigInteger, primary_key=True, index=True)
account_id = Column(
BigInteger,
ForeignKey("gateway_platform_accounts.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
user_id = Column(
UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False
)
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False
)
state = Column(
SQLAlchemyEnum(
GatewayBindingState,
name="gateway_binding_state",
values_callable=_enum_values,
),
nullable=False,
default=GatewayBindingState.PENDING,
server_default=GatewayBindingState.PENDING.value,
)
pairing_code = Column(Text, nullable=True)
pairing_code_expires_at = Column(TIMESTAMP(timezone=True), nullable=True)
external_peer_id = Column(Text, nullable=True)
external_peer_kind = Column(
SQLAlchemyEnum(
GatewayPeerKind,
name="gateway_peer_kind",
values_callable=_enum_values,
),
nullable=False,
default=GatewayPeerKind.UNKNOWN,
server_default=GatewayPeerKind.UNKNOWN.value,
)
external_thread_id = Column(Text, nullable=True)
external_display_name = Column(Text, nullable=True)
external_username = Column(Text, nullable=True)
external_pii_hashes = Column(JSONB, nullable=True)
external_metadata = Column(JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb"))
active_thread_id = Column(
Integer,
ForeignKey("new_chat_threads.id", ondelete="SET NULL"),
nullable=True,
index=True,
)
session_scope = Column(
SQLAlchemyEnum(
GatewaySessionScope,
name="gateway_session_scope",
values_callable=_enum_values,
),
nullable=False,
default=GatewaySessionScope.PER_BINDING,
server_default=GatewaySessionScope.PER_BINDING.value,
)
dm_policy = Column(
SQLAlchemyEnum(
GatewayDmPolicy,
name="gateway_dm_policy",
values_callable=_enum_values,
),
nullable=False,
default=GatewayDmPolicy.ENABLED,
server_default=GatewayDmPolicy.ENABLED.value,
)
group_policy = Column(
SQLAlchemyEnum(
GatewayGroupPolicy,
name="gateway_group_policy",
values_callable=_enum_values,
),
nullable=False,
default=GatewayGroupPolicy.DISABLED,
server_default=GatewayGroupPolicy.DISABLED.value,
)
revoked_at = Column(TIMESTAMP(timezone=True), nullable=True)
suspended_at = Column(TIMESTAMP(timezone=True), nullable=True)
suspended_reason = Column(Text, nullable=True)
updated_at = Column(
TIMESTAMP(timezone=True),
nullable=False,
default=lambda: datetime.now(UTC),
onupdate=lambda: datetime.now(UTC),
server_default=text("(now() AT TIME ZONE 'utc')"),
)
account = relationship("GatewayPlatformAccount", back_populates="bindings")
user = relationship("User", foreign_keys=[user_id])
search_space = relationship("SearchSpace", foreign_keys=[search_space_id])
active_thread = relationship("NewChatThread", foreign_keys=[active_thread_id])
threads = relationship(
"NewChatThread",
back_populates="gateway_binding",
foreign_keys="NewChatThread.binding_id",
)
inbound_events = relationship(
"GatewayInboundEvent",
back_populates="binding",
foreign_keys="GatewayInboundEvent.binding_id",
)
__table_args__ = (
Index(
"uq_gateway_bindings_account_peer_active",
"account_id",
"external_peer_id",
unique=True,
postgresql_where=text(
"state IN ('bound', 'suspended') AND external_peer_id IS NOT NULL"
),
),
Index(
"uq_gateway_bindings_pairing_code_pending",
"pairing_code",
unique=True,
postgresql_where=text("state = 'pending'"),
),
Index("ix_gateway_bindings_user_state", "user_id", "state"),
Index("ix_gateway_bindings_search_space_state", "search_space_id", "state"),
)
class GatewayInboundEvent(Base, TimestampMixin):
__tablename__ = "gateway_inbound_events"
__allow_unmapped__ = True
id = Column(BigInteger, primary_key=True, index=True)
account_id = Column(
BigInteger,
ForeignKey("gateway_platform_accounts.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
binding_id = Column(
BigInteger,
ForeignKey("gateway_conversation_bindings.id", ondelete="SET NULL"),
nullable=True,
index=True,
)
platform = Column(
SQLAlchemyEnum(
GatewayPlatform,
name="gateway_platform",
values_callable=_enum_values,
),
nullable=False,
)
event_dedupe_key = Column(Text, nullable=False)
external_event_id = Column(Text, nullable=True)
external_message_id = Column(Text, nullable=True)
event_kind = Column(
SQLAlchemyEnum(
GatewayEventKind,
name="gateway_event_kind",
values_callable=_enum_values,
),
nullable=False,
)
raw_payload = Column(JSONB, nullable=True)
processing_metadata = Column(
JSONB,
nullable=False,
default=dict,
server_default=text("'{}'::jsonb"),
)
status = Column(
SQLAlchemyEnum(
GatewayEventStatus,
name="gateway_event_status",
values_callable=_enum_values,
),
nullable=False,
default=GatewayEventStatus.RECEIVED,
server_default=GatewayEventStatus.RECEIVED.value,
)
attempt_count = Column(Integer, nullable=False, default=0, server_default="0")
last_error = Column(Text, nullable=True)
received_at = Column(
TIMESTAMP(timezone=True),
nullable=False,
default=lambda: datetime.now(UTC),
server_default=text("(now() AT TIME ZONE 'utc')"),
)
processed_at = Column(TIMESTAMP(timezone=True), nullable=True)
account = relationship("GatewayPlatformAccount", back_populates="inbound_events")
binding = relationship("GatewayConversationBinding", back_populates="inbound_events")
__table_args__ = (
UniqueConstraint(
"account_id",
"event_dedupe_key",
name="uq_gateway_inbound_account_dedupe_key",
),
Index("ix_gateway_inbound_status_received_at", "status", "received_at"),
Index("ix_gateway_inbound_binding_received_at", "binding_id", "received_at"),
)
class TokenUsage(BaseModel, TimestampMixin):
"""
Tracks LLM token consumption per assistant turn.