feat: refactor node spec and add mcp tools (#244)

* refactor: carve out extraction panel

* refactor: create spec versions for node types

* refactor: create a GenericNode and remove custom nodes

* feat: add python and typescript sdk

* add dograh sdk

* fix: fetch draft workflow definition over published one

* fix: fix routes of SDKs to use code gen

* chore: remove doclink dependency to reduce image size

* chore: format files

* chore: bump pipecat

* feat: let mcp fetch archived workflows on demand

* chore: fix tests

* feat: add sdk documentation

* chore: change banner and add badge
This commit is contained in:
Abhishek 2026-04-21 07:56:16 +05:30 committed by GitHub
parent 0a61ef295f
commit 00a1a22b74
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
162 changed files with 14355 additions and 3554 deletions

3
sdk/python/.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
dist/
build/
*.egg-info/

24
sdk/python/LICENSE Normal file
View file

@ -0,0 +1,24 @@
BSD 2-Clause License
Copyright (c) 2025, Zansat Technologies Private Limited
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

77
sdk/python/README.md Normal file
View file

@ -0,0 +1,77 @@
# dograh-sdk
Typed builder for Dograh voice-AI workflows. Fetches the node-spec catalog from
the Dograh backend at session start, validates every call against it at the
call site, and produces `ReactFlowDTO`-compatible JSON.
## Install
```bash
pip install dograh-sdk
```
For local development against a checked-out monorepo:
```bash
pip install -e sdk/python/
```
## Usage
```python
from dograh_sdk import DograhClient, Workflow
with DograhClient(base_url="http://localhost:8000", api_key="...") as client:
wf = Workflow(client=client, name="loan_qualification")
start = wf.add(
type="startCall",
name="greeting",
prompt="You are Sarah from Acme Loans. Greet the caller warmly.",
greeting_type="text",
greeting="Hi {{first_name}}, this is Sarah.",
)
qualify = wf.add(
type="agentNode",
name="qualify",
prompt="Ask about loan amount and timeline.",
)
done = wf.add(type="endCall", name="done", prompt="Thank the caller.")
wf.edge(start, qualify, label="interested", condition="Caller expressed interest.")
wf.edge(qualify, done, label="done", condition="Qualification complete.")
client.save_workflow(workflow_id=123, workflow=wf)
```
## What gets validated at the call site
The SDK fetches the spec for each node type via `get_node_type` and raises
`ValidationError` immediately when:
- an unknown field is passed (catches typos)
- a required field is missing or empty
- a scalar type is wrong (e.g., string for a boolean)
- an `options` value isn't in the allowed list
When a spec carries an `llm_hint`, the hint is appended to the error message so
an LLM agent can self-correct on retry:
```
tool_uuids: expected tool_refs, got str
Hint: List of tool UUIDs from `list_tools`.
```
Server-side Pydantic validators run on save and surface anything the SDK lets
through (compound invariants, cross-field rules).
## Environment
```bash
DOGRAH_API_URL=http://localhost:8000 # default
DOGRAH_API_KEY=sk-... # sent as X-API-Key
```
## License
BSD 2-Clause — see `LICENSE`.

49
sdk/python/pyproject.toml Normal file
View file

@ -0,0 +1,49 @@
[project]
name = "dograh-sdk"
version = "0.1.2"
description = "Typed builder for Dograh voice-AI workflows"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "BSD-2-Clause" }
authors = [
{ name = "Zansat Technologies Private Limited", email = "contact@dograh.com" },
]
keywords = ["dograh", "voice-ai", "workflow", "sdk", "llm", "agent"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: BSD License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries",
]
dependencies = [
"httpx>=0.27",
"pydantic>=2.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
]
[project.urls]
Homepage = "https://dograh.com"
Documentation = "https://docs.dograh.com"
Repository = "https://github.com/dograh-hq/dograh"
[project.scripts]
dograh-sdk-codegen = "dograh_sdk.codegen:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/dograh_sdk"]
[tool.pytest.ini_options]
testpaths = ["tests"]

View file

@ -0,0 +1,35 @@
"""Dograh SDK — typed builder for voice-AI workflows.
Runtime SDK: fetches the spec catalog from the Dograh backend at session
start and validates every `Workflow.add()` call against it. LLMs don't
need to import per-node-type classes the `type` argument is a string
keyed against the fetched spec catalog.
from dograh_sdk import DograhClient, Workflow
with DograhClient(base_url="http://localhost:8000", api_key=...) as client:
wf = Workflow(client=client, name="loan_qualification")
start = wf.add(type="startCall", name="greeting", prompt="...")
qualify = wf.add(type="agentNode", name="qualify", prompt="...")
wf.edge(start, qualify, label="interested", condition="...")
client.save_workflow(workflow_id=123, workflow=wf)
For typed IDE autocomplete, generate per-node dataclasses via the SDK
codegen (Phase 6) the runtime and typed SDKs share this same core.
"""
from .client import DograhClient
from .errors import ApiError, DograhSdkError, SpecMismatchError, ValidationError
from .typed._base import TypedNode
from .workflow import NodeRef, Workflow
__all__ = [
"ApiError",
"DograhClient",
"DograhSdkError",
"NodeRef",
"SpecMismatchError",
"TypedNode",
"ValidationError",
"Workflow",
]

View file

@ -0,0 +1,102 @@
"""GENERATED — do not edit. Source: filtered OpenAPI from `api.app`.
Regenerate with `./scripts/generate_sdk.sh`.
`DograhClient` mixes in this class to get HTTP methods for every route
decorated with `sdk_expose(...)` on the backend. Request/response types
come from `_generated_models` (datamodel-codegen output).
"""
from __future__ import annotations
from typing import Any
from dograh_sdk._generated_models import (
CredentialResponse,
DocumentListResponseSchema,
InitiateCallRequest,
NodeSpec,
NodeTypesResponse,
RecordingListResponseSchema,
ToolResponse,
UpdateWorkflowRequest,
WorkflowListResponse,
WorkflowResponse,
)
class _GeneratedClient:
# `DograhClient.__init__` installs `self._request` (see client.py).
def get_node_type(self, name: str) -> NodeSpec:
"""Fetch a single node spec by name."""
data = self._request("GET", f"/node-types/{name}")
return NodeSpec.model_validate(data)
def get_workflow(self, workflow_id: int) -> WorkflowResponse:
"""Get a single workflow by ID (returns draft if one exists, else published)."""
data = self._request("GET", f"/workflow/fetch/{workflow_id}")
return WorkflowResponse.model_validate(data)
def list_credentials(self) -> list[CredentialResponse]:
"""List webhook credentials available to the authenticated organization."""
data = self._request("GET", "/credentials/")
return [CredentialResponse.model_validate(x) for x in data]
def list_documents(self, *, status: str | None = None, limit: int | None = None, offset: int | None = None) -> DocumentListResponseSchema:
"""List knowledge base documents available to the authenticated organization."""
params: dict[str, Any] = {}
if status is not None:
params["status"] = status
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
data = self._request("GET", "/knowledge-base/documents", params=params)
return DocumentListResponseSchema.model_validate(data)
def list_node_types(self) -> NodeTypesResponse:
"""List every registered node type with its spec. Pinned to spec_version."""
data = self._request("GET", "/node-types")
return NodeTypesResponse.model_validate(data)
def list_recordings(self, *, workflow_id: int | None = None, tts_provider: str | None = None, tts_model: str | None = None, tts_voice_id: str | None = None) -> RecordingListResponseSchema:
"""List workflow recordings available to the authenticated organization."""
params: dict[str, Any] = {}
if workflow_id is not None:
params["workflow_id"] = workflow_id
if tts_provider is not None:
params["tts_provider"] = tts_provider
if tts_model is not None:
params["tts_model"] = tts_model
if tts_voice_id is not None:
params["tts_voice_id"] = tts_voice_id
data = self._request("GET", "/workflow-recordings/", params=params)
return RecordingListResponseSchema.model_validate(data)
def list_tools(self, *, status: str | None = None, category: str | None = None) -> list[ToolResponse]:
"""List tools available to the authenticated organization."""
params: dict[str, Any] = {}
if status is not None:
params["status"] = status
if category is not None:
params["category"] = category
data = self._request("GET", "/tools/", params=params)
return [ToolResponse.model_validate(x) for x in data]
def list_workflows(self, *, status: str | None = None) -> list[WorkflowListResponse]:
"""List all workflows in the authenticated organization."""
params: dict[str, Any] = {}
if status is not None:
params["status"] = status
data = self._request("GET", "/workflow/fetch", params=params)
return [WorkflowListResponse.model_validate(x) for x in data]
def test_phone_call(self, *, body: InitiateCallRequest) -> Any:
"""Place a test call from a workflow to a phone number."""
return self._request("POST", "/telephony/initiate-call", json=body.model_dump(mode="json", exclude_none=True))
def update_workflow(self, workflow_id: int, *, body: UpdateWorkflowRequest) -> WorkflowResponse:
"""Update a workflow's name and/or definition. Saves as a new draft."""
data = self._request("PUT", f"/workflow/{workflow_id}", json=body.model_dump(mode="json", exclude_none=True))
return WorkflowResponse.model_validate(data)

View file

@ -0,0 +1,358 @@
# generated by datamodel-codegen:
# filename: dograh-openapi-XXXXXX.json.oPRfLAwVZP
# timestamp: 2026-04-21T02:15:12+00:00
from __future__ import annotations
from enum import Enum
from typing import Annotated, Any
from pydantic import AwareDatetime, BaseModel, ConfigDict, Field
class CallDispositionCodes(BaseModel):
disposition_codes: Annotated[list[str] | None, Field(title='Disposition Codes')] = (
[]
)
class CreatedByResponse(BaseModel):
"""
Response schema for the user who created a tool.
"""
id: Annotated[int, Field(title='Id')]
provider_id: Annotated[str, Field(title='Provider Id')]
class CredentialResponse(BaseModel):
"""
Response schema for a webhook credential (never includes sensitive data).
"""
uuid: Annotated[str, Field(title='Uuid')]
name: Annotated[str, Field(title='Name')]
description: Annotated[str | None, Field(title='Description')]
credential_type: Annotated[str, Field(title='Credential Type')]
created_at: Annotated[AwareDatetime, Field(title='Created At')]
updated_at: Annotated[AwareDatetime | None, Field(title='Updated At')]
class DisplayOptions(BaseModel):
"""
Conditional visibility rules.
`show` keys are AND-combined: this property is visible only when EVERY
referenced field's value matches one of the listed values.
`hide` keys are OR-combined: this property is hidden when ANY referenced
field's value matches one of the listed values.
Example:
DisplayOptions(show={"extraction_enabled": [True]})
DisplayOptions(show={"greeting_type": ["audio"]})
"""
model_config = ConfigDict(
extra='forbid',
)
show: Annotated[dict[str, list[Any]] | None, Field(title='Show')] = None
hide: Annotated[dict[str, list[Any]] | None, Field(title='Hide')] = None
class DocumentResponseSchema(BaseModel):
"""
Response schema for document metadata.
"""
id: Annotated[int, Field(title='Id')]
document_uuid: Annotated[str, Field(title='Document Uuid')]
filename: Annotated[str, Field(title='Filename')]
file_size_bytes: Annotated[int, Field(title='File Size Bytes')]
file_hash: Annotated[str, Field(title='File Hash')]
mime_type: Annotated[str, Field(title='Mime Type')]
processing_status: Annotated[str, Field(title='Processing Status')]
processing_error: Annotated[str | None, Field(title='Processing Error')] = None
total_chunks: Annotated[int, Field(title='Total Chunks')]
retrieval_mode: Annotated[str | None, Field(title='Retrieval Mode')] = 'chunked'
custom_metadata: Annotated[dict[str, Any], Field(title='Custom Metadata')]
docling_metadata: Annotated[dict[str, Any], Field(title='Docling Metadata')]
source_url: Annotated[str | None, Field(title='Source Url')] = None
created_at: Annotated[AwareDatetime, Field(title='Created At')]
updated_at: Annotated[AwareDatetime, Field(title='Updated At')]
organization_id: Annotated[int, Field(title='Organization Id')]
created_by: Annotated[int, Field(title='Created By')]
is_active: Annotated[bool, Field(title='Is Active')]
class GraphConstraints(BaseModel):
"""
Per-node-type graph rules. WorkflowGraph enforces these at validation.
"""
model_config = ConfigDict(
extra='forbid',
)
min_incoming: Annotated[int | None, Field(title='Min Incoming')] = None
max_incoming: Annotated[int | None, Field(title='Max Incoming')] = None
min_outgoing: Annotated[int | None, Field(title='Min Outgoing')] = None
max_outgoing: Annotated[int | None, Field(title='Max Outgoing')] = None
class InitiateCallRequest(BaseModel):
workflow_id: Annotated[int, Field(title='Workflow Id')]
workflow_run_id: Annotated[int | None, Field(title='Workflow Run Id')] = None
phone_number: Annotated[str | None, Field(title='Phone Number')] = None
class NodeCategory(Enum):
"""
Drives grouping in the AddNodePanel UI.
"""
call_node = 'call_node'
global_node = 'global_node'
trigger = 'trigger'
integration = 'integration'
class NodeExample(BaseModel):
"""
A worked example LLMs can pattern-match. Keep small and realistic.
"""
model_config = ConfigDict(
extra='forbid',
)
name: Annotated[str, Field(title='Name')]
description: Annotated[str | None, Field(title='Description')] = None
data: Annotated[dict[str, Any], Field(title='Data')]
class PropertyOption(BaseModel):
"""
An option in an `options` or `multi_options` dropdown.
"""
model_config = ConfigDict(
extra='forbid',
)
value: Annotated[str | int | bool | float, Field(title='Value')]
label: Annotated[str, Field(title='Label')]
description: Annotated[str | None, Field(title='Description')] = None
class PropertyType(Enum):
"""
Bounded vocabulary of property types the renderer dispatches on.
Adding a value here requires a matching arm in the frontend
`<PropertyInput>` switch and (where relevant) the SDK codegen template.
"""
string = 'string'
number = 'number'
boolean = 'boolean'
options = 'options'
multi_options = 'multi_options'
fixed_collection = 'fixed_collection'
json = 'json'
tool_refs = 'tool_refs'
document_refs = 'document_refs'
recording_ref = 'recording_ref'
credential_ref = 'credential_ref'
mention_textarea = 'mention_textarea'
url = 'url'
class RecordingResponseSchema(BaseModel):
"""
Response schema for a single recording.
"""
id: Annotated[int, Field(title='Id')]
recording_id: Annotated[str, Field(title='Recording Id')]
workflow_id: Annotated[int | None, Field(title='Workflow Id')] = None
organization_id: Annotated[int, Field(title='Organization Id')]
tts_provider: Annotated[str | None, Field(title='Tts Provider')] = None
tts_model: Annotated[str | None, Field(title='Tts Model')] = None
tts_voice_id: Annotated[str | None, Field(title='Tts Voice Id')] = None
transcript: Annotated[str, Field(title='Transcript')]
storage_key: Annotated[str, Field(title='Storage Key')]
storage_backend: Annotated[str, Field(title='Storage Backend')]
metadata: Annotated[dict[str, Any], Field(title='Metadata')]
created_by: Annotated[int, Field(title='Created By')]
created_at: Annotated[AwareDatetime, Field(title='Created At')]
is_active: Annotated[bool, Field(title='Is Active')]
class ToolResponse(BaseModel):
"""
Response schema for a tool.
"""
id: Annotated[int, Field(title='Id')]
tool_uuid: Annotated[str, Field(title='Tool Uuid')]
name: Annotated[str, Field(title='Name')]
description: Annotated[str | None, Field(title='Description')]
category: Annotated[str, Field(title='Category')]
icon: Annotated[str | None, Field(title='Icon')]
icon_color: Annotated[str | None, Field(title='Icon Color')]
status: Annotated[str, Field(title='Status')]
definition: Annotated[dict[str, Any], Field(title='Definition')]
created_at: Annotated[AwareDatetime, Field(title='Created At')]
updated_at: Annotated[AwareDatetime | None, Field(title='Updated At')]
created_by: CreatedByResponse | None = None
class UpdateWorkflowRequest(BaseModel):
name: Annotated[str | None, Field(title='Name')] = None
workflow_definition: Annotated[
dict[str, Any] | None, Field(title='Workflow Definition')
] = None
template_context_variables: Annotated[
dict[str, Any] | None, Field(title='Template Context Variables')
] = None
workflow_configurations: Annotated[
dict[str, Any] | None, Field(title='Workflow Configurations')
] = None
class ValidationError(BaseModel):
loc: Annotated[list[str | int], Field(title='Location')]
msg: Annotated[str, Field(title='Message')]
type: Annotated[str, Field(title='Error Type')]
input: Annotated[Any | None, Field(title='Input')] = None
ctx: Annotated[dict[str, Any] | None, Field(title='Context')] = None
class WorkflowListResponse(BaseModel):
"""
Lightweight response for workflow listings (excludes large fields).
"""
id: Annotated[int, Field(title='Id')]
name: Annotated[str, Field(title='Name')]
status: Annotated[str, Field(title='Status')]
created_at: Annotated[AwareDatetime, Field(title='Created At')]
total_runs: Annotated[int, Field(title='Total Runs')]
class WorkflowResponse(BaseModel):
id: Annotated[int, Field(title='Id')]
name: Annotated[str, Field(title='Name')]
status: Annotated[str, Field(title='Status')]
created_at: Annotated[AwareDatetime, Field(title='Created At')]
workflow_definition: Annotated[dict[str, Any], Field(title='Workflow Definition')]
current_definition_id: Annotated[int | None, Field(title='Current Definition Id')]
template_context_variables: Annotated[
dict[str, Any] | None, Field(title='Template Context Variables')
] = None
call_disposition_codes: CallDispositionCodes | None = None
total_runs: Annotated[int | None, Field(title='Total Runs')] = None
workflow_configurations: Annotated[
dict[str, Any] | None, Field(title='Workflow Configurations')
] = None
version_number: Annotated[int | None, Field(title='Version Number')] = None
version_status: Annotated[str | None, Field(title='Version Status')] = None
class DocumentListResponseSchema(BaseModel):
"""
Response schema for list of documents.
"""
documents: Annotated[list[DocumentResponseSchema], Field(title='Documents')]
total: Annotated[int, Field(title='Total')]
limit: Annotated[int, Field(title='Limit')]
offset: Annotated[int, Field(title='Offset')]
class HTTPValidationError(BaseModel):
detail: Annotated[list[ValidationError] | None, Field(title='Detail')] = None
class PropertySpec(BaseModel):
"""
Single field on a node.
`description` is HUMAN-FACING shown under the field in the edit
dialog. Keep it concise and explain what the field does.
`llm_hint` is LLM-FACING appears only in the `get_node_type` MCP
response and in SDK schema output. Use it for catalog tool references
(e.g., "Use `list_recordings`"), array shape, expected value idioms,
or anything that would be noise in the UI. Optional; omit when the
`description` already suffices for both audiences.
"""
model_config = ConfigDict(
extra='forbid',
)
name: Annotated[str, Field(title='Name')]
type: PropertyType
display_name: Annotated[str, Field(title='Display Name')]
description: Annotated[str, Field(min_length=1, title='Description')]
"""
Human-facing explanation shown in the UI.
"""
llm_hint: Annotated[str | None, Field(title='Llm Hint')] = None
"""
LLM-only guidance; omitted from the UI.
"""
default: Annotated[Any | None, Field(title='Default')] = None
required: Annotated[bool | None, Field(title='Required')] = False
placeholder: Annotated[str | None, Field(title='Placeholder')] = None
display_options: DisplayOptions | None = None
options: Annotated[list[PropertyOption] | None, Field(title='Options')] = None
properties: Annotated[list[PropertySpec] | None, Field(title='Properties')] = None
min_value: Annotated[float | None, Field(title='Min Value')] = None
max_value: Annotated[float | None, Field(title='Max Value')] = None
min_length: Annotated[int | None, Field(title='Min Length')] = None
max_length: Annotated[int | None, Field(title='Max Length')] = None
pattern: Annotated[str | None, Field(title='Pattern')] = None
editor: Annotated[str | None, Field(title='Editor')] = None
extra: Annotated[dict[str, Any] | None, Field(title='Extra')] = None
class RecordingListResponseSchema(BaseModel):
"""
Response schema for list of recordings.
"""
recordings: Annotated[list[RecordingResponseSchema], Field(title='Recordings')]
total: Annotated[int, Field(title='Total')]
class NodeSpec(BaseModel):
"""
Single source of truth for a node type.
"""
model_config = ConfigDict(
extra='forbid',
)
name: Annotated[str, Field(title='Name')]
display_name: Annotated[str, Field(title='Display Name')]
description: Annotated[str, Field(min_length=1, title='Description')]
"""
Human-facing explanation shown in AddNodePanel.
"""
llm_hint: Annotated[str | None, Field(title='Llm Hint')] = None
"""
LLM-only guidance; omitted from the UI.
"""
category: NodeCategory
icon: Annotated[str, Field(title='Icon')]
version: Annotated[str | None, Field(title='Version')] = '1.0.0'
properties: Annotated[list[PropertySpec], Field(title='Properties')]
examples: Annotated[list[NodeExample] | None, Field(title='Examples')] = None
graph_constraints: GraphConstraints | None = None
class NodeTypesResponse(BaseModel):
spec_version: Annotated[str, Field(title='Spec Version')]
node_types: Annotated[list[NodeSpec], Field(title='Node Types')]
PropertySpec.model_rebuild()

View file

@ -0,0 +1,166 @@
"""Client-side validation of node data against a fetched spec.
Intentionally lightweight: we catch the hallucinations that matter (typo'd
field names, missing required fields, obvious scalar-type mismatches) and
leave rigorous coercion to the backend's Pydantic validators, which run at
save time. The tradeoff: the SDK fails fast on mistakes an LLM makes, and
the backend remains the single authority on wire-format correctness.
"""
from __future__ import annotations
from typing import Any
from .errors import ValidationError
# Map PropertyType → primitive Python type(s) we check at call site.
# `None` means "skip scalar-type check" (compound types, refs, JSON, etc.).
_SCALAR_TYPES: dict[str, tuple[type, ...] | None] = {
"string": (str,),
"number": (int, float),
"boolean": (bool,),
"options": None, # value-in-options handled separately
"multi_options": None,
"fixed_collection": (list,),
"json": None, # any JSON-serializable
"tool_refs": (list,),
"document_refs": (list,),
"recording_ref": (str,),
"credential_ref": (str,),
"mention_textarea": (str,),
"url": (str,),
}
def _with_hint(prop: dict[str, Any], message: str) -> str:
"""Append `prop.llm_hint` to an error message when set.
Surfacing the hint inside validation errors lets an LLM author
self-correct on retry it sees the catalog reference or value-shape
guidance inline with the failure.
"""
hint = prop.get("llm_hint")
if hint:
return f"{message}\n Hint: {hint}"
return message
def _check_scalar(prop: dict[str, Any], value: Any) -> None:
# None is always allowed (missing value handled by required check).
if value is None:
return
allowed = _SCALAR_TYPES.get(prop["type"])
if allowed is None:
return
# Booleans ARE ints in Python, so exclude accidentally-matching bools.
if bool in allowed and not (int in allowed or float in allowed):
if not isinstance(value, bool):
raise ValidationError(
_with_hint(
prop,
f"{prop['name']}: expected boolean, got {type(value).__name__}",
)
)
return
if not isinstance(value, allowed):
raise ValidationError(
_with_hint(
prop,
f"{prop['name']}: expected {prop['type']}, "
f"got {type(value).__name__}",
)
)
def _check_options(prop: dict[str, Any], value: Any) -> None:
if value is None:
return
allowed = {o["value"] for o in prop.get("options") or []}
if not allowed:
return
if prop["type"] == "multi_options":
if not isinstance(value, list):
raise ValidationError(
_with_hint(
prop,
f"{prop['name']}: expected list, got {type(value).__name__}",
)
)
bad = [v for v in value if v not in allowed]
if bad:
raise ValidationError(
_with_hint(
prop,
f"{prop['name']}: values {bad} not in allowed {sorted(allowed)}",
)
)
else: # 'options'
if value not in allowed:
raise ValidationError(
_with_hint(
prop,
f"{prop['name']}: {value!r} not in allowed {sorted(allowed)}",
)
)
def validate_node_data(
spec: dict[str, Any],
kwargs: dict[str, Any],
) -> dict[str, Any]:
"""Validate LLM-supplied kwargs against the node spec. Returns the data
dict to embed in the wire format, with defaults applied.
Raises:
ValidationError if kwargs contain unknown fields, omit required
fields, or carry obvious type mismatches.
"""
declared = {p["name"]: p for p in spec["properties"]}
# Unknown field names — the most common LLM hallucination.
unknown = set(kwargs) - set(declared)
if unknown:
raise ValidationError(
f"{spec['name']}: unknown field(s) {sorted(unknown)}. "
f"Allowed: {sorted(declared)}"
)
# Per-property validation
data: dict[str, Any] = {}
for name, prop in declared.items():
if name in kwargs:
value = kwargs[name]
elif prop.get("default") is not None:
value = prop["default"]
else:
value = None
# Scalar / collection shape
if prop["type"] in ("options", "multi_options"):
_check_options(prop, value)
else:
_check_scalar(prop, value)
# Nested fixed_collection rows — validate each row as a sub-spec.
if prop["type"] == "fixed_collection" and isinstance(value, list):
sub_spec = {"name": f"{spec['name']}.{name}", "properties": prop.get("properties") or []}
data[name] = [validate_node_data(sub_spec, row) for row in value]
continue
if value is not None:
data[name] = value
# Required check — must be set AND non-empty for strings.
for name, prop in declared.items():
if not prop.get("required"):
continue
val = data.get(name)
if val is None or (isinstance(val, str) and val.strip() == ""):
raise ValidationError(
_with_hint(
prop,
f"{spec['name']}: required field missing: {name}",
)
)
return data

View file

@ -0,0 +1,151 @@
"""HTTP client for the Dograh REST API.
Most endpoint methods come from `_GeneratedClient` (auto-generated from
the FastAPI OpenAPI spec see `scripts/generate_sdk.sh`). This class
adds the session/auth/cache surface around that mixin plus a couple of
ergonomic wrappers (`load_workflow`, `save_workflow`) that compose a
generated call with local `Workflow` hydration.
The SDK surface on the backend is controlled by decorating routes with
`@sdk_expose(method="...")`; anything else is invisible here.
"""
from __future__ import annotations
import os
from typing import Any
import httpx
from ._generated_client import _GeneratedClient
from ._generated_models import (
NodeSpec,
NodeTypesResponse,
UpdateWorkflowRequest,
WorkflowResponse,
)
from .errors import ApiError, SpecMismatchError
from .workflow import Workflow
class DograhClient(_GeneratedClient):
"""Sync HTTP client. Suitable for scripts, pytest, and the LLM SDK
exec sandbox.
Auth precedence:
1. `api_key` kwarg
2. `DOGRAH_API_KEY` env var
3. unauthenticated (most endpoints will 401)
"""
def __init__(
self,
*,
base_url: str | None = None,
api_key: str | None = None,
timeout: float = 30.0,
):
resolved_url = base_url or os.environ.get(
"DOGRAH_API_URL", "http://localhost:8000"
)
self.base_url = resolved_url.rstrip("/")
self.api_key = api_key or os.environ.get("DOGRAH_API_KEY")
headers = {"Accept": "application/json"}
if self.api_key:
headers["X-API-Key"] = self.api_key
self._http = httpx.Client(
base_url=f"{self.base_url}/api/v1",
headers=headers,
timeout=timeout,
)
# Populated by the first call to `list_node_types` / `get_node_type`
# — avoids repeated round-trips when building a workflow.
self._spec_cache: dict[str, NodeSpec] = {}
self._spec_version: str | None = None
def close(self) -> None:
self._http.close()
def __enter__(self) -> DograhClient:
return self
def __exit__(self, *args: Any) -> None:
self.close()
@property
def spec_version(self) -> str | None:
"""Contract version reported by the server, or None until the
first `list_node_types` / `get_node_type` call."""
return self._spec_version
# ── spec discovery overrides (generated methods + caching) ────────
def list_node_types(self) -> NodeTypesResponse:
resp = super().list_node_types()
self._spec_version = resp.spec_version
for spec in resp.node_types:
self._spec_cache[spec.name] = spec
return resp
def get_node_type(self, name: str) -> NodeSpec:
cached = self._spec_cache.get(name)
if cached is not None:
return cached
try:
spec = super().get_node_type(name)
except ApiError as e:
if e.status_code == 404:
raise SpecMismatchError(f"Unknown node type: {name!r}") from e
raise
self._spec_cache[name] = spec
return spec
# ── ergonomic workflow wrappers ───────────────────────────────────
def load_workflow(self, workflow_id: int) -> Workflow:
"""Fetch a workflow and hydrate it into an editable `Workflow` builder."""
resp = self.get_workflow(workflow_id)
if not resp.workflow_definition:
raise ApiError(
200,
f"Workflow {workflow_id} has no definition to load",
body=resp.model_dump(mode="json"),
)
return Workflow.from_json(
resp.workflow_definition, client=self, name=resp.name
)
def save_workflow(self, workflow_id: int, workflow: Workflow) -> WorkflowResponse:
"""Persist a `Workflow` builder back to the server as a new draft."""
return self.update_workflow(
workflow_id,
body=UpdateWorkflowRequest(
name=workflow.name,
workflow_definition=workflow.to_json(),
),
)
# ── low-level ──────────────────────────────────────────────────
def _request(self, method: str, path: str, **kwargs: Any) -> Any:
resp = self._http.request(method, path, **kwargs)
if resp.status_code >= 400:
try:
body = resp.json()
if isinstance(body, dict):
message = body.get("detail") or body.get("message") or resp.text
else:
message = resp.text
except ValueError:
body = resp.text
message = resp.text
raise ApiError(resp.status_code, message, body=body)
if resp.status_code == 204 or not resp.content:
return None
try:
return resp.json()
except ValueError:
return resp.text

View file

@ -0,0 +1,332 @@
"""Typed SDK code generator.
Reads NodeSpecs (from the live backend, a JSON file, or the in-process
registry) and emits a dataclass per node type into an output directory.
The generated files live under `dograh_sdk.typed` and are committed to
the repository so `pip install dograh-sdk` ships typed classes without
requiring a regen step.
Run manually:
python -m dograh_sdk.codegen --api http://localhost:8000 \\
--out sdk/python/src/dograh_sdk/typed
python -m dograh_sdk.codegen --input specs.json \\
--out ./my_typed
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import textwrap
from pathlib import Path
from typing import Any
# ── property type → Python type annotation ────────────────────────────────
_SCALAR_PY_TYPES = {
"string": "str",
"number": "float",
"boolean": "bool",
"json": "dict[str, Any]",
"mention_textarea": "str",
"url": "str",
"recording_ref": "str",
"credential_ref": "str",
"tool_refs": "list[str]",
"document_refs": "list[str]",
}
def _snake_to_camel(name: str) -> str:
"""`start_call` → `StartCall` (class-name case)."""
return "".join(part.capitalize() or "_" for part in name.split("_"))
def _spec_class_name(spec_name: str) -> str:
# startCall → StartCall; agentNode → AgentNode; qa → Qa
if not spec_name:
return "Node"
return spec_name[0].upper() + spec_name[1:]
def _safe_py_repr(value: Any) -> str:
"""Render a JSON-serializable value as a Python literal."""
return repr(value)
def _py_type_for(prop: dict[str, Any], owner_class_name: str) -> tuple[str, str]:
"""Return (type_annotation, default_source) for one property.
Defaults are expressed as source code e.g., `"Start Call"`, `False`,
`field(default_factory=list)`, etc. An empty string means "no default"
(the field is required for the dataclass).
"""
t = prop["type"]
required = bool(prop.get("required"))
has_spec_default = prop.get("default") is not None
# Compound types first
if t == "options":
options = prop.get("options") or []
literals = ", ".join(repr(o["value"]) for o in options)
annotation = f"Literal[{literals}]" if literals else "str"
elif t == "multi_options":
options = prop.get("options") or []
literals = ", ".join(repr(o["value"]) for o in options)
inner = f"Literal[{literals}]" if literals else "str"
annotation = f"list[{inner}]"
elif t == "fixed_collection":
row_class = f"{owner_class_name}_{_spec_class_name(prop['name'])}Row"
annotation = f"list[{row_class}]"
else:
annotation = _SCALAR_PY_TYPES.get(t, "Any")
# Required fields without a spec default get no dataclass default
# (the user must set them). Optional fields default to None if the
# spec doesn't declare anything, or to the spec's default literal.
if has_spec_default:
spec_default = prop["default"]
if isinstance(spec_default, (dict, list, set)):
# Mutable defaults require default_factory — can't appear
# inline on a dataclass field.
default_src = f"field(default_factory=lambda: {spec_default!r})"
else:
default_src = _safe_py_repr(spec_default)
elif required:
default_src = "" # no default — caller must pass a value
elif t == "multi_options" or t == "fixed_collection" or t in (
"tool_refs",
"document_refs",
):
default_src = "field(default_factory=list)"
else:
default_src = "None"
annotation = f"Optional[{annotation}]"
return annotation, default_src
def _format_docstring(text: str, indent: int = 4) -> str:
"""Wrap a description into a triple-quoted docstring."""
pad = " " * indent
wrapped = textwrap.fill(
text.strip(),
width=76,
initial_indent=pad,
subsequent_indent=pad,
)
return f'{pad}"""\n{wrapped}\n{pad}"""'
# ── source rendering ─────────────────────────────────────────────────────
_FILE_HEADER = '''"""GENERATED — do not edit by hand.
Regenerate with `python -m dograh_sdk.codegen` against the target
Dograh backend. Source of truth: each node's NodeSpec in the backend's
`api/services/workflow/node_specs/` directory.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal, Optional
from dograh_sdk.typed._base import TypedNode
'''
def _render_nested_row_dataclass(
owner_class_name: str,
parent_prop: dict[str, Any],
) -> str:
row_class = f"{owner_class_name}_{_spec_class_name(parent_prop['name'])}Row"
props = parent_prop.get("properties") or []
lines = [f"@dataclass(kw_only=True)", f"class {row_class}:"]
desc = parent_prop.get("description") or "Row in " + parent_prop["name"]
lines.append(_format_docstring(desc))
lines.append("")
if not props:
lines.append(" pass")
return "\n".join(lines)
for sub in props:
annotation, default_src = _py_type_for(sub, row_class)
if default_src:
lines.append(f" {sub['name']}: {annotation} = {default_src}")
else:
lines.append(f" {sub['name']}: {annotation}")
if sub.get("description"):
lines.append(_format_docstring(sub["description"]))
return "\n".join(lines)
def _render_spec_class(spec: dict[str, Any]) -> str:
class_name = _spec_class_name(spec["name"])
lines: list[str] = []
# Emit nested row dataclasses first so the main class can reference them.
nested_rendered: list[str] = []
for prop in spec.get("properties", []):
if prop["type"] == "fixed_collection":
nested_rendered.append(
_render_nested_row_dataclass(class_name, prop)
)
lines.extend(nested_rendered)
if nested_rendered:
lines.append("")
lines.append("@dataclass(kw_only=True)")
lines.append(f"class {class_name}(TypedNode):")
# Class docstring: description + optional llm_hint
description = spec.get("description") or ""
llm_hint = spec.get("llm_hint")
doc_text = description
if llm_hint:
doc_text = f"{description}\n\nLLM hint: {llm_hint}"
lines.append(_format_docstring(doc_text))
lines.append("")
# Spec-name discriminator
lines.append(f' type: ClassVar[str] = {spec["name"]!r}')
lines.append("")
# Split fields into "has default" and "required-no-default" so we can
# emit required ones first (dataclass rule, even though we use
# kw_only=True — still cleaner output).
with_defaults: list[tuple[dict, str, str]] = []
without_defaults: list[tuple[dict, str]] = []
for prop in spec.get("properties", []):
annotation, default_src = _py_type_for(prop, class_name)
if default_src:
with_defaults.append((prop, annotation, default_src))
else:
without_defaults.append((prop, annotation))
for prop, annotation in without_defaults:
lines.append(f" {prop['name']}: {annotation}")
if prop.get("description"):
lines.append(_format_docstring(prop["description"]))
lines.append("")
for prop, annotation, default_src in with_defaults:
lines.append(f" {prop['name']}: {annotation} = {default_src}")
if prop.get("description"):
lines.append(_format_docstring(prop["description"]))
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def _render_init_module(spec_names: list[str]) -> str:
lines = [
'"""GENERATED — do not edit by hand.',
"",
"Re-exports every typed node class so users can write",
"`from dograh_sdk.typed import StartCall, AgentNode`.",
'"""',
"",
]
exports: list[str] = []
for spec_name in sorted(spec_names):
module_name = re.sub(r"(?<!^)(?=[A-Z])", "_", spec_name).lower()
# Handle abbreviations (qa, webhook, trigger, etc.): no underscore needed.
class_name = _spec_class_name(spec_name)
lines.append(f"from dograh_sdk.typed.{module_name} import {class_name}")
exports.append(class_name)
lines.append("from dograh_sdk.typed._base import TypedNode")
exports.append("TypedNode")
lines.append("")
lines.append("__all__ = [")
for name in sorted(exports):
lines.append(f' "{name}",')
lines.append("]")
lines.append("")
return "\n".join(lines)
def _module_name_for(spec_name: str) -> str:
"""startCall → start_call.py; agentNode → agent_node.py."""
return re.sub(r"(?<!^)(?=[A-Z])", "_", spec_name).lower()
# ── public entry points ──────────────────────────────────────────────────
def generate_all(specs: list[dict[str, Any]], out_dir: Path) -> None:
"""Emit typed dataclasses for every spec into `out_dir`.
Preserves `out_dir/_base.py` if present (it's hand-written). Writes
one `<spec>.py` file per spec and regenerates `__init__.py`.
"""
out_dir.mkdir(parents=True, exist_ok=True)
for spec in specs:
module_name = _module_name_for(spec["name"])
source = _FILE_HEADER + "\n\n" + _render_spec_class(spec) + "\n"
(out_dir / f"{module_name}.py").write_text(source)
(out_dir / "__init__.py").write_text(
_render_init_module([s["name"] for s in specs])
)
def _load_specs_from_json(path: Path) -> list[dict[str, Any]]:
raw = json.loads(path.read_text())
if isinstance(raw, dict) and "node_types" in raw:
return raw["node_types"]
if isinstance(raw, list):
return raw
raise SystemExit(f"{path}: expected list or {{node_types: [...]}}")
def _load_specs_from_api(base_url: str) -> list[dict[str, Any]]:
import httpx
resp = httpx.get(
f"{base_url.rstrip('/')}/api/v1/node-types",
timeout=30.0,
)
resp.raise_for_status()
body = resp.json()
return body.get("node_types", [])
def main(argv: list[str] | None = None) -> None:
parser = argparse.ArgumentParser(
prog="python -m dograh_sdk.codegen",
description="Generate typed SDK dataclasses from the Dograh node-spec catalog.",
)
source = parser.add_mutually_exclusive_group(required=True)
source.add_argument("--api", help="Dograh backend base URL")
source.add_argument("--input", help="Local JSON file with specs")
parser.add_argument(
"--out", required=True, help="Output directory for generated modules"
)
args = parser.parse_args(argv)
if args.api:
specs = _load_specs_from_api(args.api)
else:
specs = _load_specs_from_json(Path(args.input))
out_dir = Path(args.out)
generate_all(specs, out_dir)
print(
f"Generated {len(specs)} typed node modules "
f"({', '.join(s['name'] for s in specs)}) into {out_dir}"
)
if __name__ == "__main__":
main(sys.argv[1:])

View file

@ -0,0 +1,32 @@
"""SDK-level exceptions.
All errors raised from `dograh_sdk` are subclasses of `DograhSdkError` so
calling code can catch them as one category.
"""
class DograhSdkError(Exception):
"""Base class for all SDK errors."""
class ValidationError(DograhSdkError):
"""Raised when node data fails client-side validation (unknown field,
missing required field, obvious type mismatch).
Server-side Pydantic validation runs on save and may raise further
errors via `ApiError` this class covers the fast-fail cases caught
at the `Workflow.add()` call site.
"""
class ApiError(DograhSdkError):
"""Raised when the Dograh backend returns a non-2xx response."""
def __init__(self, status_code: int, message: str, body: object = None):
super().__init__(f"[{status_code}] {message}")
self.status_code = status_code
self.body = body
class SpecMismatchError(DograhSdkError):
"""Raised when a referenced node type isn't registered on the server."""

View file

@ -0,0 +1,25 @@
"""GENERATED — do not edit by hand.
Re-exports every typed node class so users can write
`from dograh_sdk.typed import StartCall, AgentNode`.
"""
from dograh_sdk.typed.agent_node import AgentNode
from dograh_sdk.typed.end_call import EndCall
from dograh_sdk.typed.global_node import GlobalNode
from dograh_sdk.typed.qa import Qa
from dograh_sdk.typed.start_call import StartCall
from dograh_sdk.typed.trigger import Trigger
from dograh_sdk.typed.webhook import Webhook
from dograh_sdk.typed._base import TypedNode
__all__ = [
"AgentNode",
"EndCall",
"GlobalNode",
"Qa",
"StartCall",
"Trigger",
"TypedNode",
"Webhook",
]

View file

@ -0,0 +1,49 @@
"""Base class for generated per-node-type dataclasses.
The typed SDK (`dograh_sdk.typed`) contains one generated dataclass per
node spec. Each subclass declares its spec name as a class-level `type`
and carries fields mirroring the spec's properties — giving IDEs full
autocomplete, docstrings on hover, and mypy/pyright coverage.
At runtime the typed objects feed into `Workflow.add_typed(node)`, which
unpacks them into the same kwargs the generic `add()` already accepts.
Wire format and validation rules are unchanged typed SDK is an
ergonomic layer, not a second validator.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass
from typing import Any, ClassVar
@dataclass(kw_only=True)
class TypedNode:
"""Common base for every generated typed node class.
Subclasses override `type` with their spec name (e.g. `"startCall"`).
Subclasses should be declared with `@dataclass(kw_only=True)` so
required fields can appear after optional ones without triggering
Python's default-ordering rule.
"""
# Overridden per subclass via dataclass inheritance + ClassVar shadowing.
type: ClassVar[str] = ""
def to_dict(self) -> dict[str, Any]:
"""Dataclass fields as a plain dict, suitable for feeding directly
into `Workflow.add(type=..., **kwargs)`.
`type` is a ClassVar and is NOT included the caller passes it
separately.
Fields with "unset" sentinels (`None`, empty list) are filtered
out so the output matches what `Workflow.add(**kwargs)` would
produce when the user omits them. Downstream validation applies
spec defaults for absent keys.
"""
raw = asdict(self)
return {
k: v for k, v in raw.items()
if v is not None and v != []
}

View file

@ -0,0 +1,97 @@
"""GENERATED — do not edit by hand.
Regenerate with `python -m dograh_sdk.codegen` against the target
Dograh backend. Source of truth: each node's NodeSpec in the backend's
`api/services/workflow/node_specs/` directory.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal, Optional
from dograh_sdk.typed._base import TypedNode
@dataclass(kw_only=True)
class AgentNode_Extraction_variablesRow:
"""
Each entry declares one variable to capture from the conversation, with
its name, type, and per-variable hint.
"""
name: str
"""
snake_case identifier used downstream.
"""
type: Literal['string', 'number', 'boolean'] = 'string'
"""
Data type of the extracted value.
"""
prompt: Optional[str] = None
"""
Per-variable hint describing what to look for.
"""
@dataclass(kw_only=True)
class AgentNode(TypedNode):
"""
Conversational step the LLM runs one focused exchange. LLM hint: Mid-
call step executed by the LLM. Most workflows are a chain of agent nodes
connected by edges that describe transition conditions. Each agent node
can invoke tools and reference documents.
"""
type: ClassVar[str] = 'agentNode'
prompt: str
"""
Agent system prompt for this step. Supports {{template_variables}} from
extraction or pre-call fetch.
"""
name: str = 'Agent'
"""
Short identifier for this step (e.g., 'Qualify Budget'). Appears in call
logs and edge transition tools.
"""
allow_interrupt: bool = True
"""
When true, the user can interrupt the agent mid-utterance. Set false for
non-interruptible disclosures.
"""
add_global_prompt: bool = True
"""
When true and a Global node exists, prepends the global prompt to this
node's prompt at runtime.
"""
extraction_enabled: bool = False
"""
When true, runs an LLM extraction pass on transition out of this node to
capture variables from the conversation.
"""
extraction_prompt: Optional[str] = None
"""
Overall instructions guiding variable extraction.
"""
extraction_variables: list[AgentNode_Extraction_variablesRow] = field(default_factory=list)
"""
Each entry declares one variable to capture from the conversation, with
its name, type, and per-variable hint.
"""
tool_uuids: list[str] = field(default_factory=list)
"""
Tools the agent can invoke during this step.
"""
document_uuids: list[str] = field(default_factory=list)
"""
Documents the agent can reference during this step.
"""

View file

@ -0,0 +1,82 @@
"""GENERATED — do not edit by hand.
Regenerate with `python -m dograh_sdk.codegen` against the target
Dograh backend. Source of truth: each node's NodeSpec in the backend's
`api/services/workflow/node_specs/` directory.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal, Optional
from dograh_sdk.typed._base import TypedNode
@dataclass(kw_only=True)
class EndCall_Extraction_variablesRow:
"""
Each entry declares one variable to capture from the conversation, with
its name, data type, and a per-variable extraction hint.
"""
name: str
"""
snake_case identifier used downstream.
"""
type: Literal['string', 'number', 'boolean'] = 'string'
"""
The data type of the extracted value.
"""
prompt: Optional[str] = None
"""
Per-variable hint describing what to look for in the conversation.
"""
@dataclass(kw_only=True)
class EndCall(TypedNode):
"""
Closes the conversation and hangs up. LLM hint: Terminal node that
politely closes the conversation. Variable extraction can run before
hangup. A workflow can have multiple endCall nodes reached via different
edge conditions.
"""
type: ClassVar[str] = 'endCall'
prompt: str
"""
Agent system prompt for the closing exchange. Supports
{{template_variables}} from extraction or pre-call fetch.
"""
name: str = 'End Call'
"""
Short identifier shown in call logs. Should describe the ending context
(e.g., 'Successful close', 'Polite decline').
"""
add_global_prompt: bool = False
"""
When true and a Global node exists, prepends the global prompt to this
node's prompt at runtime.
"""
extraction_enabled: bool = False
"""
When true, runs an LLM extraction pass before hangup to capture
variables from the conversation.
"""
extraction_prompt: Optional[str] = None
"""
Overall instructions guiding how variables should be extracted from the
conversation.
"""
extraction_variables: list[EndCall_Extraction_variablesRow] = field(default_factory=list)
"""
Each entry declares one variable to capture from the conversation, with
its name, data type, and a per-variable extraction hint.
"""

View file

@ -0,0 +1,38 @@
"""GENERATED — do not edit by hand.
Regenerate with `python -m dograh_sdk.codegen` against the target
Dograh backend. Source of truth: each node's NodeSpec in the backend's
`api/services/workflow/node_specs/` directory.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal, Optional
from dograh_sdk.typed._base import TypedNode
@dataclass(kw_only=True)
class GlobalNode(TypedNode):
"""
Persona/tone appended to every agent node's prompt. LLM hint: System-
level prompt appended to every prompted node whose `add_global_prompt`
is true. Use it for persona, tone, and shared rules that apply across
the entire conversation. At most one global node per workflow.
"""
type: ClassVar[str] = 'globalNode'
name: str = 'Global Node'
"""
Short identifier shown in the canvas and call logs. Has no runtime
effect.
"""
prompt: str = "You are a helpful assistant whose mode of interaction with the user is voice. So don't use any special characters which can not be pronounced. Use short sentences and simple language."
"""
Text appended to every prompted node's system prompt when that node has
`add_global_prompt=true`. Supports {{template_variables}}.
"""

View file

@ -0,0 +1,87 @@
"""GENERATED — do not edit by hand.
Regenerate with `python -m dograh_sdk.codegen` against the target
Dograh backend. Source of truth: each node's NodeSpec in the backend's
`api/services/workflow/node_specs/` directory.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal, Optional
from dograh_sdk.typed._base import TypedNode
@dataclass(kw_only=True)
class Qa(TypedNode):
"""
Run LLM quality analysis on the call transcript. LLM hint: Runs an LLM
quality review on the call transcript after completion. Per-node
analysis splits the conversation by node and evaluates each segment
against the configured system prompt. Sampling, minimum duration, and
voicemail filters are supported.
"""
type: ClassVar[str] = 'qa'
name: str = 'QA Analysis'
"""
Short identifier for this QA configuration.
"""
qa_enabled: bool = True
"""
When false, the QA run is skipped.
"""
qa_system_prompt: str = 'You are a QA analyst evaluating a specific segment of a voice AI conversation.\n\n## Node Purpose\n{{node_summary}}\n\n## Previous Conversation Context (For start of conversation, previous conversation summary can be empty.)\n{{previous_conversation_summary}}\n\n## Tags to evaluate\n\nExamine the conversation carefully and identify which of the following tags apply:\n\n- UNCLEAR_CONVERSATION - The conversation is not coherent or clear, messages don\'t connect logically\n- ASSISTANT_IN_LOOP - The assistant asks the same question multiple times or gets stuck repeating itself\n- ASSISTANT_REPLY_IMPROPER - The assistant did not reply properly to the user\'s question/query or seems confused by what the user said\n- USER_FRUSTRATED - The user seems angry, frustrated, or is complaining about something in the call\n- USER_NOT_UNDERSTANDING - The user explicitly says they don\'t understand or repeatedly asks for clarification\n- HEARING_ISSUES - Either party can\'t hear the other ("hello?", "are you there?", "can you hear me?")\n- DEAD_AIR - Unusually long silences in the conversation (use the timestamps to judge)\n- USER_REQUESTING_FEATURE - The user asks for something the assistant can\'t fulfill\n- ASSISTANT_LACKS_EMPATHY - The assistant ignores the user\'s personal situation or emotional state and continues pitching or pushing the agenda.\n- USER_DETECTS_AI - The user suspects or identifies that they are talking to an AI/robot/bot rather than a real human.\n\n## Call metrics (pre-computed)\n\nUse these alongside the transcript for your analysis:\n{{metrics}}\n\n## Output format\n\nReturn ONLY a valid JSON object (no markdown):\n{\n "tags": [\n {\n "tag": "TAG_NAME",\n "reason": "Short reason with evidence from the transcript"\n }\n ],\n "overall_sentiment": "positive|neutral|negative",\n "call_quality_score": <1-10>,\n "summary": "1-2 sentence summary of this segment"\n}\n\nIf no tags apply, return an empty tags list. Always provide sentiment, score, and summary.'
"""
Instructions to the QA reviewer LLM. Supports placeholders:
`{node_summary}`, `{previous_conversation_summary}`, `{transcript}`,
`{metrics}`.
"""
qa_min_call_duration: float = 15
"""
Calls shorter than this are skipped.
"""
qa_voicemail_calls: bool = False
"""
When false, calls flagged as voicemail are skipped.
"""
qa_sample_rate: float = 100
"""
Percent of eligible calls QA'd. 100 means every call; lower values use
random sampling.
"""
qa_use_workflow_llm: bool = True
"""
When true, the QA pass uses the same LLM the workflow runs with. Set
false to specify a separate provider/model.
"""
qa_provider: Optional[Literal['openai', 'azure', 'openrouter', 'anthropic']] = None
"""
LLM provider used for the QA pass.
"""
qa_model: str = 'default'
"""
Model identifier (e.g., 'gpt-4o', 'claude-sonnet-4-6'). Provider-
specific.
"""
qa_api_key: Optional[str] = None
"""
API key for the chosen provider.
"""
qa_endpoint: Optional[str] = None
"""
Required for the Azure provider.
"""

View file

@ -0,0 +1,142 @@
"""GENERATED — do not edit by hand.
Regenerate with `python -m dograh_sdk.codegen` against the target
Dograh backend. Source of truth: each node's NodeSpec in the backend's
`api/services/workflow/node_specs/` directory.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal, Optional
from dograh_sdk.typed._base import TypedNode
@dataclass(kw_only=True)
class StartCall_Extraction_variablesRow:
"""
Each entry declares one variable to capture, with its name, data type,
and per-variable extraction hint.
"""
name: str
"""
snake_case identifier used downstream.
"""
type: Literal['string', 'number', 'boolean'] = 'string'
"""
Data type of the extracted value.
"""
prompt: Optional[str] = None
"""
Per-variable hint describing what to look for.
"""
@dataclass(kw_only=True)
class StartCall(TypedNode):
"""
Entry point of the workflow plays a greeting and opens the
conversation. LLM hint: The entry point of every workflow (exactly one
required). Plays an optional greeting, can fetch context from an
external API before the call begins, and executes the first
conversational turn.
"""
type: ClassVar[str] = 'startCall'
prompt: str
"""
Agent system prompt for the opening turn. Supports
{{template_variables}} from pre-call fetch and the initial context.
"""
name: str = 'Start Call'
"""
Short identifier shown in the canvas and call logs.
"""
greeting_type: Literal['text', 'audio'] = 'text'
"""
Whether the optional greeting is spoken via TTS from text or played from
a pre-recorded audio file.
"""
greeting: Optional[str] = None
"""
Text spoken via TTS at the start of the call. Supports
{{template_variables}}. Leave empty to skip the greeting.
"""
greeting_recording_id: Optional[str] = None
"""
Pre-recorded audio file played at the start of the call.
"""
allow_interrupt: bool = False
"""
When true, the user can interrupt the agent mid-utterance.
"""
add_global_prompt: bool = True
"""
When true and a Global node exists, prepends the global prompt to this
node's prompt at runtime.
"""
delayed_start: bool = False
"""
When true, the agent waits before speaking after pickup. Useful for
outbound calls where the called party needs a moment to settle.
"""
delayed_start_duration: float = 2.0
"""
Seconds to wait before the agent speaks. 0.110.
"""
extraction_enabled: bool = False
"""
When true, runs an LLM extraction pass on transition out of this node to
capture variables from the opening turn.
"""
extraction_prompt: Optional[str] = None
"""
Overall instructions guiding variable extraction.
"""
extraction_variables: list[StartCall_Extraction_variablesRow] = field(default_factory=list)
"""
Each entry declares one variable to capture, with its name, data type,
and per-variable extraction hint.
"""
tool_uuids: list[str] = field(default_factory=list)
"""
Tools the agent can invoke during the opening turn.
"""
document_uuids: list[str] = field(default_factory=list)
"""
Documents the agent can reference.
"""
pre_call_fetch_enabled: bool = False
"""
When true, makes a POST request to an external API before the call
starts and merges the JSON response into the call context as template
variables.
"""
pre_call_fetch_url: Optional[str] = None
"""
URL the pre-call POST request is sent to. The request body includes
caller and called numbers.
"""
pre_call_fetch_credential_uuid: Optional[str] = None
"""
Optional credential attached to the pre-call request.
"""

View file

@ -0,0 +1,42 @@
"""GENERATED — do not edit by hand.
Regenerate with `python -m dograh_sdk.codegen` against the target
Dograh backend. Source of truth: each node's NodeSpec in the backend's
`api/services/workflow/node_specs/` directory.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal, Optional
from dograh_sdk.typed._base import TypedNode
@dataclass(kw_only=True)
class Trigger(TypedNode):
"""
Public HTTP endpoint that launches the workflow. LLM hint: Exposes a
public HTTP POST endpoint. External systems call the URL (derived from
the auto-generated `trigger_path`) to launch this workflow. Requires an
API key in the `X-API-Key` header.
"""
type: ClassVar[str] = 'trigger'
name: str = 'API Trigger'
"""
Short identifier shown in the canvas. No runtime effect.
"""
enabled: bool = True
"""
When false, the trigger URL returns 404.
"""
trigger_path: Optional[str] = None
"""
Auto-generated UUID-style path segment that uniquely identifies this
trigger. Do not edit manually.
"""

View file

@ -0,0 +1,84 @@
"""GENERATED — do not edit by hand.
Regenerate with `python -m dograh_sdk.codegen` against the target
Dograh backend. Source of truth: each node's NodeSpec in the backend's
`api/services/workflow/node_specs/` directory.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal, Optional
from dograh_sdk.typed._base import TypedNode
@dataclass(kw_only=True)
class Webhook_Custom_headersRow:
"""
Additional HTTP headers to include with the request.
"""
key: str
"""
HTTP header name (e.g., 'X-Source').
"""
value: str
"""
Header value (supports {{template_variables}}).
"""
@dataclass(kw_only=True)
class Webhook(TypedNode):
"""
Send HTTP request after the workflow completes. LLM hint: Sends an HTTP
request to an external system after the workflow completes. The payload
is a Jinja-templated JSON body with access to `workflow_run_id`,
`initial_context`, `gathered_context`, `annotations`, and call metadata.
"""
type: ClassVar[str] = 'webhook'
name: str = 'Webhook'
"""
Short identifier shown in the canvas and run logs.
"""
enabled: bool = True
"""
When false, the webhook is skipped at run time.
"""
http_method: Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE'] = 'POST'
"""
HTTP verb used for the outbound request.
"""
endpoint_url: Optional[str] = None
"""
URL the request is sent to.
"""
credential_uuid: Optional[str] = None
"""
Optional credential applied as the Authorization header.
"""
custom_headers: list[Webhook_Custom_headersRow] = field(default_factory=list)
"""
Additional HTTP headers to include with the request.
"""
payload_template: dict[str, Any] = field(default_factory=lambda: {'call_id': '{{workflow_run_id}}', 'first_name': '{{initial_context.first_name}}', 'rsvp': '{{gathered_context.rsvp}}', 'duration': '{{cost_info.call_duration_seconds}}', 'recording_url': '{{recording_url}}', 'transcript_url': '{{transcript_url}}'})
"""
JSON body of the request. Values are Jinja-rendered against the run
context `{{workflow_run_id}}`, `{{gathered_context.foo}}`,
`{{annotations.qa_xxx}}`, etc.
"""
retry_config: Optional[dict[str, Any]] = None
"""
Optional retry settings: `enabled` (bool), `max_retries` (int),
`retry_delay_seconds` (int).
"""

View file

@ -0,0 +1,247 @@
"""Workflow builder.
Users compose workflows by calling `Workflow.add(type="agentNode", ...)`
and `Workflow.edge(source, target, ...)`. Every call is validated
immediately against the spec catalog fetched from the backend, so LLM
hallucinations fail at the call site rather than at save time.
Wire format matches `ReactFlowDTO` from `api/services/workflow/dto.py`
1:1, so `Workflow.to_json()` output can be round-tripped through
`ReactFlowDTO.model_validate` without further translation.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from ._validation import validate_node_data
if TYPE_CHECKING:
from ._generated_models import NodeSpec
from .client import DograhClient
from .typed._base import TypedNode
@dataclass
class _Node:
id: str
type: str
position: dict[str, float]
data: dict[str, Any]
@dataclass
class _Edge:
id: str
source: str
target: str
data: dict[str, Any]
@dataclass
class NodeRef:
"""Opaque handle returned by `Workflow.add()`. Passed to `edge()` to
wire nodes together without relying on string IDs."""
id: str
type: str
class Workflow:
"""Typed builder that produces `ReactFlowDTO`-compatible JSON.
Usage:
wf = Workflow(client=client, name="loan_qual")
start = wf.add(type="startCall", name="greeting", prompt="...")
qualify = wf.add(type="agentNode", name="qualify", prompt="...")
wf.edge(start, qualify, label="interested", condition="...")
payload = wf.to_json()
"""
def __init__(self, *, client: DograhClient, name: str = "", description: str = ""):
self._client = client
self.name = name
self.description = description
self._nodes: list[_Node] = []
self._edges: list[_Edge] = []
# Auto-incrementing IDs match the pattern used by the existing UI.
self._next_node_id = 1
# ── node construction ──────────────────────────────────────────
def add(
self,
*,
type: str,
position: tuple[float, float] | None = None,
**kwargs: Any,
) -> NodeRef:
"""Add a node of the given type.
`type` is a spec name (e.g., "startCall", "agentNode"). Remaining
kwargs are validated against the spec unknown or missing
required fields raise `ValidationError` immediately.
`position` is optional (x, y) on the React-Flow canvas; omit for
auto-placement at origin.
"""
spec: NodeSpec = self._client.get_node_type(type)
data = validate_node_data(spec.model_dump(mode="json"), kwargs)
node_id = str(self._next_node_id)
self._next_node_id += 1
x, y = position if position is not None else (0.0, 0.0)
self._nodes.append(
_Node(
id=node_id,
type=type,
position={"x": float(x), "y": float(y)},
data=data,
)
)
return NodeRef(id=node_id, type=type)
def add_typed(
self,
node: "TypedNode",
*,
position: tuple[float, float] | None = None,
) -> NodeRef:
"""Typed variant of `add()` — takes a generated dataclass from
`dograh_sdk.typed` instead of string+kwargs.
Equivalent to:
wf.add(type=node.type, position=..., **node.to_dict())
Benefits: mypy/pyright catches misspelled fields at edit time,
and IDEs show field-level docstrings on hover.
"""
return self.add(type=node.type, position=position, **node.to_dict())
# ── edge construction ──────────────────────────────────────────
def edge(
self,
source: NodeRef,
target: NodeRef,
*,
label: str,
condition: str,
transition_speech: str | None = None,
transition_speech_type: str | None = None,
transition_speech_recording_id: str | None = None,
) -> None:
"""Connect two nodes with a labeled transition.
`label` identifies the branch in call logs and LLM tool schemas;
`condition` is the natural-language predicate the engine evaluates
to decide when to follow the edge.
"""
if not label or not label.strip():
from .errors import ValidationError
raise ValidationError("edge.label is required")
if not condition or not condition.strip():
from .errors import ValidationError
raise ValidationError("edge.condition is required")
data: dict[str, Any] = {"label": label, "condition": condition}
if transition_speech is not None:
data["transition_speech"] = transition_speech
if transition_speech_type is not None:
data["transition_speech_type"] = transition_speech_type
if transition_speech_recording_id is not None:
data["transition_speech_recording_id"] = transition_speech_recording_id
edge_id = f"{source.id}-{target.id}"
self._edges.append(
_Edge(id=edge_id, source=source.id, target=target.id, data=data)
)
# ── serialization ──────────────────────────────────────────────
def to_json(self) -> dict[str, Any]:
"""Serialize to the `ReactFlowDTO` wire format.
Passes directly through `ReactFlowDTO.model_validate` and the
`WorkflowGraph` constructor no translation layer needed.
"""
return {
"nodes": [
{
"id": n.id,
"type": n.type,
"position": n.position,
"data": n.data,
}
for n in self._nodes
],
"edges": [
{
"id": e.id,
"source": e.source,
"target": e.target,
"data": e.data,
}
for e in self._edges
],
"viewport": {"x": 0.0, "y": 0.0, "zoom": 1.0},
}
@classmethod
def from_json(
cls,
data: dict[str, Any],
*,
client: DograhClient,
name: str = "",
) -> Workflow:
"""Rebuild a Workflow from a stored `workflow_json` payload.
Useful for the MCP edit flow: fetch existing workflow, convert to
SDK objects, let the LLM mutate in code, serialize back.
"""
wf = cls(client=client, name=name)
# Rebuild nodes in the same order, preserving IDs.
for raw in data.get("nodes", []):
node_id = str(raw.get("id"))
spec: NodeSpec = client.get_node_type(raw["type"])
validated = validate_node_data(spec.model_dump(mode="json"), raw.get("data") or {})
wf._nodes.append(
_Node(
id=node_id,
type=raw["type"],
position=raw.get("position") or {"x": 0.0, "y": 0.0},
data=validated,
)
)
# Keep ID generator above the highest numeric ID seen so new
# nodes don't collide with existing ones.
numeric_ids = [int(n.id) for n in wf._nodes if n.id.isdigit()]
wf._next_node_id = max(numeric_ids, default=0) + 1
for raw in data.get("edges", []):
wf._edges.append(
_Edge(
id=str(raw.get("id") or f"{raw['source']}-{raw['target']}"),
source=str(raw["source"]),
target=str(raw["target"]),
data=raw.get("data") or {},
)
)
return wf
def find_node(self, predicate_or_id: Any) -> NodeRef | None:
"""Lookup a NodeRef by node id or custom predicate. Handy after
`from_json` when the LLM needs to reference an existing node."""
if callable(predicate_or_id):
for n in self._nodes:
if predicate_or_id(n):
return NodeRef(id=n.id, type=n.type)
return None
for n in self._nodes:
if n.id == str(predicate_or_id):
return NodeRef(id=n.id, type=n.type)
return None