diff --git a/surfsense_backend/alembic/versions/144_add_automation_tables.py b/surfsense_backend/alembic/versions/144_add_automation_tables.py index 39f927417..296c33585 100644 --- a/surfsense_backend/alembic/versions/144_add_automation_tables.py +++ b/surfsense_backend/alembic/versions/144_add_automation_tables.py @@ -25,34 +25,60 @@ depends_on: str | Sequence[str] | None = None def upgrade() -> None: - # ENUM types (PostgreSQL requires types created before tables that use them) + # Guard every object so the migration is safe to re-run after a partial + # apply (the types/tables outlive a failed run that never advanced + # alembic_version). Types must precede the tables that reference them. op.execute( """ - CREATE TYPE automation_status AS ENUM ( - 'active', 'paused', 'archived' - ); + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type WHERE typname = 'automation_status' + ) THEN + CREATE TYPE automation_status AS ENUM ( + 'active', 'paused', 'archived' + ); + END IF; + END + $$; """ ) op.execute( """ - CREATE TYPE automation_trigger_type AS ENUM ( - 'schedule', 'manual' - ); + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type WHERE typname = 'automation_trigger_type' + ) THEN + CREATE TYPE automation_trigger_type AS ENUM ( + 'schedule', 'manual' + ); + END IF; + END + $$; """ ) op.execute( """ - CREATE TYPE automation_run_status AS ENUM ( - 'pending', 'running', 'succeeded', 'failed', - 'cancelled', 'timed_out' - ); + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type WHERE typname = 'automation_run_status' + ) THEN + CREATE TYPE automation_run_status AS ENUM ( + 'pending', 'running', 'succeeded', 'failed', + 'cancelled', 'timed_out' + ); + END IF; + END + $$; """ ) # automations — the editable, versioned automation definition op.execute( """ - CREATE TABLE automations ( + CREATE TABLE IF NOT EXISTS automations ( id SERIAL PRIMARY KEY, search_space_id INTEGER NOT NULL REFERENCES searchspaces(id) ON DELETE CASCADE, @@ -69,19 +95,25 @@ def upgrade() -> None: """ ) op.execute( - "CREATE INDEX ix_automations_search_space_id ON automations(search_space_id);" + "CREATE INDEX IF NOT EXISTS ix_automations_search_space_id ON automations(search_space_id);" ) op.execute( - "CREATE INDEX ix_automations_created_by_user_id ON automations(created_by_user_id);" + "CREATE INDEX IF NOT EXISTS ix_automations_created_by_user_id ON automations(created_by_user_id);" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_automations_status ON automations(status);" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_automations_created_at ON automations(created_at);" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_automations_updated_at ON automations(updated_at);" ) - op.execute("CREATE INDEX ix_automations_status ON automations(status);") - op.execute("CREATE INDEX ix_automations_created_at ON automations(created_at);") - op.execute("CREATE INDEX ix_automations_updated_at ON automations(updated_at);") # automation_triggers — one row per (automation, trigger-instance) pair op.execute( """ - CREATE TABLE automation_triggers ( + CREATE TABLE IF NOT EXISTS automation_triggers ( id SERIAL PRIMARY KEY, automation_id INTEGER NOT NULL REFERENCES automations(id) ON DELETE CASCADE, @@ -96,20 +128,22 @@ def upgrade() -> None: """ ) op.execute( - "CREATE INDEX ix_automation_triggers_automation_id ON automation_triggers(automation_id);" - ) - op.execute("CREATE INDEX ix_automation_triggers_type ON automation_triggers(type);") - op.execute( - "CREATE INDEX ix_automation_triggers_enabled ON automation_triggers(enabled);" + "CREATE INDEX IF NOT EXISTS ix_automation_triggers_automation_id ON automation_triggers(automation_id);" ) op.execute( - "CREATE INDEX ix_automation_triggers_created_at ON automation_triggers(created_at);" + "CREATE INDEX IF NOT EXISTS ix_automation_triggers_type ON automation_triggers(type);" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_automation_triggers_enabled ON automation_triggers(enabled);" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_automation_triggers_created_at ON automation_triggers(created_at);" ) # Partial index for the schedule tick: only enabled schedule triggers # with a scheduled next fire are ever scanned for due rows. op.execute( """ - CREATE INDEX ix_automation_triggers_due + CREATE INDEX IF NOT EXISTS ix_automation_triggers_due ON automation_triggers (next_fire_at) WHERE enabled = true AND type = 'schedule' @@ -120,7 +154,7 @@ def upgrade() -> None: # automation_runs — the immutable per-fire execution record op.execute( """ - CREATE TABLE automation_runs ( + CREATE TABLE IF NOT EXISTS automation_runs ( id SERIAL PRIMARY KEY, automation_id INTEGER NOT NULL REFERENCES automations(id) ON DELETE CASCADE, @@ -140,14 +174,16 @@ def upgrade() -> None: """ ) op.execute( - "CREATE INDEX ix_automation_runs_automation_id ON automation_runs(automation_id);" + "CREATE INDEX IF NOT EXISTS ix_automation_runs_automation_id ON automation_runs(automation_id);" ) op.execute( - "CREATE INDEX ix_automation_runs_trigger_id ON automation_runs(trigger_id);" + "CREATE INDEX IF NOT EXISTS ix_automation_runs_trigger_id ON automation_runs(trigger_id);" ) - op.execute("CREATE INDEX ix_automation_runs_status ON automation_runs(status);") op.execute( - "CREATE INDEX ix_automation_runs_created_at ON automation_runs(created_at);" + "CREATE INDEX IF NOT EXISTS ix_automation_runs_status ON automation_runs(status);" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_automation_runs_created_at ON automation_runs(created_at);" ) diff --git a/surfsense_backend/alembic/versions/148_add_automation_runs_to_zero_publication.py b/surfsense_backend/alembic/versions/148_add_automation_runs_to_zero_publication.py new file mode 100644 index 000000000..1b25be753 --- /dev/null +++ b/surfsense_backend/alembic/versions/148_add_automation_runs_to_zero_publication.py @@ -0,0 +1,177 @@ +"""add automation_runs to zero_publication with thin column list + +Publishes ``automation_runs`` so the dashboard can replace polling with a +live run status + per-step ticker. Only the columns the list and ticker +read are exposed (``id, automation_id, trigger_id, status, step_results, +started_at, finished_at, created_at``); heavy JSONB +(``definition_snapshot``, ``inputs``, ``output``, ``artifacts``, ``error``) +stays on REST and is fetched lazily on detail expand. + +Uses the canonical ``ALTER PUBLICATION ... SET TABLE`` + ``COMMENT`` +bookend pattern (see migration 143) -- the shape Zero ``>=1.0`` requires +to fire its schema-change hook. Existing tables are re-emitted unchanged. + +Revision ID: 148 +Revises: 147 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa + +from alembic import op + +revision: str = "148" +down_revision: str | None = "147" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +PUBLICATION_NAME = "zero_publication" + +# Mirrors migration 143. Kept in sync explicitly: any change to these lists +# must be re-emitted in a new resync migration with COMMENT bookends. +DOCUMENT_COLS = [ + "id", + "title", + "document_type", + "search_space_id", + "folder_id", + "created_by_id", + "status", + "created_at", + "updated_at", +] + +USER_COLS = [ + "id", + "pages_limit", + "pages_used", + "premium_credit_micros_limit", + "premium_credit_micros_used", +] + +# Thin set: status + lightweight progress only. Heavy JSONB stays on REST. +AUTOMATION_RUN_COLS = [ + "id", + "automation_id", + "trigger_id", + "status", + "step_results", + "started_at", + "finished_at", + "created_at", +] + + +def _has_zero_version(conn, table: str) -> bool: + return ( + conn.execute( + sa.text( + "SELECT 1 FROM information_schema.columns " + "WHERE table_name = :tbl AND column_name = '_0_version'" + ), + {"tbl": table}, + ).fetchone() + is not None + ) + + +def _build_set_table_ddl( + *, documents_has_zero_ver: bool, user_has_zero_ver: bool +) -> str: + doc_cols = DOCUMENT_COLS + (['"_0_version"'] if documents_has_zero_ver else []) + user_cols = USER_COLS + (['"_0_version"'] if user_has_zero_ver else []) + doc_col_list = ", ".join(doc_cols) + user_col_list = ", ".join(user_cols) + run_col_list = ", ".join(AUTOMATION_RUN_COLS) + return ( + f"ALTER PUBLICATION {PUBLICATION_NAME} SET TABLE " + f"notifications, " + f"documents ({doc_col_list}), " + f"folders, " + f"search_source_connectors, " + f"new_chat_messages, " + f"chat_comments, " + f"chat_session_state, " + f'"user" ({user_col_list}), ' + f"automation_runs ({run_col_list})" + ) + + +def upgrade() -> None: + conn = op.get_bind() + + exists = conn.execute( + sa.text("SELECT 1 FROM pg_publication WHERE pubname = :name"), + {"name": PUBLICATION_NAME}, + ).fetchone() + if not exists: + return + + documents_has_zero_ver = _has_zero_version(conn, "documents") + user_has_zero_ver = _has_zero_version(conn, "user") + + # COMMENT-ALTER-COMMENT trio must be one transaction so Zero observes + # them as one schema-change event. Matches the SAVEPOINT pattern used + # in migrations 117 / 139 / 140 / 143. + tx = conn.begin_nested() if conn.in_transaction() else conn.begin() + with tx: + conn.execute( + sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'pre-148-resync'") + ) + conn.execute( + sa.text( + _build_set_table_ddl( + documents_has_zero_ver=documents_has_zero_ver, + user_has_zero_ver=user_has_zero_ver, + ) + ) + ) + conn.execute( + sa.text(f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'post-148-resync'") + ) + + +def downgrade() -> None: + """Re-emit migration 143's shape (no automation_runs).""" + conn = op.get_bind() + + exists = conn.execute( + sa.text("SELECT 1 FROM pg_publication WHERE pubname = :name"), + {"name": PUBLICATION_NAME}, + ).fetchone() + if not exists: + return + + documents_has_zero_ver = _has_zero_version(conn, "documents") + user_has_zero_ver = _has_zero_version(conn, "user") + + doc_cols = DOCUMENT_COLS + (['"_0_version"'] if documents_has_zero_ver else []) + user_cols = USER_COLS + (['"_0_version"'] if user_has_zero_ver else []) + doc_col_list = ", ".join(doc_cols) + user_col_list = ", ".join(user_cols) + ddl = ( + f"ALTER PUBLICATION {PUBLICATION_NAME} SET TABLE " + f"notifications, " + f"documents ({doc_col_list}), " + f"folders, " + f"search_source_connectors, " + f"new_chat_messages, " + f"chat_comments, " + f"chat_session_state, " + f'"user" ({user_col_list})' + ) + + tx = conn.begin_nested() if conn.in_transaction() else conn.begin() + with tx: + conn.execute( + sa.text( + f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'pre-148-downgrade'" + ) + ) + conn.execute(sa.text(ddl)) + conn.execute( + sa.text( + f"COMMENT ON PUBLICATION {PUBLICATION_NAME} IS 'post-148-downgrade'" + ) + ) diff --git a/surfsense_backend/app/automations/actions/builtin/agent_task/dependencies.py b/surfsense_backend/app/automations/actions/builtin/agent_task/dependencies.py index e3736cc95..4ef8c52bf 100644 --- a/surfsense_backend/app/automations/actions/builtin/agent_task/dependencies.py +++ b/surfsense_backend/app/automations/actions/builtin/agent_task/dependencies.py @@ -85,23 +85,16 @@ async def build_dependencies( connector_service, firecrawl_api_key = await setup_connector_and_firecrawl( session, search_space_id=search_space_id ) - # Quick fix: use an in-memory checkpointer for automation runs. + # Per-task InMemorySaver: the shared Postgres checkpointer's connection + # pool binds connections to the loop that opened them, but Celery uses a + # fresh loop per task, so the next task hangs 30s on a dead-loop connection + # (`PoolTimeout`). InMemorySaver has no pool and dies with the task — fine + # while runs are one-shot (the checkpoint only spans one graph execution). # - # The shared Postgres checkpointer caches DB connections in a - # module-level pool. Each cached connection is bound to the asyncio - # loop that opened it. Celery throws away the loop after every task, - # so the pool ends up full of connections pointing to a dead loop, - # and the next Celery task (running on a fresh loop) can't use any - # of them — it hangs 30s and fails with - # `PoolTimeout: couldn't get a connection after 30.00 sec`. - # - # InMemorySaver has no cached connections, no loop binding — each - # Celery task creates one and drops it on exit. - # - # TODO(checkpointer): proper fix is to dispose the checkpointer - # pool around each Celery task in `run_async_celery_task`, the same - # way `_dispose_shared_db_engine` already does for the SQLAlchemy - # pool. Then this site can switch back to the shared checkpointer. + # TODO(checkpointer): when runs need durability (crash-resume or HITL + # interrupt/resume across tasks), dispose the checkpointer pool around each + # Celery task in `run_async_celery_task` — as `_dispose_shared_db_engine` + # already does for the SQLAlchemy pool — then use the shared checkpointer. checkpointer = InMemorySaver() return AgentDependencies( llm=llm, diff --git a/surfsense_web/app/dashboard/[search_space_id]/automations/[automation_id]/components/run-details-panel.tsx b/surfsense_web/app/dashboard/[search_space_id]/automations/[automation_id]/components/run-details-panel.tsx index 164f156e5..1a54ac0e5 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/automations/[automation_id]/components/run-details-panel.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/automations/[automation_id]/components/run-details-panel.tsx @@ -15,7 +15,7 @@ import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/component import { ScrollArea } from "@/components/ui/scroll-area"; import { Separator } from "@/components/ui/separator"; import { Skeleton } from "@/components/ui/skeleton"; -import type { RunStepResult } from "@/contracts/types/automation.types"; +import type { RunStatus, RunStepResult } from "@/contracts/types/automation.types"; import { useAutomationRun } from "@/hooks/use-automation-runs"; import { cn } from "@/lib/utils"; import { RunStepResultCard } from "./run-step-result-card"; @@ -23,44 +23,46 @@ import { RunStepResultCard } from "./run-step-result-card"; interface RunDetailsPanelProps { automationId: number; runId: number; + /** Live step entries from Zero; rendered while the run is in-flight and + * also kept as the authoritative source once it finishes. */ + liveSteps: RunStepResult[]; + /** Live run status from Zero. Used to hide diagnostic sections that + * only make sense after the run reaches a terminal state. */ + liveStatus: RunStatus; } /** - * Expanded view of a single run. Fetches lazily — the parent only renders - * this once the row is opened, so the list view stays cheap. + * Expanded view of a single run. Steps render immediately from the live + * Zero row so the panel updates as the run progresses; the heavy REST + * payload (output, artifacts, resolved inputs, run-level error) is + * fetched lazily and merged in when it arrives. * - * We surface the run outcome readably: a run-level error first (when - * present), then per-step cards that render the agent's markdown - * ``final_message`` directly, and finally the structural artifacts/inputs. - * The full ``definition_snapshot`` is omitted because it usually mirrors the - * live definition — surfacing it would dominate the panel without informing + * Surfacing order is outcome-first: a run-level error (when present), + * then per-step cards that render the agent's markdown ``final_message`` + * directly, and finally the structural artifacts/inputs. The full + * ``definition_snapshot`` is omitted because it usually mirrors the live + * definition — surfacing it would dominate the panel without informing * what the user is trying to learn ("did this work? what did it do?"). */ -export function RunDetailsPanel({ automationId, runId }: RunDetailsPanelProps) { - const { data: run, isLoading, error } = useAutomationRun(automationId, runId); +export function RunDetailsPanel({ + automationId, + runId, + liveSteps, + liveStatus, +}: RunDetailsPanelProps) { + const isTerminal = liveStatus !== "pending" && liveStatus !== "running"; + // Defer the REST round-trip until the run can actually carry heavy + // fields — output/artifacts/error are only written at terminal mark. + const { data: run, isLoading, error } = useAutomationRun(automationId, runId, { + enabled: isTerminal, + }); - if (isLoading) { - return ( -
No steps recorded.
++ {isTerminal ? "No steps recorded." : "Waiting for first step…"} +
) : (+ Couldn't load run details{error?.message ? `: ${error.message}` : "."} +
+ ) : hasDiagnostics ? ( + <> +