dograh/api/tests/test_node_specs.py
Mohamed-Mamdouh 5f28c1b2a9
feat: add Tuner Integration to Dograh (#311)
* Add tuner integration

* bump pipecat version

* chore: update pipecat submodule to match upstream and use tuner-pipecat-sdk 0.2.0

Update pipecat submodule from 0.0.109.dev23 to 13e98d0d9 (the exact commit
upstream dograh-hq/dograh uses after v1.30.1). This installs pipecat-ai as
1.1.0.post277 via setuptools_scm, satisfying tuner-pipecat-sdk 0.2.0's
pipecat-ai>=1.0.0 requirement.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* wire tuner

* feat: refactor integrations into self contained packages

* chore: simplify ensure_public_access_token

* fix: remove NodeSpec and make DTOs the source of truth

* feat: send relevant signal to mcp using to_mcp_dict

* fix: fix tests

* cleanup: remove nango integrations

* feat: add agents.md for integrations

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
2026-05-20 14:37:33 +05:30

417 lines
15 KiB
Python

"""Spec-quality lint.
Catches drift between NodeSpecs and the rest of the system before it lands:
- Placeholder/empty descriptions
- Missing examples
- display_options referencing fields that don't exist
- Examples that don't validate against the per-type Pydantic DTO
- Spec name not matching a discriminator value in dto.py
"""
from __future__ import annotations
import re
import pytest
from api.services.workflow.dto import (
ReactFlowDTO,
all_node_type_names,
get_node_data_model,
)
from api.services.workflow.node_data import BaseNodeData
from api.services.workflow.node_specs import (
NodeSpec,
PropertySpec,
PropertyType,
all_specs,
)
PLACEHOLDER_DESCRIPTION_PATTERN = re.compile(
r"^\s*(todo|fixme|tbd|xxx|\.\.\.|placeholder|description|n/?a|\?)\s*\.?\s*$",
re.IGNORECASE,
)
def _walk_properties(props: list[PropertySpec], path: str = ""):
"""Yield (full_path, property) for every property and nested sub-property."""
for prop in props:
full_path = f"{path}.{prop.name}" if path else prop.name
yield full_path, prop
if prop.properties:
yield from _walk_properties(prop.properties, full_path)
# ─────────────────────────────────────────────────────────────────────────
# Lint
# ─────────────────────────────────────────────────────────────────────────
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_node_spec_has_non_placeholder_description(spec: NodeSpec):
assert spec.description.strip(), f"{spec.name}: empty description"
assert not PLACEHOLDER_DESCRIPTION_PATTERN.match(spec.description), (
f"{spec.name}: description looks like a placeholder: {spec.description!r}"
)
assert len(spec.description) >= 20, (
f"{spec.name}: description too short to be useful for an LLM "
f"({len(spec.description)} chars)"
)
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_node_spec_has_at_least_one_example(spec: NodeSpec):
assert spec.examples, (
f"{spec.name}: must have at least one NodeExample so LLMs have a "
f"realistic shape to pattern-match."
)
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_property_descriptions_non_placeholder(spec: NodeSpec):
for path, prop in _walk_properties(spec.properties):
assert prop.description.strip(), f"{spec.name}.{path}: empty description"
assert not PLACEHOLDER_DESCRIPTION_PATTERN.match(prop.description), (
f"{spec.name}.{path}: description looks like a placeholder: "
f"{prop.description!r}"
)
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_display_options_reference_real_fields(spec: NodeSpec):
"""A property's display_options must only reference sibling property
names. Nested properties are scoped to their parent's siblings."""
def _check(scope_props: list[PropertySpec], scope_path: str = ""):
names_in_scope = {p.name for p in scope_props}
for prop in scope_props:
current_path = f"{scope_path}.{prop.name}" if scope_path else prop.name
if prop.display_options:
refs = set((prop.display_options.show or {}).keys()) | set(
(prop.display_options.hide or {}).keys()
)
missing = refs - names_in_scope
assert not missing, (
f"{spec.name}.{current_path}: display_options references "
f"unknown sibling fields: {sorted(missing)}"
)
if prop.properties:
_check(prop.properties, current_path)
_check(spec.properties)
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_options_properties_have_options(spec: NodeSpec):
for path, prop in _walk_properties(spec.properties):
if prop.type in (PropertyType.options, PropertyType.multi_options):
assert prop.options, (
f"{spec.name}.{path}: type={prop.type.value} requires at "
f"least one PropertyOption."
)
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_fixed_collection_has_sub_properties(spec: NodeSpec):
for path, prop in _walk_properties(spec.properties):
if prop.type == PropertyType.fixed_collection:
assert prop.properties, (
f"{spec.name}.{path}: fixed_collection requires nested "
f"`properties` describing each row."
)
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_spec_name_matches_dto_discriminator(spec: NodeSpec):
valid_names = all_node_type_names()
assert spec.name in valid_names, (
f"NodeSpec {spec.name!r} doesn't match any registered node type. "
f"Valid: {sorted(valid_names)}"
)
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_examples_validate_against_dto(spec: NodeSpec):
"""Each NodeExample.data must pass per-type DTO validation. This stops
examples from drifting away from the actual wire schema."""
for ex in spec.examples:
wire_node = {
"id": "example",
"type": spec.name,
"position": {"x": 0, "y": 0},
"data": ex.data,
}
# Build a minimal valid graph: example node plus a synthetic peer if
# graph_constraints require an incoming or outgoing edge.
nodes = [wire_node]
edges: list[dict] = []
constraints = spec.graph_constraints
if constraints and (constraints.min_outgoing or 0) > 0:
nodes.append(
{
"id": "downstream",
"type": "endCall",
"position": {"x": 0, "y": 0},
"data": {"name": "End", "prompt": "End", "is_end": True},
}
)
edges.append(
{
"id": "e_out",
"source": "example",
"target": "downstream",
"data": {"label": "next", "condition": "next"},
}
)
if constraints and (constraints.min_incoming or 0) > 0:
nodes.append(
{
"id": "upstream",
"type": "startCall",
"position": {"x": 0, "y": 0},
"data": {
"name": "Start",
"prompt": "Hello",
"is_start": True,
},
}
)
edges.append(
{
"id": "e_in",
"source": "upstream",
"target": "example",
"data": {"label": "in", "condition": "in"},
}
)
# Validate. If this raises, the example is broken.
ReactFlowDTO.model_validate({"nodes": nodes, "edges": edges})
def test_all_dto_types_have_specs():
"""Every registered node type must have a registered NodeSpec."""
spec_names = {s.name for s in all_specs()}
type_values = all_node_type_names()
missing = type_values - spec_names
assert not missing, f"Registered node types without specs: {sorted(missing)}"
def test_all_registered_node_models_inherit_base_node_data():
for type_name in sorted(all_node_type_names()):
data_model = get_node_data_model(type_name)
assert data_model is not None, f"{type_name}: missing node data model"
assert issubclass(data_model, BaseNodeData), (
f"{type_name}: node data model must inherit BaseNodeData"
)
@pytest.mark.parametrize(
("spec_name", "expected_order"),
[
(
"startCall",
[
"name",
"greeting_type",
"greeting",
"greeting_recording_id",
"prompt",
"allow_interrupt",
"add_global_prompt",
"delayed_start",
"delayed_start_duration",
"extraction_enabled",
"extraction_prompt",
"extraction_variables",
"tool_uuids",
"document_uuids",
"pre_call_fetch_enabled",
"pre_call_fetch_url",
"pre_call_fetch_credential_uuid",
],
),
(
"agentNode",
[
"name",
"prompt",
"allow_interrupt",
"add_global_prompt",
"extraction_enabled",
"extraction_prompt",
"extraction_variables",
"tool_uuids",
"document_uuids",
],
),
(
"endCall",
[
"name",
"prompt",
"add_global_prompt",
"extraction_enabled",
"extraction_prompt",
"extraction_variables",
],
),
("globalNode", ["name", "prompt"]),
("trigger", ["name", "enabled", "trigger_path"]),
(
"webhook",
[
"name",
"enabled",
"http_method",
"endpoint_url",
"credential_uuid",
"custom_headers",
"payload_template",
],
),
(
"qa",
[
"name",
"qa_enabled",
"qa_system_prompt",
"qa_min_call_duration",
"qa_voicemail_calls",
"qa_sample_rate",
"qa_use_workflow_llm",
"qa_provider",
"qa_model",
"qa_api_key",
"qa_endpoint",
],
),
(
"tuner",
[
"name",
"tuner_enabled",
"tuner_agent_id",
"tuner_workspace_id",
"tuner_api_key",
],
),
],
)
def test_node_spec_property_order_stable(spec_name: str, expected_order: list[str]):
spec = next(spec for spec in all_specs() if spec.name == spec_name)
assert [prop.name for prop in spec.properties] == expected_order
# ─────────────────────────────────────────────────────────────────────────
# `to_mcp_dict` projection — the lean view served by the `get_node_type`
# MCP tool. UI-only metadata is dropped so it doesn't poison LLM context;
# the full spec stays available to the frontend and SDK via other paths.
# ─────────────────────────────────────────────────────────────────────────
# Keys that are UI-rendering concerns and must never reach the LLM view, at
# either the node or property level.
_UI_ONLY_KEYS = frozenset(
{
"display_name",
"icon",
"category",
"version",
"placeholder",
"display_options",
"editor",
"extra",
"label", # PropertyOption display string
}
)
def _walk_dicts(node):
"""Yield every dict nested anywhere inside a projected structure."""
if isinstance(node, dict):
yield node
for value in node.values():
yield from _walk_dicts(value)
elif isinstance(node, list):
for item in node:
yield from _walk_dicts(item)
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_to_mcp_dict_drops_ui_only_keys(spec: NodeSpec):
projected = spec.to_mcp_dict()
for d in _walk_dicts(projected):
leaked = _UI_ONLY_KEYS & d.keys()
assert not leaked, f"{spec.name}: UI-only keys leaked into LLM view: {leaked}"
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_to_mcp_dict_omits_null_and_empty(spec: NodeSpec):
"""The lean view never emits null values — absent means unset/optional,
which is what halves the noise versus the full model dump."""
for d in _walk_dicts(spec.to_mcp_dict()):
for key, value in d.items():
assert value is not None, f"{spec.name}: {key!r} emitted as null"
@pytest.mark.parametrize("spec", all_specs(), ids=lambda s: s.name)
def test_to_mcp_dict_keeps_property_essentials(spec: NodeSpec):
"""Every property in the LLM view carries the minimum an LLM needs to
author a value: machine name, type, and a description."""
def _check(props: list[dict]):
for prop in props:
assert prop.get("name"), f"{spec.name}: property missing name"
assert prop.get("type"), f"{spec.name}.{prop.get('name')}: missing type"
assert prop.get("description"), (
f"{spec.name}.{prop.get('name')}: missing description"
)
if prop.get("properties"):
_check(prop["properties"])
_check(spec.to_mcp_dict()["properties"])
def test_to_mcp_dict_retains_authoring_signal_startcall():
"""startCall is the richest core node — lock in that the projection
keeps the fields an LLM actually authors against while shedding the rest."""
spec = next(s for s in all_specs() if s.name == "startCall")
projected = spec.to_mcp_dict()
assert set(projected) == {
"name",
"description",
"llm_hint",
"properties",
"examples",
"graph_constraints",
}
props = {p["name"]: p for p in projected["properties"]}
# Required field keeps `required`; optional fields omit it.
assert props["prompt"]["required"] is True
assert "required" not in props["greeting"]
# Enum options project to bare values, dropping the UI label.
assert props["greeting_type"]["options"] == [{"value": "text"}, {"value": "audio"}]
# Validation bounds survive (they constrain valid authored values).
assert props["delayed_start_duration"]["min_value"] == 0.1
assert props["delayed_start_duration"]["max_value"] == 10.0
# llm_hint survives where present (catalog-tool references).
assert "list_recordings" in props["greeting_recording_id"]["llm_hint"]
# fixed_collection rows recurse through the same projection.
var_rows = {p["name"]: p for p in props["extraction_variables"]["properties"]}
assert var_rows["type"]["options"] == [
{"value": "string"},
{"value": "number"},
{"value": "boolean"},
]
# graph_constraints drops its null sub-fields.
assert projected["graph_constraints"] == {"min_incoming": 0, "max_incoming": 0}