Tidy alembic migration version scripts

This commit is contained in:
CREDO23 2026-04-24 18:49:22 +02:00
parent d1080b1298
commit dfa6c0423d
2 changed files with 55 additions and 39 deletions

View file

@ -79,40 +79,44 @@ def _terminate_blocked_pids(conn, table: str) -> None:
def upgrade() -> None: def upgrade() -> None:
conn = op.get_bind() conn = op.get_bind()
# asyncpg requires LOCK TABLE inside a transaction block. Alembic already
# opened one via context.begin_transaction(), but the driver still errors
# unless we use an explicit SAVEPOINT (nested transaction) for this block.
tx = conn.begin_nested() if conn.in_transaction() else conn.begin()
with tx:
conn.execute(sa.text("SET lock_timeout = '10s'"))
conn.execute(sa.text("SET lock_timeout = '10s'")) for tbl in sorted(TABLES_WITH_FULL_IDENTITY):
_terminate_blocked_pids(conn, tbl)
conn.execute(sa.text(f'LOCK TABLE "{tbl}" IN ACCESS EXCLUSIVE MODE'))
for tbl in sorted(TABLES_WITH_FULL_IDENTITY): for tbl in TABLES_WITH_FULL_IDENTITY:
_terminate_blocked_pids(conn, tbl) conn.execute(sa.text(f'ALTER TABLE "{tbl}" REPLICA IDENTITY DEFAULT'))
conn.execute(sa.text(f'LOCK TABLE "{tbl}" IN ACCESS EXCLUSIVE MODE'))
for tbl in TABLES_WITH_FULL_IDENTITY: conn.execute(sa.text(f"DROP PUBLICATION IF EXISTS {PUBLICATION_NAME}"))
conn.execute(sa.text(f'ALTER TABLE "{tbl}" REPLICA IDENTITY DEFAULT'))
conn.execute(sa.text(f"DROP PUBLICATION IF EXISTS {PUBLICATION_NAME}")) has_zero_ver = conn.execute(
sa.text(
"SELECT 1 FROM information_schema.columns "
"WHERE table_name = 'documents' AND column_name = '_0_version'"
)
).fetchone()
has_zero_ver = conn.execute( cols = DOCUMENT_COLS + (['"_0_version"'] if has_zero_ver else [])
sa.text( col_list = ", ".join(cols)
"SELECT 1 FROM information_schema.columns "
"WHERE table_name = 'documents' AND column_name = '_0_version'" conn.execute(
sa.text(
f"CREATE PUBLICATION {PUBLICATION_NAME} FOR TABLE "
f"notifications, "
f"documents ({col_list}), "
f"folders, "
f"search_source_connectors, "
f"new_chat_messages, "
f"chat_comments, "
f"chat_session_state"
)
) )
).fetchone()
cols = DOCUMENT_COLS + (['"_0_version"'] if has_zero_ver else [])
col_list = ", ".join(cols)
conn.execute(
sa.text(
f"CREATE PUBLICATION {PUBLICATION_NAME} FOR TABLE "
f"notifications, "
f"documents ({col_list}), "
f"folders, "
f"search_source_connectors, "
f"new_chat_messages, "
f"chat_comments, "
f"chat_session_state"
)
)
def downgrade() -> None: def downgrade() -> None:

View file

@ -12,8 +12,6 @@ from __future__ import annotations
from collections.abc import Sequence from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op from alembic import op
revision: str = "121" revision: str = "121"
@ -23,16 +21,30 @@ depends_on: str | Sequence[str] | None = None
def upgrade() -> None: def upgrade() -> None:
op.add_column( # Idempotent: column(s) may already exist after a failed run or manual DDL.
"user", op.execute(
sa.Column("memory_md", sa.Text(), nullable=True, server_default=""), """
) DO $$
op.add_column( BEGIN
"searchspaces", IF NOT EXISTS (
sa.Column("shared_memory_md", sa.Text(), nullable=True, server_default=""), SELECT 1 FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'user'
AND column_name = 'memory_md'
) THEN
ALTER TABLE "user" ADD COLUMN memory_md TEXT DEFAULT '';
END IF;
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'searchspaces'
AND column_name = 'shared_memory_md'
) THEN
ALTER TABLE searchspaces ADD COLUMN shared_memory_md TEXT DEFAULT '';
END IF;
END$$;
"""
) )
def downgrade() -> None: def downgrade() -> None:
op.drop_column("searchspaces", "shared_memory_md") op.execute("ALTER TABLE searchspaces DROP COLUMN IF EXISTS shared_memory_md")
op.drop_column("user", "memory_md") op.execute('ALTER TABLE "user" DROP COLUMN IF EXISTS memory_md')