diff --git a/api/alembic/versions/cdcf9f65913b_add_workflow_uuid.py b/api/alembic/versions/cdcf9f65913b_add_workflow_uuid.py new file mode 100644 index 0000000..3789bdc --- /dev/null +++ b/api/alembic/versions/cdcf9f65913b_add_workflow_uuid.py @@ -0,0 +1,49 @@ +"""add workflow_uuid + +Revision ID: cdcf9f65913b +Revises: a1b2c3d4e5f6 +Create Date: 2026-04-25 18:24:45.954049 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "cdcf9f65913b" +down_revision: Union[str, None] = "a1b2c3d4e5f6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # 1. Add the column as nullable so existing rows are accepted. + op.add_column( + "workflows", + sa.Column("workflow_uuid", sa.String(length=36), nullable=True), + ) + + # 2. Backfill UUIDs for existing rows. gen_random_uuid() is built-in on + # PostgreSQL 13+; cast to text to match the String(36) column type. + op.execute( + "UPDATE workflows SET workflow_uuid = gen_random_uuid()::text " + "WHERE workflow_uuid IS NULL" + ) + + # 3. Now that every row has a value, enforce NOT NULL. + op.alter_column("workflows", "workflow_uuid", nullable=False) + + # 4. Create the unique index. + op.create_index( + op.f("ix_workflows_workflow_uuid"), + "workflows", + ["workflow_uuid"], + unique=True, + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix_workflows_workflow_uuid"), table_name="workflows") + op.drop_column("workflows", "workflow_uuid") diff --git a/api/db/agent_trigger_client.py b/api/db/agent_trigger_client.py index ffe10cd..8d5419c 100644 --- a/api/db/agent_trigger_client.py +++ b/api/db/agent_trigger_client.py @@ -3,24 +3,36 @@ from typing import List, Optional from loguru import logger -from sqlalchemy import and_, select, update -from sqlalchemy.dialects.postgresql import insert +from sqlalchemy import and_, insert, select, update from api.db.base_client import BaseDBClient from api.db.models import AgentTriggerModel from api.enums import TriggerState +class TriggerPathConflictError(Exception): + """Raised when a trigger path is already in use by a different workflow. + + ``trigger_path`` is globally unique, so any conflict — same org or + cross-org — surfaces here. + """ + + def __init__(self, trigger_paths: List[str]): + self.trigger_paths = list(trigger_paths) + joined = ", ".join(self.trigger_paths) + super().__init__(f"Trigger path(s) already in use by another agent: {joined}") + + class AgentTriggerClient(BaseDBClient): """Client for managing agent triggers (UUID -> workflow_id mappings).""" async def get_agent_trigger_by_path( self, trigger_path: str, active_only: bool = True ) -> Optional[AgentTriggerModel]: - """Get an agent trigger by its unique path (UUID). + """Get an agent trigger by its globally unique path (UUID). Args: - trigger_path: The unique trigger UUID + trigger_path: The trigger UUID active_only: If True, only return active triggers Returns: @@ -39,13 +51,63 @@ class AgentTriggerClient(BaseDBClient): result = await session.execute(query) return result.scalar_one_or_none() + async def check_trigger_path_conflicts( + self, + trigger_paths: List[str], + exclude_workflow_id: Optional[int] = None, + ) -> List[str]: + """Return any trigger paths already in use by a different workflow. + + Archived triggers count as conflicts — we never silently repurpose + another workflow's trigger. + + Args: + trigger_paths: Paths to check + exclude_workflow_id: Workflow that may legitimately own these paths + (used during updates to ignore the workflow's own triggers) + + Returns: + List of conflicting trigger paths (empty if no conflicts). + """ + if not trigger_paths: + return [] + + async with self.async_session() as session: + query = select(AgentTriggerModel.trigger_path).where( + AgentTriggerModel.trigger_path.in_(trigger_paths), + ) + if exclude_workflow_id is not None: + query = query.where( + AgentTriggerModel.workflow_id != exclude_workflow_id + ) + result = await session.execute(query) + return [row[0] for row in result.all()] + + async def assert_trigger_paths_available( + self, + trigger_paths: List[str], + exclude_workflow_id: Optional[int] = None, + ) -> None: + """Raise TriggerPathConflictError if any path is already in use.""" + conflicts = await self.check_trigger_path_conflicts( + trigger_paths=trigger_paths, + exclude_workflow_id=exclude_workflow_id, + ) + if conflicts: + raise TriggerPathConflictError(conflicts) + async def sync_triggers_for_workflow( self, workflow_id: int, organization_id: int, trigger_paths: List[str] ) -> None: """Sync triggers for a workflow based on the trigger nodes in the workflow definition. - This creates/reactivates triggers that are in the workflow definition - and archives triggers that are no longer in the workflow. + Creates/reactivates triggers that are in the workflow definition and + archives triggers that are no longer in the workflow. + + Raises TriggerPathConflictError if any new trigger path is already in + use by another workflow. Callers should invoke + ``assert_trigger_paths_available`` upfront so the workflow is not + created/updated when a conflict will block trigger sync. Args: workflow_id: ID of the workflow @@ -53,7 +115,7 @@ class AgentTriggerClient(BaseDBClient): trigger_paths: List of trigger UUIDs from the workflow definition """ async with self.async_session() as session: - # Get all existing triggers for this workflow (including archived) + # Existing triggers tied to THIS workflow (any state) result = await session.execute( select(AgentTriggerModel).where( AgentTriggerModel.workflow_id == workflow_id @@ -63,26 +125,48 @@ class AgentTriggerClient(BaseDBClient): existing_paths = set(existing_triggers.keys()) new_paths = set(trigger_paths) + paths_to_add = new_paths - existing_paths + + # Refuse to take over a trigger owned by another workflow + # (active or archived). The global unique constraint on + # trigger_path backstops races between this check and the + # insert below. + if paths_to_add: + conflict_result = await session.execute( + select(AgentTriggerModel.trigger_path).where( + AgentTriggerModel.trigger_path.in_(paths_to_add), + AgentTriggerModel.workflow_id != workflow_id, + ) + ) + conflicts = [row[0] for row in conflict_result.all()] + if conflicts: + raise TriggerPathConflictError(conflicts) # Archive triggers that are no longer in the workflow definition paths_to_archive = existing_paths - new_paths if paths_to_archive: await session.execute( update(AgentTriggerModel) - .where(AgentTriggerModel.trigger_path.in_(paths_to_archive)) + .where( + and_( + AgentTriggerModel.workflow_id == workflow_id, + AgentTriggerModel.trigger_path.in_(paths_to_archive), + ) + ) .values(state=TriggerState.ARCHIVED.value) ) logger.info( f"Archived {len(paths_to_archive)} triggers for workflow {workflow_id}" ) - # Reactivate existing triggers that are back in the workflow + # Reactivate this workflow's previously-archived triggers paths_to_reactivate = new_paths & existing_paths if paths_to_reactivate: await session.execute( update(AgentTriggerModel) .where( and_( + AgentTriggerModel.workflow_id == workflow_id, AgentTriggerModel.trigger_path.in_(paths_to_reactivate), AgentTriggerModel.state == TriggerState.ARCHIVED.value, ) @@ -90,25 +174,15 @@ class AgentTriggerClient(BaseDBClient): .values(state=TriggerState.ACTIVE.value) ) - # Add new triggers - paths_to_add = new_paths - existing_paths for trigger_path in paths_to_add: - stmt = insert(AgentTriggerModel).values( - trigger_path=trigger_path, - workflow_id=workflow_id, - organization_id=organization_id, - state=TriggerState.ACTIVE.value, + await session.execute( + insert(AgentTriggerModel).values( + trigger_path=trigger_path, + workflow_id=workflow_id, + organization_id=organization_id, + state=TriggerState.ACTIVE.value, + ) ) - # Handle race condition where trigger might already exist for another workflow - stmt = stmt.on_conflict_do_update( - index_elements=["trigger_path"], - set_={ - "workflow_id": workflow_id, - "organization_id": organization_id, - "state": TriggerState.ACTIVE.value, - }, - ) - await session.execute(stmt) if paths_to_add: logger.info( diff --git a/api/db/models.py b/api/db/models.py index b8d4b1d..4a2f0f7 100644 --- a/api/db/models.py +++ b/api/db/models.py @@ -243,6 +243,13 @@ class WorkflowDefinitionModel(Base): class WorkflowModel(Base): __tablename__ = "workflows" id = Column(Integer, primary_key=True, index=True) + workflow_uuid = Column( + String(36), + unique=True, + nullable=False, + index=True, + default=lambda: str(uuid.uuid4()), + ) user_id = Column(Integer, ForeignKey("users.id"), nullable=True) user = relationship("UserModel", back_populates="workflows") organization_id = Column(Integer, ForeignKey("organizations.id"), nullable=True) @@ -723,7 +730,7 @@ class AgentTriggerModel(Base): id = Column(Integer, primary_key=True, index=True) - # Unique trigger path (UUID format) - generated by UI when trigger node is created + # Globally unique trigger path (UUID format) trigger_path = Column(String(36), unique=True, nullable=False, index=True) # Link to workflow diff --git a/api/mcp_server/tools/create_workflow.py b/api/mcp_server/tools/create_workflow.py index 66dc5e8..9934b3e 100644 --- a/api/mcp_server/tools/create_workflow.py +++ b/api/mcp_server/tools/create_workflow.py @@ -26,6 +26,7 @@ from loguru import logger from pydantic import ValidationError as PydanticValidationError from api.db import db_client +from api.db.agent_trigger_client import TriggerPathConflictError from api.enums import PostHogEvent from api.mcp_server.auth import authenticate_mcp_request from api.mcp_server.tracing import traced_tool @@ -125,7 +126,20 @@ async def create_workflow(code: str) -> dict[str, Any]: except (ValueError, Exception) as e: # WorkflowGraph raises ValueError return _error_result("graph_validation", str(e)) - # 4. Persist as a new workflow with v1 published. + # 4. Reject upfront if any trigger path collides with another workflow's + # trigger in this org so we don't leave an orphan workflow record. + trigger_paths = _extract_trigger_paths(payload) + if trigger_paths: + try: + await db_client.assert_trigger_paths_available( + trigger_paths=trigger_paths, + ) + except TriggerPathConflictError as e: + return _error_result( + "trigger_path_conflict", str(e), trigger_paths=e.trigger_paths + ) + + # 5. Persist as a new workflow with v1 published. workflow = await db_client.create_workflow( name, payload, @@ -144,7 +158,6 @@ async def create_workflow(code: str) -> dict[str, Any]: }, ) - trigger_paths = _extract_trigger_paths(payload) if trigger_paths: await db_client.sync_triggers_for_workflow( workflow_id=workflow.id, diff --git a/api/routes/workflow.py b/api/routes/workflow.py index cc9ec52..b663860 100644 --- a/api/routes/workflow.py +++ b/api/routes/workflow.py @@ -12,6 +12,7 @@ from pydantic import BaseModel, Field, ValidationError from api.constants import DEPLOYMENT_MODE from api.db import db_client +from api.db.agent_trigger_client import TriggerPathConflictError from api.db.models import UserModel from api.db.workflow_template_client import WorkflowTemplateClient from api.enums import CallType, PostHogEvent, StorageBackend @@ -58,6 +59,19 @@ def extract_trigger_paths(workflow_definition: dict) -> List[str]: return trigger_paths +def _trigger_path_to_node_id(workflow_definition: dict) -> dict[str, str]: + """Map each trigger node's trigger_path to its node id.""" + if not workflow_definition: + return {} + out: dict[str, str] = {} + for node in workflow_definition.get("nodes", []): + if node.get("type") == "trigger": + tp = node.get("data", {}).get("trigger_path") + if tp: + out[tp] = node.get("id") + return out + + def regenerate_trigger_uuids(workflow_definition: dict) -> dict: """Regenerate UUIDs for all trigger nodes in a workflow definition. @@ -89,6 +103,28 @@ def regenerate_trigger_uuids(workflow_definition: dict) -> dict: return updated_definition +def ensure_trigger_paths(workflow_definition: Optional[dict]) -> Optional[dict]: + """Mint a UUID for any trigger node that's missing ``data.trigger_path``. + + Trigger nodes that already carry a non-empty trigger_path are left + untouched so stable IDs survive edits. The input is not mutated; the + returned dict is what should be persisted and echoed in the response. + """ + if not workflow_definition: + return workflow_definition + + import copy + + out = copy.deepcopy(workflow_definition) + for node in out.get("nodes") or []: + if node.get("type") != "trigger": + continue + data = node.setdefault("data", {}) + if not data.get("trigger_path"): + data["trigger_path"] = str(uuid.uuid4()) + return out + + router = APIRouter(prefix="/workflow") @@ -97,6 +133,96 @@ class ValidateWorkflowResponse(BaseModel): errors: list[WorkflowError] +def _trigger_conflict_http_exception( + workflow_definition: Optional[dict], conflicting_paths: list[str] +) -> HTTPException: + """Build a 409 with the same detail shape as validate's 422 so the editor + can highlight the offending trigger node(s) using the same code path.""" + path_to_node = ( + _trigger_path_to_node_id(workflow_definition) if workflow_definition else {} + ) + errors: list[WorkflowError] = [ + WorkflowError( + kind=ItemKind.node, + id=path_to_node.get(p), + field="data.trigger_path", + message=( + "Trigger path is already in use. Please choose another one " + "or leave empty to use a unique one generated by the server." + ), + ) + for p in conflicting_paths + ] + return HTTPException( + status_code=409, + detail=ValidateWorkflowResponse(is_valid=False, errors=errors).model_dump(), + ) + + +async def _validate_workflow_definition( + workflow_definition: Optional[dict], + exclude_workflow_id: Optional[int] = None, +) -> list[WorkflowError]: + """Run DTO + graph + trigger-conflict checks on a workflow definition. + + Returns the list of errors (empty if the definition is valid). This is + the single source of truth for "is this workflow valid?" — used by the + /validate route (read-only audit) and the /publish route (gate). + """ + errors: list[WorkflowError] = [] + if not workflow_definition: + return errors + + # ----------- DTO Validation ------------ + dto: Optional[ReactFlowDTO] = None + try: + dto = ReactFlowDTO.model_validate(workflow_definition) + except ValidationError as exc: + errors.extend(_transform_schema_errors(exc, workflow_definition)) + + # ----------- Graph Validation if DTO is valid ------------ + try: + if dto: + WorkflowGraph(dto) + except ValueError as e: + errors.extend(e.args[0]) + + # ----------- Trigger Path Conflict Check ------------ + trigger_paths = extract_trigger_paths(workflow_definition) + if trigger_paths: + conflicts = await db_client.check_trigger_path_conflicts( + trigger_paths=trigger_paths, + exclude_workflow_id=exclude_workflow_id, + ) + if conflicts: + path_to_node = _trigger_path_to_node_id(workflow_definition) + for conflicting_path in conflicts: + errors.append( + WorkflowError( + kind=ItemKind.node, + id=path_to_node.get(conflicting_path), + field="data.trigger_path", + message=( + "Trigger path is already in use. Please choose " + "another one or leave empty to use a unique one " + "generated by the server." + ), + ) + ) + + return errors + + +def _validation_errors_http_exception( + errors: list[WorkflowError], status_code: int = 422 +) -> HTTPException: + """Wrap a list of validation errors in the response shape clients expect.""" + return HTTPException( + status_code=status_code, + detail=ValidateWorkflowResponse(is_valid=False, errors=errors).model_dump(), + ) + + class CallDispositionCodes(BaseModel): disposition_codes: list[str] = [] @@ -219,34 +345,18 @@ async def validate_workflow( status_code=404, detail=f"Workflow with id {workflow_id} not found" ) - errors: list[WorkflowError] = [] - # Validate draft if it exists (user is editing), else validate published draft = await db_client.get_draft_version(workflow_id) workflow_definition = ( draft.workflow_json if draft else workflow.released_definition.workflow_json ) - # ----------- DTO Validation ------------ - dto: Optional[ReactFlowDTO] = None - - try: - dto = ReactFlowDTO.model_validate(workflow_definition) - except ValidationError as exc: - errors.extend(_transform_schema_errors(exc, workflow_definition)) - - # ----------- Graph Validation if DTO is valid ------------ - try: - if dto: - WorkflowGraph(dto) - except ValueError as e: - errors.extend(e.args[0]) + errors = await _validate_workflow_definition( + workflow_definition, exclude_workflow_id=workflow_id + ) if errors: - raise HTTPException( - status_code=422, - detail=ValidateWorkflowResponse(is_valid=False, errors=errors).model_dump(), - ) + raise _validation_errors_http_exception(errors) return ValidateWorkflowResponse(is_valid=True, errors=[]) @@ -290,9 +400,26 @@ async def create_workflow( request: The create workflow request user: The user to create the workflow for """ + # Auto-mint trigger_path for any trigger node that didn't ship one so + # clients don't need to generate UUIDs themselves. + workflow_definition = ensure_trigger_paths(request.workflow_definition) + + # Validate trigger path uniqueness BEFORE creating the workflow so we + # don't leave an orphaned workflow record when the trigger conflicts. + trigger_paths = ( + extract_trigger_paths(workflow_definition) if workflow_definition else [] + ) + if trigger_paths: + try: + await db_client.assert_trigger_paths_available( + trigger_paths=trigger_paths, + ) + except TriggerPathConflictError as e: + raise _trigger_conflict_http_exception(workflow_definition, e.trigger_paths) + workflow = await db_client.create_workflow( request.name, - request.workflow_definition, + workflow_definition, user.id, user.selected_organization_id, ) @@ -308,22 +435,19 @@ async def create_workflow( }, ) - # Sync agent triggers if workflow definition contains any - if request.workflow_definition: - trigger_paths = extract_trigger_paths(request.workflow_definition) - if trigger_paths: - await db_client.sync_triggers_for_workflow( - workflow_id=workflow.id, - organization_id=user.selected_organization_id, - trigger_paths=trigger_paths, - ) + if trigger_paths: + await db_client.sync_triggers_for_workflow( + workflow_id=workflow.id, + organization_id=user.selected_organization_id, + trigger_paths=trigger_paths, + ) return { "id": workflow.id, "name": workflow.name, "status": workflow.status, "created_at": workflow.created_at, - "workflow_definition": mask_workflow_definition(request.workflow_definition), + "workflow_definition": mask_workflow_definition(workflow_definition), "current_definition_id": workflow.current_definition_id, "template_context_variables": workflow.template_context_variables, "call_disposition_codes": workflow.call_disposition_codes, @@ -379,6 +503,16 @@ async def create_workflow_from_template( workflow_def = regenerate_trigger_uuids( workflow_data.get("workflow_definition", {}) ) + + trigger_paths = extract_trigger_paths(workflow_def) if workflow_def else [] + if trigger_paths: + try: + await db_client.assert_trigger_paths_available( + trigger_paths=trigger_paths, + ) + except TriggerPathConflictError as e: + raise HTTPException(status_code=409, detail=str(e)) + workflow = await db_client.create_workflow( name=workflow_data.get("name", f"{request.use_case} - {request.call_type}"), workflow_definition=workflow_def, @@ -399,15 +533,12 @@ async def create_workflow_from_template( }, ) - # Sync agent triggers if workflow definition contains any - if workflow_def: - trigger_paths = extract_trigger_paths(workflow_def) - if trigger_paths: - await db_client.sync_triggers_for_workflow( - workflow_id=workflow.id, - organization_id=user.selected_organization_id, - trigger_paths=trigger_paths, - ) + if trigger_paths: + await db_client.sync_triggers_for_workflow( + workflow_id=workflow.id, + organization_id=user.selected_organization_id, + trigger_paths=trigger_paths, + ) return { "id": workflow.id, @@ -421,6 +552,8 @@ async def create_workflow_from_template( "workflow_configurations": workflow.workflow_configurations, } + except HTTPException: + raise except HTTPStatusError as e: logger.error(f"MPS API error: {e}") raise HTTPException( @@ -601,7 +734,12 @@ async def publish_workflow( workflow_id: int, user: UserModel = Depends(get_user), ): - """Publish the current draft version of a workflow.""" + """Publish the current draft version of a workflow. + + Drafts are allowed to be incomplete (so the editor can save mid-edit), + but a published version is what runtime executes — so this is the gate + where the full DTO + graph + trigger-conflict checks must pass. + """ workflow = await db_client.get_workflow( workflow_id, organization_id=user.selected_organization_id ) @@ -610,6 +748,16 @@ async def publish_workflow( status_code=404, detail=f"Workflow with id {workflow_id} not found" ) + draft = await db_client.get_draft_version(workflow_id) + if draft is None: + raise HTTPException(status_code=400, detail="No draft to publish") + + errors = await _validate_workflow_definition( + draft.workflow_json, exclude_workflow_id=workflow_id + ) + if errors: + raise _validation_errors_http_exception(errors) + try: published = await db_client.publish_workflow_draft(workflow_id) except ValueError as e: @@ -750,6 +898,10 @@ async def update_workflow( # node.data / edge.data before anything touches the DB — the UI sends # nodes wholesale from the React Flow store, which carries those. workflow_definition = sanitize_workflow_definition(request.workflow_definition) + # Mint trigger_path for any trigger node that ships without one. The + # response echoes workflow_definition so the client picks up the new + # UUID without a refetch. + workflow_definition = ensure_trigger_paths(workflow_definition) if workflow_definition: existing_workflow = await db_client.get_workflow( workflow_id, organization_id=user.selected_organization_id @@ -786,6 +938,22 @@ async def update_workflow( except ValueError as e: raise HTTPException(status_code=422, detail=str(e)) + # Reject upfront if any new trigger path collides with another + # workflow's trigger — keeps the workflow record from + # being updated when the trigger sync would fail. + if workflow_definition: + new_trigger_paths = extract_trigger_paths(workflow_definition) + if new_trigger_paths: + try: + await db_client.assert_trigger_paths_available( + trigger_paths=new_trigger_paths, + exclude_workflow_id=workflow_id, + ) + except TriggerPathConflictError as e: + raise _trigger_conflict_http_exception( + workflow_definition, e.trigger_paths + ) + workflow = await db_client.update_workflow( workflow_id=workflow_id, name=request.name, @@ -1111,6 +1279,16 @@ async def duplicate_workflow_template( # Create a new workflow from the template # Regenerate trigger UUIDs to avoid conflicts with existing triggers workflow_def = regenerate_trigger_uuids(template.template_json) + + trigger_paths = extract_trigger_paths(workflow_def) if workflow_def else [] + if trigger_paths: + try: + await db_client.assert_trigger_paths_available( + trigger_paths=trigger_paths, + ) + except TriggerPathConflictError as e: + raise HTTPException(status_code=409, detail=str(e)) + workflow = await db_client.create_workflow( request.workflow_name, workflow_def, @@ -1118,15 +1296,12 @@ async def duplicate_workflow_template( user.selected_organization_id, ) - # Sync agent triggers if template contains any - if workflow_def: - trigger_paths = extract_trigger_paths(workflow_def) - if trigger_paths: - await db_client.sync_triggers_for_workflow( - workflow_id=workflow.id, - organization_id=user.selected_organization_id, - trigger_paths=trigger_paths, - ) + if trigger_paths: + await db_client.sync_triggers_for_workflow( + workflow_id=workflow.id, + organization_id=user.selected_organization_id, + trigger_paths=trigger_paths, + ) return { "id": workflow.id, diff --git a/ui/src/app/workflow/[workflowId]/hooks/useWorkflowState.ts b/ui/src/app/workflow/[workflowId]/hooks/useWorkflowState.ts index 7411c97..f190d6d 100644 --- a/ui/src/app/workflow/[workflowId]/hooks/useWorkflowState.ts +++ b/ui/src/app/workflow/[workflowId]/hooks/useWorkflowState.ts @@ -25,6 +25,23 @@ import logger from '@/lib/logger'; import { getNextNodeId, getRandomId } from "@/lib/utils"; import { DEFAULT_WORKFLOW_CONFIGURATIONS, WorkflowConfigurations } from "@/types/workflow-configurations"; +// Pull a WorkflowError[] out of any validate-shaped payload — works whether +// the body is the raw `{ is_valid, errors }` (validate success-with-errors) +// or wrapped as `{ detail: { is_valid, errors } }` (HTTPException body for +// validate's 422 and save's 409). Returns [] for any other shape so callers +// can tell "no structured errors in this response" from "valid". +function extractWorkflowErrors(payload: unknown): WorkflowError[] { + if (!payload || typeof payload !== "object") return []; + const p = payload as { + is_valid?: boolean; + errors?: WorkflowError[]; + detail?: { is_valid?: boolean; errors?: WorkflowError[] } | string; + }; + if (p.is_valid === false && p.errors) return p.errors; + if (typeof p.detail === "object" && p.detail?.errors) return p.detail.errors; + return []; +} + // Build initial node data from spec defaults. Replaces the per-type // hardcoded `getNewNode` switch — adding a new node type is now zero // frontend code: declare the spec on the backend and the defaults flow @@ -238,6 +255,29 @@ export const useWorkflowState = ({ setIsDirty(true); }; + // Replace the canvas's validation state with `errors`. Always clears any + // prior invalid markers first, so passing [] is the "workflow is now + // valid" path. + const applyWorkflowErrors = useCallback( + (errors: WorkflowError[]) => { + clearValidationErrors(); + errors.forEach((error) => { + if (error.kind === "node" && error.id) { + markNodeAsInvalid(error.id, error.message); + } else if (error.kind === "edge" && error.id) { + markEdgeAsInvalid(error.id, error.message); + } + }); + setWorkflowValidationErrors(errors); + }, + [ + clearValidationErrors, + markNodeAsInvalid, + markEdgeAsInvalid, + setWorkflowValidationErrors, + ], + ); + // Validate workflow function const validateWorkflow = useCallback(async () => { if (!user?.id) return; @@ -247,58 +287,16 @@ export const useWorkflowState = ({ workflow_id: workflowId, }, }); - - // Clear validation errors first - clearValidationErrors(); - - // Check if we have validation errors - if (response.error) { - let errors: WorkflowError[] = []; - const errorResponse = response.error as { - is_valid?: boolean; - errors?: WorkflowError[]; - detail?: { errors: WorkflowError[] }; - }; - - if (errorResponse.is_valid === false && errorResponse.errors) { - errors = errorResponse.errors; - } else if (errorResponse.detail?.errors) { - errors = errorResponse.detail.errors; - } - - if (errors.length > 0) { - // Update nodes with validation state - errors.forEach((error) => { - if (error.kind === 'node' && error.id) { - markNodeAsInvalid(error.id, error.message); - } else if (error.kind === 'edge' && error.id) { - markEdgeAsInvalid(error.id, error.message); - } - }); - - setWorkflowValidationErrors(errors); - } - } else if (response.data) { - if (response.data.is_valid === false && response.data.errors) { - const errors = response.data.errors; - - errors.forEach((error: WorkflowError) => { - if (error.kind === 'node' && error.id) { - markNodeAsInvalid(error.id, error.message); - } else if (error.kind === 'edge' && error.id) { - markEdgeAsInvalid(error.id, error.message); - } - }); - - setWorkflowValidationErrors(errors); - } else { - logger.info('Workflow is valid'); - } - } + // 422 surfaces under response.error, 200 with is_valid=true under + // response.data. extractWorkflowErrors normalises both — empty + // list means "valid" and clears any stale highlights. + applyWorkflowErrors( + extractWorkflowErrors(response.error ?? response.data), + ); } catch (error: unknown) { logger.error(`Unexpected validation error: ${error}`); } - }, [workflowId, user, clearValidationErrors, markNodeAsInvalid, markEdgeAsInvalid, setWorkflowValidationErrors]); + }, [workflowId, user, applyWorkflowErrors]); // Save workflow function. Returns version info from the API response. const saveWorkflow = useCallback(async (updateWorkflowDefinition: boolean = true): Promise<{ versionNumber?: number; versionStatus?: string } | undefined> => { @@ -311,6 +309,7 @@ export const useWorkflowState = ({ const viewport = rfInstance.current.getViewport(); const flow = { nodes: currentNodes, edges: currentEdges, viewport }; let result: { versionNumber?: number; versionStatus?: string } | undefined; + let saveSucceeded = false; try { const response = await updateWorkflowApiV1WorkflowWorkflowIdPut({ path: { @@ -321,21 +320,59 @@ export const useWorkflowState = ({ workflow_definition: updateWorkflowDefinition ? flow : null, }, }); - setIsDirty(false); - if (response.data) { - result = { - versionNumber: response.data.version_number ?? undefined, - versionStatus: response.data.version_status ?? undefined, - }; + if (response.error) { + // Backend rejected the save (e.g. 409 trigger-path conflict). + // When it carries structured WorkflowError items, reuse the + // validate pipeline so the offending node/edge gets + // highlighted in-canvas. We only apply when there are + // structured errors — a non-structured failure (network, + // 500) shouldn't wipe the existing validation state. + const workflowErrors = extractWorkflowErrors(response.error); + if (workflowErrors.length > 0) { + applyWorkflowErrors(workflowErrors); + } + logger.error(`Error saving workflow: ${JSON.stringify(response.error)}`); + } else { + setIsDirty(false); + if (response.data) { + // Reload server state into the canvas — the backend may + // have mutated the definition (e.g. minted a missing + // trigger_path) and is the source of truth post-save. + // Passing no `changes` arg skips history/dirty tracking. + const wf = response.data.workflow_definition as + | { nodes?: FlowNode[]; edges?: FlowEdge[] } + | undefined; + if (wf?.nodes) setNodes(wf.nodes); + if (wf?.edges) setEdges(wf.edges); + result = { + versionNumber: response.data.version_number ?? undefined, + versionStatus: response.data.version_status ?? undefined, + }; + saveSucceeded = true; + } } } catch (error) { logger.error(`Error saving workflow: ${error}`); } - // Validate after saving - await validateWorkflow(); + // Only run validate after a successful save — when save failed we've + // already populated the validation state from the error response and + // re-running validate would clear those errors (validate reads the + // unchanged DB state, which won't surface the user's pending issue). + if (saveSucceeded) { + await validateWorkflow(); + } return result; - }, [workflowId, workflowName, setIsDirty, user, validateWorkflow]); + }, [ + workflowId, + workflowName, + setIsDirty, + setNodes, + setEdges, + user, + validateWorkflow, + applyWorkflowErrors, + ]); // Set up keyboard shortcut for save (Cmd/Ctrl + S) useEffect(() => { diff --git a/ui/src/client/sdk.gen.ts b/ui/src/client/sdk.gen.ts index 2a0219b..1e94a7a 100644 --- a/ui/src/client/sdk.gen.ts +++ b/ui/src/client/sdk.gen.ts @@ -279,6 +279,10 @@ export const getWorkflowVersionsApiV1WorkflowWorkflowIdVersionsGet = (options: Options) => (options.client ?? client).post({ url: '/api/v1/workflow/{workflow_id}/publish', ...options });