diff --git a/surfsense_backend/alembic/versions/158_evolve_podcasts_lifecycle.py b/surfsense_backend/alembic/versions/158_evolve_podcasts_lifecycle.py index 15cf04f9d..7c51158a9 100644 --- a/surfsense_backend/alembic/versions/158_evolve_podcasts_lifecycle.py +++ b/surfsense_backend/alembic/versions/158_evolve_podcasts_lifecycle.py @@ -13,8 +13,34 @@ down_revision: str | None = "157" branch_labels: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None +PUBLICATION_NAME = "zero_publication" + + +def _drop_podcasts_from_zero_publication() -> None: + """Temporarily unpublish podcasts while changing published columns.""" + + op.execute( + f""" + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 + FROM pg_publication_tables + WHERE pubname = '{PUBLICATION_NAME}' + AND schemaname = current_schema() + AND tablename = 'podcasts' + ) THEN + ALTER PUBLICATION "{PUBLICATION_NAME}" DROP TABLE "podcasts"; + END IF; + END + $$; + """ + ) + def upgrade() -> None: + _drop_podcasts_from_zero_publication() + # Retype the status enum by swapping in a fresh type and casting existing # rows. The legacy transient value 'generating' maps onto 'rendering'. op.execute("ALTER TYPE podcast_status RENAME TO podcast_status_old;") @@ -57,6 +83,8 @@ def upgrade() -> None: def downgrade() -> None: + _drop_podcasts_from_zero_publication() + op.execute("ALTER TABLE podcasts DROP COLUMN IF EXISTS error;") op.execute("ALTER TABLE podcasts DROP COLUMN IF EXISTS duration_seconds;") op.execute("ALTER TABLE podcasts DROP COLUMN IF EXISTS storage_key;") diff --git a/surfsense_backend/app/zero_publication.py b/surfsense_backend/app/zero_publication.py index 139286ee6..869559c55 100644 --- a/surfsense_backend/app/zero_publication.py +++ b/surfsense_backend/app/zero_publication.py @@ -100,12 +100,32 @@ def _column_exists(conn: Connection, table: str, column: str) -> bool: ) -def _expected_columns(conn: Connection, table: str) -> list[str] | None: +def _table_exists(conn: Connection, table: str) -> bool: + return ( + conn.execute( + text( + "SELECT 1 FROM information_schema.tables " + "WHERE table_schema = current_schema() " + "AND table_name = :table" + ), + {"table": table}, + ).fetchone() + is not None + ) + + +def _expected_columns( + conn: Connection, table: str, *, include_missing_columns: bool = True +) -> list[str] | None: columns = ZERO_PUBLICATION[table] if columns is None: return None - expected = list(columns) + if include_missing_columns: + expected = list(columns) + else: + expected = [column for column in columns if _column_exists(conn, table, column)] + if table in {"documents", "user", "podcasts"} and _column_exists( conn, table, "_0_version" ): @@ -113,11 +133,20 @@ def _expected_columns(conn: Connection, table: str) -> list[str] | None: return expected -def _format_table_entry(conn: Connection, table: str) -> str: - columns = _expected_columns(conn, table) +def _format_table_entry( + conn: Connection, table: str, *, include_missing_columns: bool = True +) -> str | None: + if not include_missing_columns and not _table_exists(conn, table): + return None + + columns = _expected_columns( + conn, table, include_missing_columns=include_missing_columns + ) table_sql = _quote_identifier(table) if columns is None: return table_sql + if not include_missing_columns and not columns: + return None column_sql = ", ".join(_quote_identifier(column) for column in columns) return f"{table_sql} ({column_sql})" @@ -126,9 +155,17 @@ def _format_table_entry(conn: Connection, table: str) -> str: def build_set_table_sql(conn: Connection) -> str: """Build the canonical plain SET TABLE statement for Zero's event triggers.""" - table_list = ", ".join( - _format_table_entry(conn, table) for table in ZERO_PUBLICATION - ) + table_entries = [ + entry + for table in ZERO_PUBLICATION + if ( + entry := _format_table_entry( + conn, table, include_missing_columns=False + ) + ) + is not None + ] + table_list = ", ".join(table_entries) return f"ALTER PUBLICATION {_quote_identifier(PUBLICATION_NAME)} SET TABLE {table_list}"