fix: make trigger paths globally unique

This commit is contained in:
Abhishek Kumar 2026-04-25 19:44:44 +05:30
parent 3e3773f400
commit a1d4a1fab2
7 changed files with 496 additions and 137 deletions

View file

@ -12,6 +12,7 @@ from pydantic import BaseModel, Field, ValidationError
from api.constants import DEPLOYMENT_MODE
from api.db import db_client
from api.db.agent_trigger_client import TriggerPathConflictError
from api.db.models import UserModel
from api.db.workflow_template_client import WorkflowTemplateClient
from api.enums import CallType, PostHogEvent, StorageBackend
@ -58,6 +59,19 @@ def extract_trigger_paths(workflow_definition: dict) -> List[str]:
return trigger_paths
def _trigger_path_to_node_id(workflow_definition: dict) -> dict[str, str]:
"""Map each trigger node's trigger_path to its node id."""
if not workflow_definition:
return {}
out: dict[str, str] = {}
for node in workflow_definition.get("nodes", []):
if node.get("type") == "trigger":
tp = node.get("data", {}).get("trigger_path")
if tp:
out[tp] = node.get("id")
return out
def regenerate_trigger_uuids(workflow_definition: dict) -> dict:
"""Regenerate UUIDs for all trigger nodes in a workflow definition.
@ -89,6 +103,28 @@ def regenerate_trigger_uuids(workflow_definition: dict) -> dict:
return updated_definition
def ensure_trigger_paths(workflow_definition: Optional[dict]) -> Optional[dict]:
"""Mint a UUID for any trigger node that's missing ``data.trigger_path``.
Trigger nodes that already carry a non-empty trigger_path are left
untouched so stable IDs survive edits. The input is not mutated; the
returned dict is what should be persisted and echoed in the response.
"""
if not workflow_definition:
return workflow_definition
import copy
out = copy.deepcopy(workflow_definition)
for node in out.get("nodes") or []:
if node.get("type") != "trigger":
continue
data = node.setdefault("data", {})
if not data.get("trigger_path"):
data["trigger_path"] = str(uuid.uuid4())
return out
router = APIRouter(prefix="/workflow")
@ -97,6 +133,96 @@ class ValidateWorkflowResponse(BaseModel):
errors: list[WorkflowError]
def _trigger_conflict_http_exception(
workflow_definition: Optional[dict], conflicting_paths: list[str]
) -> HTTPException:
"""Build a 409 with the same detail shape as validate's 422 so the editor
can highlight the offending trigger node(s) using the same code path."""
path_to_node = (
_trigger_path_to_node_id(workflow_definition) if workflow_definition else {}
)
errors: list[WorkflowError] = [
WorkflowError(
kind=ItemKind.node,
id=path_to_node.get(p),
field="data.trigger_path",
message=(
"Trigger path is already in use. Please choose another one "
"or leave empty to use a unique one generated by the server."
),
)
for p in conflicting_paths
]
return HTTPException(
status_code=409,
detail=ValidateWorkflowResponse(is_valid=False, errors=errors).model_dump(),
)
async def _validate_workflow_definition(
workflow_definition: Optional[dict],
exclude_workflow_id: Optional[int] = None,
) -> list[WorkflowError]:
"""Run DTO + graph + trigger-conflict checks on a workflow definition.
Returns the list of errors (empty if the definition is valid). This is
the single source of truth for "is this workflow valid?" used by the
/validate route (read-only audit) and the /publish route (gate).
"""
errors: list[WorkflowError] = []
if not workflow_definition:
return errors
# ----------- DTO Validation ------------
dto: Optional[ReactFlowDTO] = None
try:
dto = ReactFlowDTO.model_validate(workflow_definition)
except ValidationError as exc:
errors.extend(_transform_schema_errors(exc, workflow_definition))
# ----------- Graph Validation if DTO is valid ------------
try:
if dto:
WorkflowGraph(dto)
except ValueError as e:
errors.extend(e.args[0])
# ----------- Trigger Path Conflict Check ------------
trigger_paths = extract_trigger_paths(workflow_definition)
if trigger_paths:
conflicts = await db_client.check_trigger_path_conflicts(
trigger_paths=trigger_paths,
exclude_workflow_id=exclude_workflow_id,
)
if conflicts:
path_to_node = _trigger_path_to_node_id(workflow_definition)
for conflicting_path in conflicts:
errors.append(
WorkflowError(
kind=ItemKind.node,
id=path_to_node.get(conflicting_path),
field="data.trigger_path",
message=(
"Trigger path is already in use. Please choose "
"another one or leave empty to use a unique one "
"generated by the server."
),
)
)
return errors
def _validation_errors_http_exception(
errors: list[WorkflowError], status_code: int = 422
) -> HTTPException:
"""Wrap a list of validation errors in the response shape clients expect."""
return HTTPException(
status_code=status_code,
detail=ValidateWorkflowResponse(is_valid=False, errors=errors).model_dump(),
)
class CallDispositionCodes(BaseModel):
disposition_codes: list[str] = []
@ -219,34 +345,18 @@ async def validate_workflow(
status_code=404, detail=f"Workflow with id {workflow_id} not found"
)
errors: list[WorkflowError] = []
# Validate draft if it exists (user is editing), else validate published
draft = await db_client.get_draft_version(workflow_id)
workflow_definition = (
draft.workflow_json if draft else workflow.released_definition.workflow_json
)
# ----------- DTO Validation ------------
dto: Optional[ReactFlowDTO] = None
try:
dto = ReactFlowDTO.model_validate(workflow_definition)
except ValidationError as exc:
errors.extend(_transform_schema_errors(exc, workflow_definition))
# ----------- Graph Validation if DTO is valid ------------
try:
if dto:
WorkflowGraph(dto)
except ValueError as e:
errors.extend(e.args[0])
errors = await _validate_workflow_definition(
workflow_definition, exclude_workflow_id=workflow_id
)
if errors:
raise HTTPException(
status_code=422,
detail=ValidateWorkflowResponse(is_valid=False, errors=errors).model_dump(),
)
raise _validation_errors_http_exception(errors)
return ValidateWorkflowResponse(is_valid=True, errors=[])
@ -290,9 +400,26 @@ async def create_workflow(
request: The create workflow request
user: The user to create the workflow for
"""
# Auto-mint trigger_path for any trigger node that didn't ship one so
# clients don't need to generate UUIDs themselves.
workflow_definition = ensure_trigger_paths(request.workflow_definition)
# Validate trigger path uniqueness BEFORE creating the workflow so we
# don't leave an orphaned workflow record when the trigger conflicts.
trigger_paths = (
extract_trigger_paths(workflow_definition) if workflow_definition else []
)
if trigger_paths:
try:
await db_client.assert_trigger_paths_available(
trigger_paths=trigger_paths,
)
except TriggerPathConflictError as e:
raise _trigger_conflict_http_exception(workflow_definition, e.trigger_paths)
workflow = await db_client.create_workflow(
request.name,
request.workflow_definition,
workflow_definition,
user.id,
user.selected_organization_id,
)
@ -308,22 +435,19 @@ async def create_workflow(
},
)
# Sync agent triggers if workflow definition contains any
if request.workflow_definition:
trigger_paths = extract_trigger_paths(request.workflow_definition)
if trigger_paths:
await db_client.sync_triggers_for_workflow(
workflow_id=workflow.id,
organization_id=user.selected_organization_id,
trigger_paths=trigger_paths,
)
if trigger_paths:
await db_client.sync_triggers_for_workflow(
workflow_id=workflow.id,
organization_id=user.selected_organization_id,
trigger_paths=trigger_paths,
)
return {
"id": workflow.id,
"name": workflow.name,
"status": workflow.status,
"created_at": workflow.created_at,
"workflow_definition": mask_workflow_definition(request.workflow_definition),
"workflow_definition": mask_workflow_definition(workflow_definition),
"current_definition_id": workflow.current_definition_id,
"template_context_variables": workflow.template_context_variables,
"call_disposition_codes": workflow.call_disposition_codes,
@ -379,6 +503,16 @@ async def create_workflow_from_template(
workflow_def = regenerate_trigger_uuids(
workflow_data.get("workflow_definition", {})
)
trigger_paths = extract_trigger_paths(workflow_def) if workflow_def else []
if trigger_paths:
try:
await db_client.assert_trigger_paths_available(
trigger_paths=trigger_paths,
)
except TriggerPathConflictError as e:
raise HTTPException(status_code=409, detail=str(e))
workflow = await db_client.create_workflow(
name=workflow_data.get("name", f"{request.use_case} - {request.call_type}"),
workflow_definition=workflow_def,
@ -399,15 +533,12 @@ async def create_workflow_from_template(
},
)
# Sync agent triggers if workflow definition contains any
if workflow_def:
trigger_paths = extract_trigger_paths(workflow_def)
if trigger_paths:
await db_client.sync_triggers_for_workflow(
workflow_id=workflow.id,
organization_id=user.selected_organization_id,
trigger_paths=trigger_paths,
)
if trigger_paths:
await db_client.sync_triggers_for_workflow(
workflow_id=workflow.id,
organization_id=user.selected_organization_id,
trigger_paths=trigger_paths,
)
return {
"id": workflow.id,
@ -421,6 +552,8 @@ async def create_workflow_from_template(
"workflow_configurations": workflow.workflow_configurations,
}
except HTTPException:
raise
except HTTPStatusError as e:
logger.error(f"MPS API error: {e}")
raise HTTPException(
@ -601,7 +734,12 @@ async def publish_workflow(
workflow_id: int,
user: UserModel = Depends(get_user),
):
"""Publish the current draft version of a workflow."""
"""Publish the current draft version of a workflow.
Drafts are allowed to be incomplete (so the editor can save mid-edit),
but a published version is what runtime executes so this is the gate
where the full DTO + graph + trigger-conflict checks must pass.
"""
workflow = await db_client.get_workflow(
workflow_id, organization_id=user.selected_organization_id
)
@ -610,6 +748,16 @@ async def publish_workflow(
status_code=404, detail=f"Workflow with id {workflow_id} not found"
)
draft = await db_client.get_draft_version(workflow_id)
if draft is None:
raise HTTPException(status_code=400, detail="No draft to publish")
errors = await _validate_workflow_definition(
draft.workflow_json, exclude_workflow_id=workflow_id
)
if errors:
raise _validation_errors_http_exception(errors)
try:
published = await db_client.publish_workflow_draft(workflow_id)
except ValueError as e:
@ -750,6 +898,10 @@ async def update_workflow(
# node.data / edge.data before anything touches the DB — the UI sends
# nodes wholesale from the React Flow store, which carries those.
workflow_definition = sanitize_workflow_definition(request.workflow_definition)
# Mint trigger_path for any trigger node that ships without one. The
# response echoes workflow_definition so the client picks up the new
# UUID without a refetch.
workflow_definition = ensure_trigger_paths(workflow_definition)
if workflow_definition:
existing_workflow = await db_client.get_workflow(
workflow_id, organization_id=user.selected_organization_id
@ -786,6 +938,22 @@ async def update_workflow(
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
# Reject upfront if any new trigger path collides with another
# workflow's trigger — keeps the workflow record from
# being updated when the trigger sync would fail.
if workflow_definition:
new_trigger_paths = extract_trigger_paths(workflow_definition)
if new_trigger_paths:
try:
await db_client.assert_trigger_paths_available(
trigger_paths=new_trigger_paths,
exclude_workflow_id=workflow_id,
)
except TriggerPathConflictError as e:
raise _trigger_conflict_http_exception(
workflow_definition, e.trigger_paths
)
workflow = await db_client.update_workflow(
workflow_id=workflow_id,
name=request.name,
@ -1111,6 +1279,16 @@ async def duplicate_workflow_template(
# Create a new workflow from the template
# Regenerate trigger UUIDs to avoid conflicts with existing triggers
workflow_def = regenerate_trigger_uuids(template.template_json)
trigger_paths = extract_trigger_paths(workflow_def) if workflow_def else []
if trigger_paths:
try:
await db_client.assert_trigger_paths_available(
trigger_paths=trigger_paths,
)
except TriggerPathConflictError as e:
raise HTTPException(status_code=409, detail=str(e))
workflow = await db_client.create_workflow(
request.workflow_name,
workflow_def,
@ -1118,15 +1296,12 @@ async def duplicate_workflow_template(
user.selected_organization_id,
)
# Sync agent triggers if template contains any
if workflow_def:
trigger_paths = extract_trigger_paths(workflow_def)
if trigger_paths:
await db_client.sync_triggers_for_workflow(
workflow_id=workflow.id,
organization_id=user.selected_organization_id,
trigger_paths=trigger_paths,
)
if trigger_paths:
await db_client.sync_triggers_for_workflow(
workflow_id=workflow.id,
organization_id=user.selected_organization_id,
trigger_paths=trigger_paths,
)
return {
"id": workflow.id,