Merge pull request #692 from CREDO23/feat/idempotent-migrations

[Improvement] Make Alembic migrations idempotent
This commit is contained in:
Rohan Verma 2026-01-14 13:16:03 -08:00 committed by GitHub
commit cee1a8be68
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 322 additions and 105 deletions

View file

@ -0,0 +1,54 @@
"""Initial schema setup
Revision ID: 0
Revises: None
Creates all tables from SQLAlchemy models. Idempotent - safe to run on existing databases.
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "0"
down_revision: str | None = None
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
from app.db import Base
connection = op.get_bind()
# Create tables
op.execute(sa.text("CREATE EXTENSION IF NOT EXISTS vector"))
Base.metadata.create_all(bind=connection)
# Set up indexes
op.execute(
sa.text(
"CREATE INDEX IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)"
)
)
op.execute(
sa.text(
"CREATE INDEX IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))"
)
)
op.execute(
sa.text(
"CREATE INDEX IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)"
)
)
op.execute(
sa.text(
"CREATE INDEX IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))"
)
)
def downgrade() -> None:
pass

View file

@ -6,6 +6,8 @@ Revises: 9
from collections.abc import Sequence from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op from alembic import op
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
@ -18,9 +20,37 @@ depends_on: str | Sequence[str] | None = None
CHAT_TYPE_ENUM = "chattype" CHAT_TYPE_ENUM = "chattype"
def enum_exists(enum_name: str) -> bool:
"""Check if an enum type exists in the database."""
conn = op.get_bind()
result = conn.execute(
sa.text(
"SELECT EXISTS (SELECT 1 FROM pg_type WHERE typname = :enum_name)"
),
{"enum_name": enum_name},
)
return result.scalar()
def table_exists(table_name: str) -> bool:
"""Check if a table exists in the database."""
conn = op.get_bind()
result = conn.execute(
sa.text(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = :table_name)"
),
{"table_name": table_name},
)
return result.scalar()
def upgrade() -> None: def upgrade() -> None:
"""Upgrade schema - replace ChatType enum values with new QNA/REPORT structure.""" """Upgrade schema - replace ChatType enum values with new QNA/REPORT structure."""
# Skip if chats table or chattype enum doesn't exist (fresh database)
if not table_exists("chats") or not enum_exists(CHAT_TYPE_ENUM):
return
# Old enum name for temporary storage # Old enum name for temporary storage
old_enum_name = f"{CHAT_TYPE_ENUM}_old" old_enum_name = f"{CHAT_TYPE_ENUM}_old"
@ -72,6 +102,10 @@ def upgrade() -> None:
def downgrade() -> None: def downgrade() -> None:
"""Downgrade schema - revert ChatType enum to old GENERAL/DEEP/DEEPER/DEEPEST structure.""" """Downgrade schema - revert ChatType enum to old GENERAL/DEEP/DEEPER/DEEPEST structure."""
# Skip if chats table or chattype enum doesn't exist
if not table_exists("chats") or not enum_exists(CHAT_TYPE_ENUM):
return
# Old enum name for temporary storage # Old enum name for temporary storage
old_enum_name = f"{CHAT_TYPE_ENUM}_old" old_enum_name = f"{CHAT_TYPE_ENUM}_old"

View file

@ -7,22 +7,36 @@ Revises:
from collections.abc import Sequence from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op from alembic import op
# Import pgvector if needed for other types, though not for this ENUM change
# import pgvector
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision: str = "1" revision: str = "1"
down_revision: str | None = None down_revision: str | None = "0"
branch_labels: str | Sequence[str] | None = None branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None
def enum_exists(enum_name: str) -> bool:
"""Check if an enum type exists in the database."""
conn = op.get_bind()
result = conn.execute(
sa.text(
"SELECT EXISTS (SELECT 1 FROM pg_type WHERE typname = :enum_name)"
),
{"enum_name": enum_name},
)
return result.scalar()
def upgrade() -> None: def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
# Skip if the enum doesn't exist (fresh DB after downgrade - create_db_and_tables will handle it)
if not enum_exists("searchsourceconnectortype"):
return
# Manually add the command to add the enum value # Manually add the command to add the enum value
# Note: It's generally better to let autogenerate handle this, but we're bypassing it # Note: It's generally better to let autogenerate handle this, but we're bypassing it
op.execute( op.execute(
@ -51,6 +65,10 @@ END$$;
def downgrade() -> None: def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
# Skip if the enum doesn't exist
if not enum_exists("searchsourceconnectortype"):
return
# Downgrading removal of an enum value is complex and potentially dangerous # Downgrading removal of an enum value is complex and potentially dangerous
# if the value is in use. Often omitted or requires manual SQL based on context. # if the value is in use. Often omitted or requires manual SQL based on context.
# For now, we'll just pass. If you needed to reverse this, you'd likely # For now, we'll just pass. If you needed to reverse this, you'd likely

View file

@ -7,6 +7,8 @@ Revises: 23
from collections.abc import Sequence from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op from alembic import op
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
@ -16,11 +18,27 @@ branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None
def table_exists(table_name: str) -> bool:
"""Check if a table exists in the database."""
conn = op.get_bind()
result = conn.execute(
sa.text(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = :table_name)"
),
{"table_name": table_name},
)
return result.scalar()
def upgrade() -> None: def upgrade() -> None:
""" """
Fix any chats with NULL type values by setting them to QNA. Fix any chats with NULL type values by setting them to QNA.
This handles edge cases from previous migrations where type values were not properly migrated. This handles edge cases from previous migrations where type values were not properly migrated.
""" """
# Skip if chats table doesn't exist (fresh database)
if not table_exists("chats"):
return
# Update any NULL type values to QNA (the default chat type) # Update any NULL type values to QNA (the default chat type)
op.execute( op.execute(
""" """

View file

@ -10,6 +10,8 @@ Revises: 33
from collections.abc import Sequence from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op from alembic import op
# revision identifiers # revision identifiers
@ -19,42 +21,59 @@ branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None
def table_exists(table_name: str) -> bool:
"""Check if a table exists in the database."""
conn = op.get_bind()
result = conn.execute(
sa.text(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = :table_name)"
),
{"table_name": table_name},
)
return result.scalar()
def upgrade() -> None: def upgrade() -> None:
"""Add columns only if they don't already exist (safe for re-runs).""" """Add columns only if they don't already exist (safe for re-runs)."""
# Add 'state_version' column to chats table (default 1) # Add 'state_version' column to chats table (default 1)
op.execute(""" # Skip if chats table doesn't exist (fresh database)
ALTER TABLE chats if table_exists("chats"):
ADD COLUMN IF NOT EXISTS state_version BIGINT DEFAULT 1 NOT NULL op.execute("""
""") ALTER TABLE chats
ADD COLUMN IF NOT EXISTS state_version BIGINT DEFAULT 1 NOT NULL
""")
# Add 'chat_state_version' column to podcasts table # Add 'chat_state_version' column to podcasts table
op.execute(""" if table_exists("podcasts"):
ALTER TABLE podcasts op.execute("""
ADD COLUMN IF NOT EXISTS chat_state_version BIGINT ALTER TABLE podcasts
""") ADD COLUMN IF NOT EXISTS chat_state_version BIGINT
""")
# Add 'chat_id' column to podcasts table # Add 'chat_id' column to podcasts table
op.execute(""" op.execute("""
ALTER TABLE podcasts ALTER TABLE podcasts
ADD COLUMN IF NOT EXISTS chat_id INTEGER ADD COLUMN IF NOT EXISTS chat_id INTEGER
""") """)
def downgrade() -> None: def downgrade() -> None:
"""Remove columns only if they exist.""" """Remove columns only if they exist."""
op.execute(""" if table_exists("podcasts"):
ALTER TABLE podcasts op.execute("""
DROP COLUMN IF EXISTS chat_state_version ALTER TABLE podcasts
""") DROP COLUMN IF EXISTS chat_state_version
""")
op.execute(""" op.execute("""
ALTER TABLE podcasts ALTER TABLE podcasts
DROP COLUMN IF EXISTS chat_id DROP COLUMN IF EXISTS chat_id
""") """)
op.execute(""" if table_exists("chats"):
ALTER TABLE chats op.execute("""
DROP COLUMN IF EXISTS state_version ALTER TABLE chats
""") DROP COLUMN IF EXISTS state_version
""")

View file

@ -62,8 +62,25 @@ def parse_timestamp(ts, fallback):
return fallback return fallback
def table_exists(table_name: str) -> bool:
"""Check if a table exists in the database."""
conn = op.get_bind()
result = conn.execute(
sa.text(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = :table_name)"
),
{"table_name": table_name},
)
return result.scalar()
def upgrade() -> None: def upgrade() -> None:
"""Migrate old chats to new_chat_threads and remove old tables.""" """Migrate old chats to new_chat_threads and remove old tables."""
# Skip if chats table doesn't exist (fresh database)
if not table_exists("chats"):
print("[Migration 49] Chats table does not exist, skipping migration")
return
connection = op.get_bind() connection = op.get_bind()
# Get all old chats # Get all old chats
@ -176,36 +193,49 @@ def upgrade() -> None:
print("[Migration 49] Migration complete!") print("[Migration 49] Migration complete!")
def enum_exists(enum_name: str) -> bool:
"""Check if an enum type exists in the database."""
conn = op.get_bind()
result = conn.execute(
sa.text(
"SELECT EXISTS (SELECT 1 FROM pg_type WHERE typname = :enum_name)"
),
{"enum_name": enum_name},
)
return result.scalar()
def downgrade() -> None: def downgrade() -> None:
"""Recreate old chats table (data cannot be restored).""" """Recreate old chats table (data cannot be restored)."""
# Recreate chattype enum # Skip if chats table already exists
if table_exists("chats"):
print("[Migration 49 Downgrade] Chats table already exists, skipping")
return
# Recreate chattype enum if it doesn't exist
if not enum_exists("chattype"):
op.execute(
sa.text("""
CREATE TYPE chattype AS ENUM ('QNA')
""")
)
# Recreate chats table using raw SQL to avoid SQLAlchemy trying to create the enum
op.execute( op.execute(
sa.text(""" sa.text("""
CREATE TYPE chattype AS ENUM ('QNA') CREATE TABLE chats (
id SERIAL PRIMARY KEY,
type chattype NOT NULL,
title VARCHAR NOT NULL,
initial_connectors VARCHAR[],
messages JSON NOT NULL,
state_version BIGINT NOT NULL DEFAULT 1,
search_space_id INTEGER NOT NULL REFERENCES searchspaces(id) ON DELETE CASCADE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
)
""") """)
) )
op.execute(sa.text("CREATE INDEX ix_chats_id ON chats (id)"))
# Recreate chats table op.execute(sa.text("CREATE INDEX ix_chats_title ON chats (title)"))
op.create_table(
"chats",
sa.Column("id", sa.Integer(), primary_key=True, index=True),
sa.Column("type", sa.Enum("QNA", name="chattype"), nullable=False),
sa.Column("title", sa.String(), nullable=False, index=True),
sa.Column("initial_connectors", sa.ARRAY(sa.String()), nullable=True),
sa.Column("messages", sa.JSON(), nullable=False),
sa.Column("state_version", sa.BigInteger(), nullable=False, default=1),
sa.Column(
"search_space_id",
sa.Integer(),
sa.ForeignKey("searchspaces.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"created_at",
sa.TIMESTAMP(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
)
print("[Migration 49 Downgrade] Chats table recreated (data not restored)") print("[Migration 49 Downgrade] Chats table recreated (data not restored)")

View file

@ -39,7 +39,7 @@ def upgrade():
""" """
) )
# Rename columns (only if they exist with old names) # Rename columns (only if source exists and target doesn't already exist)
op.execute( op.execute(
""" """
DO $$ DO $$
@ -47,6 +47,9 @@ def upgrade():
IF EXISTS ( IF EXISTS (
SELECT 1 FROM information_schema.columns SELECT 1 FROM information_schema.columns
WHERE table_name = 'searchspaces' AND column_name = 'fast_llm_id' WHERE table_name = 'searchspaces' AND column_name = 'fast_llm_id'
) AND NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'searchspaces' AND column_name = 'agent_llm_id'
) THEN ) THEN
ALTER TABLE searchspaces RENAME COLUMN fast_llm_id TO agent_llm_id; ALTER TABLE searchspaces RENAME COLUMN fast_llm_id TO agent_llm_id;
END IF; END IF;
@ -61,6 +64,9 @@ def upgrade():
IF EXISTS ( IF EXISTS (
SELECT 1 FROM information_schema.columns SELECT 1 FROM information_schema.columns
WHERE table_name = 'searchspaces' AND column_name = 'long_context_llm_id' WHERE table_name = 'searchspaces' AND column_name = 'long_context_llm_id'
) AND NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'searchspaces' AND column_name = 'document_summary_llm_id'
) THEN ) THEN
ALTER TABLE searchspaces RENAME COLUMN long_context_llm_id TO document_summary_llm_id; ALTER TABLE searchspaces RENAME COLUMN long_context_llm_id TO document_summary_llm_id;
END IF; END IF;
@ -100,7 +106,7 @@ def downgrade():
""" """
) )
# Rename columns back # Rename columns back (only if source exists and target doesn't already exist)
op.execute( op.execute(
""" """
DO $$ DO $$
@ -108,6 +114,9 @@ def downgrade():
IF EXISTS ( IF EXISTS (
SELECT 1 FROM information_schema.columns SELECT 1 FROM information_schema.columns
WHERE table_name = 'searchspaces' AND column_name = 'agent_llm_id' WHERE table_name = 'searchspaces' AND column_name = 'agent_llm_id'
) AND NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'searchspaces' AND column_name = 'fast_llm_id'
) THEN ) THEN
ALTER TABLE searchspaces RENAME COLUMN agent_llm_id TO fast_llm_id; ALTER TABLE searchspaces RENAME COLUMN agent_llm_id TO fast_llm_id;
END IF; END IF;
@ -122,6 +131,9 @@ def downgrade():
IF EXISTS ( IF EXISTS (
SELECT 1 FROM information_schema.columns SELECT 1 FROM information_schema.columns
WHERE table_name = 'searchspaces' AND column_name = 'document_summary_llm_id' WHERE table_name = 'searchspaces' AND column_name = 'document_summary_llm_id'
) AND NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'searchspaces' AND column_name = 'long_context_llm_id'
) THEN ) THEN
ALTER TABLE searchspaces RENAME COLUMN document_summary_llm_id TO long_context_llm_id; ALTER TABLE searchspaces RENAME COLUMN document_summary_llm_id TO long_context_llm_id;
END IF; END IF;

View file

@ -60,14 +60,28 @@ def downgrade() -> None:
connection = op.get_bind() connection = op.get_bind()
connection.execute( # Only update if the target enum value exists (it won't on fresh databases)
result = connection.execute(
text( text(
""" """
UPDATE documents SELECT EXISTS (
SET document_type = 'GOOGLE_DRIVE_CONNECTOR' SELECT 1 FROM pg_type t
WHERE document_type = 'GOOGLE_DRIVE_FILE'; JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'documenttype' AND e.enumlabel = 'GOOGLE_DRIVE_CONNECTOR'
);
""" """
) )
) )
enum_exists = result.scalar()
connection.commit() if enum_exists:
connection.execute(
text(
"""
UPDATE documents
SET document_type = 'GOOGLE_DRIVE_CONNECTOR'
WHERE document_type = 'GOOGLE_DRIVE_FILE';
"""
)
)
connection.commit()

View file

@ -18,59 +18,77 @@ branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None
def upgrade() -> None: def table_exists(table_name: str) -> bool:
# Alter Chat table """Check if a table exists in the database."""
op.alter_column( conn = op.get_bind()
"chats", result = conn.execute(
"title", sa.text(
existing_type=sa.String(200), "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = :table_name)"
type_=sa.String(), ),
existing_nullable=False, {"table_name": table_name},
) )
return result.scalar()
def upgrade() -> None:
# Alter Chat table (may not exist on fresh databases, removed in migration 49)
if table_exists("chats"):
op.alter_column(
"chats",
"title",
existing_type=sa.String(200),
type_=sa.String(),
existing_nullable=False,
)
# Alter Document table # Alter Document table
op.alter_column( if table_exists("documents"):
"documents", op.alter_column(
"title", "documents",
existing_type=sa.String(200), "title",
type_=sa.String(), existing_type=sa.String(200),
existing_nullable=False, type_=sa.String(),
) existing_nullable=False,
)
# Alter Podcast table # Alter Podcast table
op.alter_column( if table_exists("podcasts"):
"podcasts", op.alter_column(
"title", "podcasts",
existing_type=sa.String(200), "title",
type_=sa.String(), existing_type=sa.String(200),
existing_nullable=False, type_=sa.String(),
) existing_nullable=False,
)
def downgrade() -> None: def downgrade() -> None:
# Revert Chat table # Revert Chat table
op.alter_column( if table_exists("chats"):
"chats", op.alter_column(
"title", "chats",
existing_type=sa.String(), "title",
type_=sa.String(200), existing_type=sa.String(),
existing_nullable=False, type_=sa.String(200),
) existing_nullable=False,
)
# Revert Document table # Revert Document table
op.alter_column( if table_exists("documents"):
"documents", op.alter_column(
"title", "documents",
existing_type=sa.String(), "title",
type_=sa.String(200), existing_type=sa.String(),
existing_nullable=False, type_=sa.String(200),
) existing_nullable=False,
)
# Revert Podcast table # Revert Podcast table
op.alter_column( if table_exists("podcasts"):
"podcasts", op.alter_column(
"title", "podcasts",
existing_type=sa.String(), "title",
type_=sa.String(200), existing_type=sa.String(),
existing_nullable=False, type_=sa.String(200),
) existing_nullable=False,
)