dograh/api/tests/test_ts_bridge.py

433 lines
14 KiB
Python
Raw Permalink Normal View History

"""End-to-end tests for the Node TS validator bridge.
Exercises the real `node` subprocess slow-ish but the whole point is
that code JSON and JSON code round-trip losslessly.
"""
from __future__ import annotations
import shutil
from types import NoneType
from typing import Any, get_args
import pytest
from api.mcp_server.ts_bridge import TsBridgeError, generate_code, parse_code
from api.services.workflow.dto import EdgeDataDTO
from api.services.workflow.node_specs import (
NodeSpec,
PropertySpec,
PropertyType,
all_specs,
)
pytestmark = pytest.mark.skipif(
shutil.which("node") is None, reason="node binary not available"
)
def _minimal_workflow() -> dict:
"""Start → End, one edge. Stored shape matches ReactFlowDTO."""
return {
"nodes": [
{
"id": "1",
"type": "startCall",
"position": {"x": 0, "y": 0},
"data": {
"name": "Greeting",
"prompt": "Greet warmly.",
"greeting_type": "text",
"greeting": "Hi {{first_name}}!",
"allow_interrupt": True,
},
},
{
"id": "2",
"type": "endCall",
"position": {"x": 200, "y": 0},
"data": {"name": "Done", "prompt": "Say goodbye."},
},
],
"edges": [
{
"id": "1-2",
"source": "1",
"target": "2",
"data": {"label": "done", "condition": "conversation complete"},
},
],
"viewport": {"x": 0, "y": 0, "zoom": 1},
}
def _normalize(wf: dict) -> dict:
"""Strip cosmetics before comparing a round-tripped workflow.
Node IDs are regenerated deterministically by the parser
(1, 2, 3, ...) so the inputs already match if constructed that way.
Position is preserved. Edge ids follow `source-target`.
"""
return {
"nodes": [
{
"id": n["id"],
"type": n["type"],
"position": n["position"],
"data": n["data"],
}
for n in wf["nodes"]
],
"edges": [
{
"id": e["id"],
"source": e["source"],
"target": e["target"],
"data": e["data"],
}
for e in wf["edges"]
],
}
def _strip_optional(annotation: Any) -> Any:
args = tuple(arg for arg in get_args(annotation) if arg is not NoneType)
if len(args) == 1:
return args[0]
return annotation
def _pick_option_value(prop: PropertySpec) -> Any:
assert prop.options, f"{prop.name} has no options"
default = prop.default
for option in prop.options:
if option.value != default:
return option.value
return prop.options[0].value
def _sample_number(prop: PropertySpec) -> int | float:
candidates: list[int | float] = [1, 2, 3, 0.5, 4.5, 10]
for candidate in candidates:
if prop.min_value is not None and candidate < prop.min_value:
continue
if prop.max_value is not None and candidate > prop.max_value:
continue
if prop.default is not None and candidate == prop.default:
continue
return candidate
raise AssertionError(f"No valid sample number found for {prop.name}")
def _sample_property_value(prop: PropertySpec, *, path: str) -> Any:
slug = path.replace(".", "_")
if prop.type == PropertyType.string:
return f"{slug}_value"
if prop.type == PropertyType.mention_textarea:
return f"{slug} prompt with {{name}}"
if prop.type == PropertyType.url:
return f"https://example.com/{slug}"
if prop.type == PropertyType.recording_ref:
return f"recording_{slug}"
if prop.type == PropertyType.credential_ref:
return f"credential_{slug}"
if prop.type == PropertyType.number:
return _sample_number(prop)
if prop.type == PropertyType.boolean:
return not prop.default if isinstance(prop.default, bool) else True
if prop.type == PropertyType.options:
return _pick_option_value(prop)
if prop.type == PropertyType.multi_options:
return [_pick_option_value(prop)]
if prop.type == PropertyType.tool_refs:
return [f"tool_{slug}"]
if prop.type == PropertyType.document_refs:
return [f"document_{slug}"]
if prop.type == PropertyType.json:
return {"kind": slug, "enabled": True}
if prop.type == PropertyType.fixed_collection:
assert prop.properties, f"{prop.name} fixed_collection has no sub-properties"
return [
{
sub_prop.name: _sample_property_value(
sub_prop, path=f"{path}.{sub_prop.name}"
)
for sub_prop in prop.properties
}
]
raise AssertionError(f"Unhandled PropertyType in TS bridge test: {prop.type}")
def _sample_node_data(spec: NodeSpec) -> dict[str, Any]:
return {
prop.name: _sample_property_value(prop, path=f"{spec.name}.{prop.name}")
for prop in spec.properties
}
def _sample_edge_value(field_name: str, annotation: Any) -> Any:
inner = _strip_optional(annotation)
if inner is str:
return f"{field_name}_value"
if inner is bool:
return True
if inner in (int, float):
return 1
raise AssertionError(
f"Unhandled edge field annotation in TS bridge test: {field_name} -> {annotation!r}"
)
def _sample_edge_data() -> dict[str, Any]:
return {
field_name: _sample_edge_value(field_name, field.annotation)
for field_name, field in EdgeDataDTO.model_fields.items()
}
# ─── generate_code ───────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_generate_emits_imports_and_factories():
code = await generate_code(_minimal_workflow(), workflow_name="test")
assert 'import { Workflow } from "@dograh/sdk";' in code
assert "startCall" in code
assert "endCall" in code
assert "wf.addTyped(startCall(" in code
assert "wf.edge(" in code
@pytest.mark.asyncio
async def test_generate_strips_spec_defaults():
wf = _minimal_workflow()
code = await generate_code(wf)
# `add_global_prompt=True` is a spec default for startCall; emitted
# code should omit it. Keeps the LLM-facing projection tight.
assert "add_global_prompt" not in code
@pytest.mark.asyncio
async def test_generate_omits_position():
"""Positions are hidden from the LLM — auto-layout post-processing
(future) reassigns them on save. Keeping them out of the edit
surface avoids the LLM producing cramped/overlapping layouts."""
wf = _minimal_workflow()
code = await generate_code(wf)
assert "position" not in code
@pytest.mark.asyncio
async def test_generate_strips_legacy_ui_state_fields():
"""Stored workflows from before spec validation carry UI-state fields
(`invalid`, `selected`, `is_start`, etc.). `get_workflow_code` hides
those from the LLM so edits don't round-trip the noise."""
wf = {
"nodes": [
{
"id": "1",
"type": "startCall",
"position": {"x": 0, "y": 0},
"data": {
"name": "g",
"prompt": "hi",
"invalid": False,
"validationMessage": None,
"is_start": True,
"selected": True,
"dragging": False,
},
},
],
"edges": [],
"viewport": {"x": 0, "y": 0, "zoom": 1},
}
code = await generate_code(wf)
for dropped in ("invalid", "validationMessage", "is_start", "selected", "dragging"):
assert dropped not in code, f"{dropped} should be stripped"
assert 'prompt: "hi"' in code
@pytest.mark.asyncio
async def test_generate_strips_unknown_edge_fields():
wf = _minimal_workflow()
wf["edges"][0]["data"]["invalid"] = False
wf["edges"][0]["data"]["validationMessage"] = None
code = await generate_code(wf)
assert "invalid" not in code
assert "validationMessage" not in code
@pytest.mark.asyncio
async def test_generate_preserves_all_edge_dto_fields():
wf = _minimal_workflow()
edge_data = _sample_edge_data()
wf["edges"][0]["data"] = edge_data
code = await generate_code(wf)
result = await parse_code(code)
assert result["ok"] is True, result
assert result["workflow"]["edges"][0]["data"] == edge_data
# ─── parse_code ──────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_parse_accepts_minimal_code():
code = """import { Workflow } from "@dograh/sdk";
import { startCall, endCall } from "@dograh/sdk/typed";
const wf = new Workflow({ name: "min" });
const a = wf.addTyped(startCall({ name: "g", prompt: "hi" }));
const b = wf.addTyped(endCall({ name: "d", prompt: "bye" }));
wf.edge(a, b, { label: "done", condition: "wrapped" });
"""
result = await parse_code(code)
assert result["ok"] is True
wf = result["workflow"]
assert len(wf["nodes"]) == 2
assert len(wf["edges"]) == 1
assert wf["nodes"][0]["type"] == "startCall"
assert wf["edges"][0]["source"] == wf["nodes"][0]["id"]
@pytest.mark.asyncio
async def test_parse_rejects_function_declaration():
code = """import { Workflow } from "@dograh/sdk";
const wf = new Workflow({ name: "x" });
function evil() { return 1; }
"""
result = await parse_code(code)
assert result["ok"] is False
assert result["stage"] == "parse"
assert any("FunctionDeclaration" in e["message"] for e in result["errors"])
@pytest.mark.asyncio
async def test_parse_rejects_unknown_field():
code = """import { Workflow } from "@dograh/sdk";
import { startCall } from "@dograh/sdk/typed";
const wf = new Workflow({ name: "x" });
const a = wf.addTyped(startCall({ name: "g", prompt: "hi", promt: "typo" }));
"""
result = await parse_code(code)
assert result["ok"] is False
assert result["stage"] == "validate"
assert any("Unknown field" in e["message"] for e in result["errors"])
@pytest.mark.asyncio
async def test_parse_rejects_unknown_variable_in_edge():
code = """import { Workflow } from "@dograh/sdk";
import { startCall, endCall } from "@dograh/sdk/typed";
const wf = new Workflow({ name: "x" });
const a = wf.addTyped(startCall({ name: "g", prompt: "hi" }));
wf.edge(a, missing, { label: "done", condition: "c" });
"""
result = await parse_code(code)
assert result["ok"] is False
assert result["stage"] == "parse"
assert any("Unknown node variable" in e["message"] for e in result["errors"])
@pytest.mark.asyncio
async def test_parse_requires_label_and_condition_on_edge():
code = """import { Workflow } from "@dograh/sdk";
import { startCall, endCall } from "@dograh/sdk/typed";
const wf = new Workflow({ name: "x" });
const a = wf.addTyped(startCall({ name: "g", prompt: "hi" }));
const b = wf.addTyped(endCall({ name: "d", prompt: "bye" }));
wf.edge(a, b, { label: "", condition: "c" });
"""
result = await parse_code(code)
assert result["ok"] is False
assert result["stage"] == "parse"
@pytest.mark.asyncio
async def test_parse_rejects_unknown_edge_field():
code = """import { Workflow } from "@dograh/sdk";
import { startCall, endCall } from "@dograh/sdk/typed";
const wf = new Workflow({ name: "x" });
const a = wf.addTyped(startCall({ name: "g", prompt: "hi" }));
const b = wf.addTyped(endCall({ name: "d", prompt: "bye" }));
wf.edge(a, b, { label: "done", condition: "wrapped", bogus: "x" });
"""
result = await parse_code(code)
assert result["ok"] is False
assert result["stage"] == "parse"
assert any("Unknown edge field" in e["message"] for e in result["errors"])
# ─── Round-trip ──────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_round_trip_minimal():
wf = _minimal_workflow()
code = await generate_code(wf, workflow_name="rt")
result = await parse_code(code)
assert result["ok"] is True, result
# Positions are intentionally not preserved — they'll be reassigned
# by a downstream auto-layout pass. Parser defaults to {0, 0}.
for in_node, out_node in zip(wf["nodes"], result["workflow"]["nodes"]):
assert out_node["type"] == in_node["type"]
assert out_node["position"] == {"x": 0, "y": 0}
for k, v in in_node["data"].items():
assert out_node["data"][k] == v, (
f"{k}: {out_node['data'].get(k)!r} != {v!r}"
)
assert _normalize({"nodes": [], "edges": result["workflow"]["edges"]})["edges"] == [
{
"id": "1-2",
"source": "1",
"target": "2",
"data": {"label": "done", "condition": "conversation complete"},
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("spec", all_specs(), ids=lambda spec: spec.name)
async def test_round_trip_preserves_all_node_spec_fields(spec: NodeSpec):
data = _sample_node_data(spec)
wf = {
"nodes": [
{
"id": "1",
"type": spec.name,
"position": {"x": 0, "y": 0},
"data": data,
}
],
"edges": [],
"viewport": {"x": 0, "y": 0, "zoom": 1},
}
code = await generate_code(wf, workflow_name=f"{spec.name}_rt")
result = await parse_code(code)
assert result["ok"] is True, result
assert result["workflow"]["nodes"][0]["data"] == data
@pytest.mark.asyncio
async def test_generate_fails_on_unknown_type():
bad = {
"nodes": [
{
"id": "1",
"type": "doesNotExist",
"position": {"x": 0, "y": 0},
"data": {},
}
],
"edges": [],
"viewport": {"x": 0, "y": 0, "zoom": 1},
}
with pytest.raises(TsBridgeError, match="Unknown node type"):
await generate_code(bad)