feat(database): enhance podcast lifecycle management by adding temporary unpublishing during migration

This commit is contained in:
Anish Sarkar 2026-06-12 03:35:49 +05:30
parent 72a7fe04b5
commit aba95e4faf
2 changed files with 72 additions and 7 deletions

View file

@ -13,8 +13,34 @@ down_revision: str | None = "157"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
PUBLICATION_NAME = "zero_publication"
def _drop_podcasts_from_zero_publication() -> None:
"""Temporarily unpublish podcasts while changing published columns."""
op.execute(
f"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_publication_tables
WHERE pubname = '{PUBLICATION_NAME}'
AND schemaname = current_schema()
AND tablename = 'podcasts'
) THEN
ALTER PUBLICATION "{PUBLICATION_NAME}" DROP TABLE "podcasts";
END IF;
END
$$;
"""
)
def upgrade() -> None:
_drop_podcasts_from_zero_publication()
# Retype the status enum by swapping in a fresh type and casting existing
# rows. The legacy transient value 'generating' maps onto 'rendering'.
op.execute("ALTER TYPE podcast_status RENAME TO podcast_status_old;")
@ -57,6 +83,8 @@ def upgrade() -> None:
def downgrade() -> None:
_drop_podcasts_from_zero_publication()
op.execute("ALTER TABLE podcasts DROP COLUMN IF EXISTS error;")
op.execute("ALTER TABLE podcasts DROP COLUMN IF EXISTS duration_seconds;")
op.execute("ALTER TABLE podcasts DROP COLUMN IF EXISTS storage_key;")

View file

@ -100,12 +100,32 @@ def _column_exists(conn: Connection, table: str, column: str) -> bool:
)
def _expected_columns(conn: Connection, table: str) -> list[str] | None:
def _table_exists(conn: Connection, table: str) -> bool:
return (
conn.execute(
text(
"SELECT 1 FROM information_schema.tables "
"WHERE table_schema = current_schema() "
"AND table_name = :table"
),
{"table": table},
).fetchone()
is not None
)
def _expected_columns(
conn: Connection, table: str, *, include_missing_columns: bool = True
) -> list[str] | None:
columns = ZERO_PUBLICATION[table]
if columns is None:
return None
expected = list(columns)
if include_missing_columns:
expected = list(columns)
else:
expected = [column for column in columns if _column_exists(conn, table, column)]
if table in {"documents", "user", "podcasts"} and _column_exists(
conn, table, "_0_version"
):
@ -113,11 +133,20 @@ def _expected_columns(conn: Connection, table: str) -> list[str] | None:
return expected
def _format_table_entry(conn: Connection, table: str) -> str:
columns = _expected_columns(conn, table)
def _format_table_entry(
conn: Connection, table: str, *, include_missing_columns: bool = True
) -> str | None:
if not include_missing_columns and not _table_exists(conn, table):
return None
columns = _expected_columns(
conn, table, include_missing_columns=include_missing_columns
)
table_sql = _quote_identifier(table)
if columns is None:
return table_sql
if not include_missing_columns and not columns:
return None
column_sql = ", ".join(_quote_identifier(column) for column in columns)
return f"{table_sql} ({column_sql})"
@ -126,9 +155,17 @@ def _format_table_entry(conn: Connection, table: str) -> str:
def build_set_table_sql(conn: Connection) -> str:
"""Build the canonical plain SET TABLE statement for Zero's event triggers."""
table_list = ", ".join(
_format_table_entry(conn, table) for table in ZERO_PUBLICATION
)
table_entries = [
entry
for table in ZERO_PUBLICATION
if (
entry := _format_table_entry(
conn, table, include_missing_columns=False
)
)
is not None
]
table_list = ", ".join(table_entries)
return f"ALTER PUBLICATION {_quote_identifier(PUBLICATION_NAME)} SET TABLE {table_list}"