dograh/api/alembic/versions/4735a1f0cdb3_add_queued_runs_table.py
Abhishek Kumar 4f2a629340 Initial Commit 🚀 🚀
2025-09-09 14:37:32 +05:30

94 lines
3.4 KiB
Python

"""add queued runs table
Revision ID: 4735a1f0cdb3
Revises: 08bb6e7f1397
Create Date: 2025-07-24 16:04:07.899092
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# Revision identifiers, used by Alembic to order migrations.
# `revision` is this migration's own id; `down_revision` is the id of the
# migration it revises (the values mirror the "Revision ID" / "Revises"
# lines of the module docstring).
revision: str = "4735a1f0cdb3"
down_revision: Union[str, None] = "08bb6e7f1397"
# No branch label and no cross-branch dependency are declared.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create the ``queued_runs`` table and extend ``campaigns``.

    Steps, in order:

    1. Create ``queued_runs`` (with its ``queued_run_state`` enum, foreign
       keys to ``campaigns`` / ``workflow_runs`` and a unique constraint on
       ``(campaign_id, source_uuid)``) plus its secondary indexes.
    2. Add rate-limit, retry and source-sync columns to ``campaigns``.
    3. Add the ``'syncing'`` value to the ``campaign_state`` enum.
    """
    op.create_table(
        "queued_runs",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("campaign_id", sa.Integer(), nullable=False),
        sa.Column("source_uuid", sa.String(), nullable=False),
        sa.Column("context_variables", sa.JSON(), nullable=False),
        sa.Column(
            "state",
            sa.Enum("queued", "processed", name="queued_run_state"),
            nullable=False,
        ),
        sa.Column("workflow_run_id", sa.Integer(), nullable=True),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("processed_at", sa.DateTime(timezone=True), nullable=True),
        # Queued runs are removed together with their campaign.
        sa.ForeignKeyConstraint(
            ["campaign_id"], ["campaigns.id"], ondelete="CASCADE"
        ),
        sa.ForeignKeyConstraint(["workflow_run_id"], ["workflow_runs.id"]),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint(
            "campaign_id", "source_uuid", name="unique_campaign_source_uuid"
        ),
    )

    # Secondary (non-unique) indexes on queued_runs, created in the same
    # order as before.
    queued_runs_indexes = (
        ("idx_queued_runs_campaign_state", ["campaign_id", "state"]),
        ("idx_queued_runs_created", ["created_at"]),
        ("idx_queued_runs_source_uuid", ["source_uuid"]),
        (op.f("ix_queued_runs_id"), ["id"]),
    )
    for index_name, indexed_columns in queued_runs_indexes:
        op.create_index(
            index_name, "queued_runs", indexed_columns, unique=False
        )

    # `campaigns` already exists and may contain rows. Adding a NOT NULL
    # column without a default fails on a populated table, so each required
    # column is added with a temporary server default (which backfills the
    # existing rows) and the default is dropped right afterwards, leaving
    # the column NOT NULL with no server-side default.
    # NOTE(review): the backfill values below are assumptions -- confirm
    # they match the defaults the application model uses for new campaigns.
    required_campaign_columns = (
        ("rate_limit_per_second", sa.Integer(), sa.text("1")),
        ("max_retries", sa.Integer(), sa.text("0")),
        ("source_sync_status", sa.String(), "pending"),
    )
    for column_name, column_type, backfill_default in required_campaign_columns:
        op.add_column(
            "campaigns",
            sa.Column(
                column_name,
                column_type,
                nullable=False,
                server_default=backfill_default,
            ),
        )
        # server_default=None asks Alembic to remove the column default.
        op.alter_column("campaigns", column_name, server_default=None)

    # Nullable columns need no backfill.
    op.add_column(
        "campaigns",
        sa.Column(
            "source_last_synced_at", sa.DateTime(timezone=True), nullable=True
        ),
    )
    op.add_column(
        "campaigns", sa.Column("source_sync_error", sa.String(), nullable=True)
    )

    # Add 'syncing' to the campaign_state enum. IF NOT EXISTS keeps this
    # step safe to re-run after a downgrade, which leaves the value in place.
    # NOTE(review): PostgreSQL releases before 12 refuse ALTER TYPE ... ADD
    # VALUE inside a transaction block -- confirm the minimum supported
    # server version, or run this statement in an autocommit block.
    op.execute(
        "ALTER TYPE campaign_state ADD VALUE IF NOT EXISTS 'syncing' AFTER 'created';"
    )
def downgrade() -> None:
    """Revert the schema changes made by ``upgrade()``.

    Drops the columns added to ``campaigns``, then the ``queued_runs``
    indexes, the table itself and finally the ``queued_run_state`` enum
    type that was created together with the table.

    The ``'syncing'`` value added to ``campaign_state`` is intentionally
    left in place: this migration does not remove it, and ``upgrade()``
    adds it with ``IF NOT EXISTS`` so a later re-upgrade still succeeds.
    """
    # Columns are dropped in the reverse of the order they were added.
    campaign_columns = (
        "source_sync_error",
        "source_last_synced_at",
        "source_sync_status",
        "max_retries",
        "rate_limit_per_second",
    )
    for column_name in campaign_columns:
        op.drop_column("campaigns", column_name)

    queued_runs_indexes = (
        op.f("ix_queued_runs_id"),
        "idx_queued_runs_source_uuid",
        "idx_queued_runs_created",
        "idx_queued_runs_campaign_state",
    )
    for index_name in queued_runs_indexes:
        op.drop_index(index_name, table_name="queued_runs")

    op.drop_table("queued_runs")

    # Dropping the table does not drop the enum type backing its `state`
    # column. Without this, running upgrade() again after a downgrade fails
    # because the type `queued_run_state` already exists.
    op.execute("DROP TYPE IF EXISTS queued_run_state")