feat(docker): add ZERO_AUTO_RESET configuration for improved replication safety

- Introduced the ZERO_AUTO_RESET environment variable to enable automatic reset of the SQLite replica in case of replication halts.
- Updated Docker Compose files to include ZERO_AUTO_RESET in service configurations.
- Enhanced documentation to clarify the purpose and usage of the new variable.
This commit is contained in:
Anish Sarkar 2026-06-06 14:21:14 +05:30
parent 19fabaf011
commit 4e00f24a03
12 changed files with 304 additions and 151 deletions

View file

@ -3,6 +3,7 @@ import os
import sys
from logging.config import fileConfig
import sqlalchemy as sa
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
@ -36,6 +37,9 @@ if config.config_file_name is not None:
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
MIGRATION_ADVISORY_LOCK_NAMESPACE = "surfsense"
MIGRATION_ADVISORY_LOCK_NAME = "alembic_migrations"
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
@ -73,8 +77,22 @@ def do_run_migrations(connection: Connection) -> None:
transaction_per_migration=True,
)
with context.begin_transaction():
context.run_migrations()
lock_params = {
"namespace": MIGRATION_ADVISORY_LOCK_NAMESPACE,
"name": MIGRATION_ADVISORY_LOCK_NAME,
}
connection.execute(
sa.text("SELECT pg_advisory_lock(hashtext(:namespace), hashtext(:name))"),
lock_params,
)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.execute(
sa.text("SELECT pg_advisory_unlock(hashtext(:namespace), hashtext(:name))"),
lock_params,
)
async def run_async_migrations() -> None:

View file

@ -47,7 +47,6 @@ depends_on: str | Sequence[str] | None = None
PUBLICATION_NAME = "zero_publication"
# Must stay in sync with the column lists in migrations 117 / 139 / 140.
DOCUMENT_COLS = [
"id",
"title",

View file

@ -0,0 +1,23 @@
"""reconcile zero_publication from canonical definition
Revision ID: 155
Revises: 154
"""
from collections.abc import Sequence
from alembic import op
from app.zero_publication import apply_publication
revision: str = "155"
down_revision: str | None = "154"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
apply_publication(op.get_bind())
def downgrade() -> None:
"""No-op. Historical publication shapes are immutable."""

View file

@ -0,0 +1,229 @@
"""Canonical Zero publication definition for SurfSense.
This module is the single source of truth for ``zero_publication``. Future
publication changes should update ``ZERO_PUBLICATION`` and call
``apply_publication()`` from a migration instead of hand-copying table lists.
SurfSense runs Zero on Postgres with Zero's event triggers installed, so the
official Zero path is a plain ``ALTER PUBLICATION ... SET TABLE``. If a future
deployment cannot use event triggers, use Zero's documented
``zero_0.update_schemas()`` hook as the fallback instead of COMMENT bookends.
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
from collections.abc import Mapping, Sequence
from sqlalchemy import text
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import create_async_engine
PUBLICATION_NAME = "zero_publication"
DOCUMENT_COLS = [
"id",
"title",
"document_type",
"search_space_id",
"folder_id",
"created_by_id",
"status",
"created_at",
"updated_at",
]
USER_COLS = [
"id",
"pages_limit",
"pages_used",
"premium_credit_micros_limit",
"premium_credit_micros_used",
]
AUTOMATION_RUN_COLS = [
"id",
"automation_id",
"trigger_id",
"status",
"step_results",
"started_at",
"finished_at",
"created_at",
]
ZERO_PUBLICATION: Mapping[str, Sequence[str] | None] = {
"notifications": None,
"documents": DOCUMENT_COLS,
"folders": None,
"search_source_connectors": None,
"new_chat_messages": None,
"chat_comments": None,
"chat_session_state": None,
"user": USER_COLS,
"automation_runs": AUTOMATION_RUN_COLS,
}
def _quote_identifier(identifier: str) -> str:
return '"' + identifier.replace('"', '""') + '"'
def _column_exists(conn: Connection, table: str, column: str) -> bool:
return (
conn.execute(
text(
"SELECT 1 FROM information_schema.columns "
"WHERE table_schema = current_schema() "
"AND table_name = :table AND column_name = :column"
),
{"table": table, "column": column},
).fetchone()
is not None
)
def _expected_columns(conn: Connection, table: str) -> list[str] | None:
columns = ZERO_PUBLICATION[table]
if columns is None:
return None
expected = list(columns)
if table in {"documents", "user"} and _column_exists(conn, table, "_0_version"):
expected.append("_0_version")
return expected
def _format_table_entry(conn: Connection, table: str) -> str:
columns = _expected_columns(conn, table)
table_sql = _quote_identifier(table)
if columns is None:
return table_sql
column_sql = ", ".join(_quote_identifier(column) for column in columns)
return f"{table_sql} ({column_sql})"
def build_set_table_sql(conn: Connection) -> str:
"""Build the canonical plain SET TABLE statement for Zero's event triggers."""
table_list = ", ".join(_format_table_entry(conn, table) for table in ZERO_PUBLICATION)
return f"ALTER PUBLICATION {_quote_identifier(PUBLICATION_NAME)} SET TABLE {table_list}"
def apply_publication(conn: Connection) -> None:
"""Reconcile ``zero_publication`` to the canonical shape."""
exists = conn.execute(
text("SELECT 1 FROM pg_publication WHERE pubname = :name"),
{"name": PUBLICATION_NAME},
).fetchone()
if not exists:
return
conn.execute(text(build_set_table_sql(conn)))
def _actual_publication_shape(conn: Connection) -> dict[str, list[str] | None]:
rows = conn.execute(
text(
"SELECT pt.tablename, pr.prattrs IS NULL AS all_columns, pt.attnames "
"FROM pg_publication_tables pt "
"JOIN pg_publication p ON p.pubname = pt.pubname "
"JOIN pg_class c ON c.relname = pt.tablename "
"JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = pt.schemaname "
"JOIN pg_publication_rel pr ON pr.prpubid = p.oid AND pr.prrelid = c.oid "
"WHERE pt.pubname = :name AND pt.schemaname = current_schema() "
"ORDER BY pt.tablename"
),
{"name": PUBLICATION_NAME},
).mappings()
return {
str(row["tablename"]): None
if row["all_columns"]
else list(row["attnames"] or [])
for row in rows
}
def expected_publication_shape(conn: Connection) -> dict[str, list[str] | None]:
return {table: _expected_columns(conn, table) for table in ZERO_PUBLICATION}
def verify_publication(conn: Connection) -> list[str]:
"""Return human-readable mismatches between Postgres and the canonical shape."""
publication_exists = conn.execute(
text("SELECT 1 FROM pg_publication WHERE pubname = :name"),
{"name": PUBLICATION_NAME},
).fetchone()
if not publication_exists:
return [f"Publication {PUBLICATION_NAME!r} does not exist"]
actual = _actual_publication_shape(conn)
expected = expected_publication_shape(conn)
mismatches: list[str] = []
for table, expected_columns in expected.items():
if table not in actual:
mismatches.append(f"{table}: missing from publication")
continue
actual_columns = actual[table]
actual_key = sorted(actual_columns) if actual_columns is not None else None
expected_key = sorted(expected_columns) if expected_columns is not None else None
if actual_key != expected_key:
mismatches.append(
f"{table}: expected columns {expected_columns or 'ALL'}, "
f"got {actual_columns or 'ALL'}"
)
for table in sorted(set(actual) - set(expected)):
mismatches.append(f"{table}: unexpected table in publication")
return mismatches
async def _verify_cli() -> int:
database_url = os.getenv("DATABASE_URL")
if not database_url:
print("DATABASE_URL is required to verify zero_publication.", file=sys.stderr)
return 2
engine = create_async_engine(database_url)
async with engine.connect() as async_conn:
def run_verify(sync_conn: Connection) -> list[str]:
return verify_publication(sync_conn)
mismatches = await async_conn.run_sync(run_verify)
await engine.dispose()
if mismatches:
print("zero_publication shape mismatch:", file=sys.stderr)
for mismatch in mismatches:
print(f" - {mismatch}", file=sys.stderr)
return 1
print("zero_publication shape verified.")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description="Manage SurfSense's Zero publication")
parser.add_argument("--verify", action="store_true", help="verify zero_publication shape")
args = parser.parse_args()
if args.verify:
return asyncio.run(_verify_cli())
parser.print_help()
return 2
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -49,10 +49,10 @@ trap cleanup SIGTERM SIGINT
# ── Database migrations (only for migrate / all) ─────────────
# Fail-fast contract:
# - alembic upgrade head must succeed within ${MIGRATION_TIMEOUT:-900}s
# - zero_publication must exist in pg_publication afterwards
# - zero_publication must match the canonical app.zero_publication shape
# Either failure exits non-zero so the dedicated `migrations` compose
# service exits non-zero, halting the rest of the stack instead of
# silently producing a half-built system that crash-loops zero-cache.
# silently producing a drifted Zero publication.
run_migrations() {
echo "Running database migrations..."
for i in {1..30}; do
@ -73,58 +73,13 @@ run_migrations() {
fi
echo "Migrations completed successfully."
echo "Verifying zero_publication exists in Postgres..."
local pub_oid
pub_oid=$(python <<'PY' 2>/dev/null || true
import asyncio
import sys
from sqlalchemy import text
from app.db import engine
async def get_oid():
async with engine.connect() as conn:
result = await conn.execute(
text("SELECT oid FROM pg_publication WHERE pubname = 'zero_publication'")
)
row = result.first()
if row is None:
sys.exit(1)
print(int(row[0]))
asyncio.run(get_oid())
PY
)
if [ -z "${pub_oid}" ]; then
echo "ERROR: zero_publication is missing from Postgres after running alembic." >&2
echo "This usually means migration 116 (or a later publication migration) did not run." >&2
echo "Verifying zero_publication matches the canonical shape..."
if ! python -m app.zero_publication --verify; then
echo "ERROR: zero_publication does not match the canonical shape." >&2
echo "Inspect alembic state with:" >&2
echo " docker compose exec db psql -U \"\$DB_USER\" -d \"\$DB_NAME\" -c 'SELECT * FROM alembic_version;'" >&2
exit 1
fi
echo "zero_publication verified (oid=${pub_oid})."
# Stale-replica safety net: if /zero-init is mounted (i.e. we are the
# dedicated `migrations` compose service), drop a marker file when the
# publication oid changed (or on first run) so the wrapped zero-cache
# entrypoint can wipe /data/zero.db before starting. This recovers from
# the case where a previous zero-cache crashed mid-init and left a
# half-built SQLite replica without a `_zero.tableMetadata` table.
if [ -d /zero-init ]; then
local stored_oid=""
[ -f /zero-init/last_pub_oid ] && stored_oid=$(cat /zero-init/last_pub_oid 2>/dev/null || true)
if [ -z "${stored_oid}" ] || [ "${stored_oid}" != "${pub_oid}" ]; then
echo "Publication oid changed (stored=${stored_oid:-<none>}, current=${pub_oid}); writing /zero-init/needs_reset."
: > /zero-init/needs_reset
chmod 666 /zero-init/needs_reset 2>/dev/null || true
fi
echo "${pub_oid}" > /zero-init/last_pub_oid
chmod 666 /zero-init/last_pub_oid 2>/dev/null || true
# World-writable dir so the (possibly non-root) zero-cache container
# can `rm -f /zero-init/needs_reset` after acting on the marker.
chmod 777 /zero-init 2>/dev/null || true
fi
}
# ── Service starters ─────────────────────────────────────────