# dograh/api/services/workflow/trigger_paths.py

import copy
import re
import uuid
from dataclasses import dataclass
from typing import Optional
# Maximum number of characters allowed in a trigger path. 36 is also the
# length of the string produced by ``str(uuid.uuid4())``, so auto-minted
# paths fit exactly.
TRIGGER_PATH_MAX_LENGTH = 36
# Allowed shape of a trigger path: one or more ASCII letters, digits,
# underscores, or hyphens (so no slashes, dots, or whitespace).
TRIGGER_PATH_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$")
@dataclass(frozen=True)
class TriggerPathIssue:
    """A single validation problem found on a trigger node's trigger path.

    Instances are immutable (``frozen=True``).
    """

    # Id of the offending node as read from the node's "id" key; None when
    # the node carries no id.
    node_id: str | None
    # The offending path. For non-string values this holds ``repr(value)``.
    trigger_path: str
    # Human-readable description of what is wrong with the path.
    message: str
def extract_trigger_paths(workflow_definition: Optional[dict]) -> list[str]:
    """Collect the ``trigger_path`` of every trigger node, in node order.

    Only nodes whose ``type`` equals ``"trigger"`` are considered. A trigger
    node contributes nothing when its ``data`` is missing/null or when its
    ``trigger_path`` is missing, empty, or not a string. Duplicate paths are
    kept as-is.

    Args:
        workflow_definition: Workflow definition dict with an optional
            ``"nodes"`` list, or None.

    Returns:
        The list of non-empty string trigger paths; empty when the definition
        is None/empty or has no nodes.
    """
    if not workflow_definition:
        return []

    # Lazily pull the raw trigger_path value off each trigger node...
    raw_paths = (
        (node.get("data") or {}).get("trigger_path")
        for node in workflow_definition.get("nodes") or []
        if node.get("type") == "trigger"
    )
    # ...and keep only the usable ones (non-empty strings).
    return [path for path in raw_paths if isinstance(path, str) and path]
def trigger_path_to_node_id(workflow_definition: Optional[dict]) -> dict[str, str]:
    """Build a lookup from trigger path to the id of the node that uses it.

    Only nodes whose ``type`` equals ``"trigger"`` and whose
    ``data.trigger_path`` is a non-empty string are included. When several
    trigger nodes share a path, the last one in node order wins. The mapped
    value is whatever the node's ``"id"`` key holds (None if it is absent).

    Args:
        workflow_definition: Workflow definition dict with an optional
            ``"nodes"`` list, or None.

    Returns:
        Mapping of trigger path to node id; empty when the definition is
        None/empty or has no qualifying nodes.
    """
    if not workflow_definition:
        return {}

    trigger_nodes = [
        node
        for node in workflow_definition.get("nodes") or []
        if node.get("type") == "trigger"
    ]

    path_to_id: dict[str, str] = {}
    for node in trigger_nodes:
        path = (node.get("data") or {}).get("trigger_path")
        # Skip nodes without a usable (non-empty string) path.
        if not isinstance(path, str) or not path:
            continue
        path_to_id[path] = node.get("id")
    return path_to_id
def regenerate_trigger_uuids(workflow_definition: Optional[dict]) -> Optional[dict]:
    """Return a copy of the definition with a fresh UUID path on every trigger node.

    Every node whose ``type`` equals ``"trigger"`` gets its
    ``data["trigger_path"]`` overwritten with a newly generated
    ``str(uuid.uuid4())``, regardless of any existing value. Other nodes and
    other ``data`` keys are left untouched. The input is never mutated; the
    work happens on a deep copy.

    Args:
        workflow_definition: Workflow definition dict with an optional
            ``"nodes"`` list, or None.

    Returns:
        The updated deep copy, or the input itself unchanged when it is
        None/empty.
    """
    if not workflow_definition:
        return workflow_definition

    updated_definition = copy.deepcopy(workflow_definition)
    for node in updated_definition.get("nodes") or []:
        if node.get("type") != "trigger":
            continue
        # ``data`` may be absent or explicitly null (e.g. JSON ``"data": null``).
        # ``setdefault`` would return that existing None and the assignment
        # below would raise TypeError, so install a dict in both cases.
        data = node.get("data")
        if data is None:
            data = node["data"] = {}
        data["trigger_path"] = str(uuid.uuid4())
    return updated_definition
def ensure_trigger_paths(workflow_definition: Optional[dict]) -> Optional[dict]:
    """Return a copy of the definition where every trigger node has a path.

    Trigger nodes (``type == "trigger"``) whose ``data["trigger_path"]`` is
    missing or falsy receive a newly generated ``str(uuid.uuid4())``. Existing
    truthy paths are preserved as-is. The input is never mutated; the work
    happens on a deep copy.

    Args:
        workflow_definition: Workflow definition dict with an optional
            ``"nodes"`` list, or None.

    Returns:
        The updated deep copy, or the input itself unchanged when it is
        None/empty.
    """
    if not workflow_definition:
        return workflow_definition

    out = copy.deepcopy(workflow_definition)
    for node in out.get("nodes") or []:
        if node.get("type") != "trigger":
            continue
        # ``data`` may be absent or explicitly null (e.g. JSON ``"data": null``).
        # ``setdefault`` would return that existing None and ``data.get`` below
        # would raise AttributeError, so install a dict in both cases.
        data = node.get("data")
        if data is None:
            data = node["data"] = {}
        if not data.get("trigger_path"):
            data["trigger_path"] = str(uuid.uuid4())
    return out
def validate_trigger_paths(
    workflow_definition: Optional[dict],
) -> list[TriggerPathIssue]:
    """Validate custom trigger paths before they reach persistence/runtime.

    Only nodes whose ``type`` equals ``"trigger"`` are inspected, and trigger
    nodes with a missing or falsy ``data.trigger_path`` are skipped. For each
    remaining node the following checks run, and each failure appends one
    ``TriggerPathIssue``:

    * the path must be a string (if not, no further checks run for that node);
    * the path must be at most ``TRIGGER_PATH_MAX_LENGTH`` characters;
    * the path must fully match ``TRIGGER_PATH_PATTERN``;
    * the path must not repeat one already seen on an earlier trigger node.

    The length, pattern, and duplicate checks are independent, so a single
    node can yield several issues.

    Args:
        workflow_definition: Workflow definition dict with an optional
            ``"nodes"`` list, or None.

    Returns:
        Issues in node order; empty when the definition is None/empty or
        everything is valid.
    """
    if not workflow_definition:
        return []

    issues: list[TriggerPathIssue] = []
    # Paths already encountered. Tracked by membership rather than by the
    # first node's id: that id can legitimately be None, which a
    # ``dict.get(...) is None`` test would confuse with "not seen yet" and
    # thereby miss duplicates.
    seen_paths: set[str] = set()

    for node in workflow_definition.get("nodes") or []:
        if node.get("type") != "trigger":
            continue

        node_id = node.get("id")
        trigger_path = (node.get("data") or {}).get("trigger_path")
        if not trigger_path:
            continue

        if not isinstance(trigger_path, str):
            issues.append(
                TriggerPathIssue(
                    node_id=node_id,
                    trigger_path=repr(trigger_path),
                    message="Trigger path must be a string.",
                )
            )
            # Length/pattern/duplicate checks only make sense for strings.
            continue

        if len(trigger_path) > TRIGGER_PATH_MAX_LENGTH:
            issues.append(
                TriggerPathIssue(
                    node_id=node_id,
                    trigger_path=trigger_path,
                    message=(
                        f"Trigger path must be {TRIGGER_PATH_MAX_LENGTH} "
                        "characters or fewer."
                    ),
                )
            )

        if not TRIGGER_PATH_PATTERN.fullmatch(trigger_path):
            issues.append(
                TriggerPathIssue(
                    node_id=node_id,
                    trigger_path=trigger_path,
                    message=(
                        "Trigger path must be a single URL path segment using "
                        "only letters, numbers, hyphens, and underscores."
                    ),
                )
            )

        if trigger_path in seen_paths:
            issues.append(
                TriggerPathIssue(
                    node_id=node_id,
                    trigger_path=trigger_path,
                    message="Trigger path is duplicated in this workflow.",
                )
            )
        else:
            seen_paths.add(trigger_path)

    return issues