diff --git a/api/services/workflow/audit.py b/api/services/workflow/audit.py new file mode 100644 index 0000000..0265375 --- /dev/null +++ b/api/services/workflow/audit.py @@ -0,0 +1,99 @@ +"""Rule-based audit of a workflow definition's nodes + edges. + +Pure, dependency-free helpers derived from `NodeSpec.graph_constraints`. +Lives in tracked code so the regression tests in +`test_workflow_graph_constraints.py` can pin it; the admin cleanup +script in `api/services/admin_utils/local_exec.py` is the production +consumer. +""" + +from api.services.workflow.node_specs import REGISTRY + + +def _build_type_rules() -> tuple[set[str], set[str]]: + """From NodeSpec.graph_constraints, derive the set of types that are + forbidden as edge sources (max_outgoing == 0) and as targets + (max_incoming == 0).""" + src_forbidden: set[str] = set() + tgt_forbidden: set[str] = set() + for name, spec in REGISTRY.items(): + gc = spec.graph_constraints + if gc is None: + continue + if gc.max_outgoing == 0: + src_forbidden.add(name) + if gc.max_incoming == 0: + tgt_forbidden.add(name) + return src_forbidden, tgt_forbidden + + +def _empty_violation(reason: str) -> dict: + """Graph-level violation row — no edge metadata to attach.""" + return { + "edge_id": "(graph)", + "source_id": None, + "source_type": None, + "target_id": None, + "target_type": None, + "edge_label": None, + "reason": reason, + } + + +def audit_definition(nodes, edges) -> list[dict]: + """Rule-based audit — emits one row per offending edge. + + Used by the cleanup migration which needs per-edge granularity to + know what to strip. Pinned by tests in test_workflow_graph_constraints.py. + """ + if not isinstance(nodes, list) or not isinstance(edges, list): + return [] + + src_forbidden, tgt_forbidden = _build_type_rules() + nodes_by_id: dict = {} + for n in nodes: + if isinstance(n, dict) and "id" in n: + nodes_by_id[n["id"]] = n.get("type") + + violations: list[dict] = [] + + # Graph-level: WorkflowGraph._assert_start_node requires exactly one + # startCall node. The DTO doesn't enforce this, so legacy or + # script-edited rows can land in a state that fails at runtime. + start_count = sum(1 for t in nodes_by_id.values() if t == "startCall") + if start_count == 0: + violations.append(_empty_violation("no_start_node")) + elif start_count > 1: + violations.append(_empty_violation(f"multiple_start_nodes:{start_count}")) + for e in edges: + if not isinstance(e, dict): + continue + src = e.get("source") + tgt = e.get("target") + eid = e.get("id") or f"{src}->{tgt}" + src_type = nodes_by_id.get(src) if src is not None else None + tgt_type = nodes_by_id.get(tgt) if tgt is not None else None + + reasons: list[str] = [] + if src is None or src not in nodes_by_id: + reasons.append("source_id_missing") + if tgt is None or tgt not in nodes_by_id: + reasons.append("target_id_missing") + if src_type in src_forbidden: + reasons.append(f"source_max_outgoing_0:{src_type}") + if tgt_type in tgt_forbidden: + reasons.append(f"target_max_incoming_0:{tgt_type}") + + for r in reasons: + violations.append( + { + "edge_id": eid, + "source_id": src, + "source_type": src_type, + "target_id": tgt, + "target_type": tgt_type, + "edge_label": (e.get("data") or {}).get("label") if isinstance(e.get("data"), dict) else None, + "reason": r, + } + ) + return violations diff --git a/api/tests/test_workflow_graph_constraints.py b/api/tests/test_workflow_graph_constraints.py index eaf57aa..bb351f8 100644 --- a/api/tests/test_workflow_graph_constraints.py +++ b/api/tests/test_workflow_graph_constraints.py @@ -1,5 +1,5 @@ """Regression tests for WorkflowGraph edge/graph constraints + the -admin audit script that mirrors them. +rule-based audit that mirrors them. Each fixture in `dto_fixtures/` is either a clean workflow or a single category of violation we found in production. We pin two layers: @@ -8,10 +8,10 @@ category of violation we found in production. We pin two layers: both MCP tools. Driven by `NodeSpec.graph_constraints`. If this layer ever stops rejecting one of these fixtures, the production write paths will quietly start accepting bad workflows again. - 2. audit_definition (api.services.admin_utils.local_exec) — read-only - sweep over persisted rows used to find legacy/imported breakage. - Pinned so refactors of the rule set don't silently change the - verdicts the migration relies on. + 2. audit_definition (api.services.workflow.audit) — read-only sweep + over persisted rows used by the admin cleanup script to find + legacy/imported breakage. Pinned so refactors of the rule set + don't silently change the verdicts the migration relies on. DTO-level shape validation is covered by `test_dto.py` and isn't re-pinned here. @@ -22,7 +22,7 @@ from pathlib import Path import pytest -from api.services.admin_utils.local_exec import audit_definition +from api.services.workflow.audit import audit_definition from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.workflow_graph import WorkflowGraph