# Integrations - Plugin Contract

`api/services/integrations/` is the extension seam for third-party integrations.
New integrations should be self-contained here. Do not bleed integration-specific
logic into `workflow/dto.py`, `workflow/node_specs/`, `run_pipeline.py`,
`event_handlers.py`, or `run_integrations.py` unless you are changing the generic
framework itself.

## Golden Path

Create a package:

```text
api/services/integrations/<name>/
├── __init__.py
├── node.py
├── runtime.py # optional
├── completion.py # optional
├── routes.py # optional
└── client.py # optional
```

The package self-registers on import via `register_package(...)`. Discovery is
automatic: `api/services/integrations/loader.py` imports every submodule under
`api.services.integrations` except the reserved internal names `base`, `loader`,
and `registry`.

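The discovery step can be sketched with the standard library alone. This is a minimal sketch. It assumes the loader walks the package directory with `pkgutil.iter_modules` and imports each entry with `importlib.import_module`. `load_integration_packages` and `RESERVED_MODULE_NAMES` are hypothetical names, and the real `loader.py` may work differently. The sketch shows why no central list of integrations is needed: importing a submodule runs its top-level `register_package(...)` call.

```python
import importlib
import pkgutil
from types import ModuleType

# Internal module names the loader must never treat as integrations.
RESERVED_MODULE_NAMES = frozenset({"base", "loader", "registry"})


def load_integration_packages(package_name: str) -> list[ModuleType]:
    """Import every non-reserved direct submodule of `package_name` (hypothetical helper).

    Importing a submodule executes its top-level code, which is where each
    integration package calls register_package(...).
    """
    package = importlib.import_module(package_name)
    loaded = []
    # iter_modules yields the modules and subpackages directly inside the package;
    # sorting by name keeps the import order deterministic.
    for info in sorted(pkgutil.iter_modules(package.__path__), key=lambda m: m.name):
        if info.name in RESERVED_MODULE_NAMES:
            continue
        loaded.append(importlib.import_module(f"{package_name}.{info.name}"))
    return loaded
```

Repeated calls are cheap. `importlib.import_module` returns the cached module from `sys.modules`, so each package's registration code runs only once per process.
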
## Registration Pattern

`__init__.py` should register one `IntegrationPackageSpec`, following the
existing integration packages in this directory.

Use:

```python
PACKAGE = register_package(
    IntegrationPackageSpec(
        name="<package_name>",
        nodes=(NODE,),
        create_runtime_sessions=create_runtime_sessions,  # optional
        run_completion=run_completion,  # optional
        routers=(router,),  # optional
    )
)
```

The package name is the registry key. The node `type_name` is the workflow node
type string and must stay stable once exposed.

## Node Model + Spec

For integration nodes, the Pydantic model is the source of truth. The serialized
`NodeSpec` is derived from it.

Refer to an existing integration node for the overall structure:

- Define one Pydantic model per node, inheriting from
  `api/services/workflow/node_data.py:BaseNodeData`.
- Annotate it with `@node_spec(...)`.
- Define fields with `spec_field(...)`.
- Generate the external spec with `SPEC = build_spec(ModelClass)`.
- Register the node with `IntegrationNodeRegistration(...)`.

Important rules:

- Put runtime validation in the model, not in the generated spec.
  Example: conditional requiredness belongs in `@model_validator(mode="after")`.
- Keep `@node_spec(name=...)` and `IntegrationNodeRegistration.type_name`
  identical. They are the same workflow node type string.
- Put wire constraints in the field itself where possible.
  Example: `gt=0`, `min_length=1`, `pattern=...`.
- Put UI/export-only differences in `field_overrides`.
  Use this for `display_name`, `description`, `required`, `spec_default`,
  `display_options`, or property ordering.
- Use `spec_exclude=True` for internal fields that must exist in persisted data
  but must not show up in `/api/v1/node-types`.
- Set `property_order=(...)` in `@node_spec(...)` when the editor field order
  must remain stable.

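Plain Pydantic v2 is enough to show the split between field-level wire constraints and model-level runtime validation. This sketch uses `pydantic.BaseModel` directly as a stand-in for `BaseNodeData`. It leaves out `@node_spec(...)` and `spec_field(...)` because their signatures are project-specific. `ExampleExportNodeData` and its fields are hypothetical.

```python
from typing import Optional

from pydantic import BaseModel, Field, model_validator


class ExampleExportNodeData(BaseModel):
    """Hypothetical node model; a real integration node would inherit BaseNodeData."""

    enabled: bool = True
    # Wire constraints live on the fields themselves.
    destination: str = Field(default="webhook", pattern=r"^(webhook|email)$")
    timeout_seconds: float = Field(default=10.0, gt=0)
    webhook_url: Optional[str] = None
    recipient_email: Optional[str] = None

    # Conditional requiredness is runtime validation, so it belongs on the model.
    @model_validator(mode="after")
    def check_destination_fields(self) -> "ExampleExportNodeData":
        if not self.enabled:
            return self  # choice made for this sketch: disabled nodes may stay half-configured
        if self.destination == "webhook" and not self.webhook_url:
            raise ValueError("webhook_url is required when destination is 'webhook'")
        if self.destination == "email" and not self.recipient_email:
            raise ValueError("recipient_email is required when destination is 'email'")
        return self
```

The conditional check lives in the `mode="after"` validator, so it runs on every `model_validate(...)` call, including the post-call handler path. It is not something only the editor enforces.
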
Typical workflow graph constraints for configuration-only integration nodes:

```python
GraphConstraints(min_incoming=0, max_incoming=0, min_outgoing=0, max_outgoing=0)
```

These constraints control how the node can be connected in the workflow graph.
With every limit set to `0`, the node accepts no incoming or outgoing edges. That
is what you want for configuration nodes that are not conversational graph steps.

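The four limits can be read as bounds on a node's edge counts. The sketch below is hypothetical. `EdgeLimits` only mirrors the field names of `GraphConstraints`, and using `None` for "no upper bound" is an assumption. `edge_counts` assumes edges are `(source, target)` pairs. The real graph validation lives elsewhere and may behave differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EdgeLimits:
    """Hypothetical mirror of GraphConstraints' fields; None means "no upper bound"."""

    min_incoming: int = 0
    max_incoming: Optional[int] = None
    min_outgoing: int = 0
    max_outgoing: Optional[int] = None

    def violations(self, incoming: int, outgoing: int) -> list[str]:
        """Return human-readable problems for the given edge counts (empty if valid)."""
        problems = []
        for label, count, low, high in (
            ("incoming", incoming, self.min_incoming, self.max_incoming),
            ("outgoing", outgoing, self.min_outgoing, self.max_outgoing),
        ):
            if count < low:
                problems.append(f"needs at least {low} {label} edge(s), has {count}")
            if high is not None and count > high:
                problems.append(f"allows at most {high} {label} edge(s), has {count}")
        return problems


def edge_counts(node_id: str, edges: list[tuple[str, str]]) -> tuple[int, int]:
    """Count (incoming, outgoing) edges for a node, given (source, target) pairs."""
    incoming = sum(1 for _, target in edges if target == node_id)
    outgoing = sum(1 for source, _ in edges if source == node_id)
    return incoming, outgoing


# Configuration-only node: no edges in either direction.
CONFIG_ONLY = EdgeLimits(min_incoming=0, max_incoming=0, min_outgoing=0, max_outgoing=0)
```

For example, connecting a start node to a configuration-only node gives that node one incoming edge. `CONFIG_ONLY` reports this as a violation.
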
## Secret Fields

If the node stores secrets, register them in
`IntegrationNodeRegistration.sensitive_fields`.

That is enough for generic masking and masked round-trip preservation via
`api/services/configuration/masking.py`. Do not add new integration-specific
masking branches unless you are changing the shared masking framework.

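The following sketch illustrates what "masked round-trip preservation" means. It masks sensitive values before they leave the server. When the client sends the mask back unchanged, it restores the stored secret. The `MASK` placeholder and both helper functions are hypothetical, and the actual format and rules in `masking.py` may differ. Integrations should rely on `sensitive_fields` rather than writing code like this themselves.

```python
from typing import Any

MASK = "********"  # hypothetical placeholder; the real masking format may differ


def mask_node_data(data: dict[str, Any], sensitive_fields: tuple[str, ...]) -> dict[str, Any]:
    """Copy of `data` with non-empty sensitive values replaced by MASK."""
    return {
        key: MASK if key in sensitive_fields and value else value
        for key, value in data.items()
    }


def restore_masked(
    incoming: dict[str, Any], stored: dict[str, Any], sensitive_fields: tuple[str, ...]
) -> dict[str, Any]:
    """Keep the stored secret wherever the client sent the mask back unchanged."""
    merged = dict(incoming)
    for key in sensitive_fields:
        if merged.get(key) == MASK and key in stored:
            merged[key] = stored[key]
    return merged
```

In this sketch, a user who edits only a non-secret field keeps the real secret. A user who types a new secret replaces it.
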
## No Central DTO Edits

Do not add integration node classes to `api/services/workflow/dto.py`.

Integration nodes are resolved dynamically through:

- `get_node_data_model()` in `workflow/dto.py`
- `get_node_spec()` / `all_node_specs()` in `services/integrations/registry.py`

`RFNodeDTO` validates integration nodes by `type` through the registry. That is
the intended extension path.

## Live Call Path

If the integration needs live call data, implement `create_runtime_sessions(...)`
in `runtime.py` and return `IntegrationRuntimeSession` objects.

The generic wiring is already in `api/services/pipecat/run_pipeline.py`:

- `create_runtime_sessions(IntegrationRuntimeContext(...))` is called before the
  pipeline task starts.
- `session.attach(task)` is called on each returned session.

Use this only for lightweight live collection:

- attach task observers
- read context messages
- capture timing / turn / tool events
- build an in-memory snapshot

Do not do outbound network I/O in the live path unless there is a very strong
reason. Prefer the standard pattern: collect live, deliver after the call.

`IntegrationRuntimeContext` gives you:

- `workflow_run_id`
- `workflow_run`
- `workflow_graph`
- `run_definition`
- `user_config`
- `is_realtime`
- `context_messages_provider`

Typical runtime pattern:

- scan `context.workflow_graph.nodes.values()` for enabled nodes of your type
- if none are enabled, return `[]`
- build one collector/session per workflow run, not per node, unless the
  integration truly needs multiple independent collectors

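The typical runtime pattern above can be sketched as follows. Everything here is hypothetical. `GraphNode`, `WorkflowGraph`, and `RuntimeContext` are minimal stand-ins for the real graph and `IntegrationRuntimeContext` types, and the node attributes `id`, `type`, and `data` are assumptions. The sketch treats a missing `enabled` flag as enabled and assumes `create_runtime_sessions` is synchronous. `ExampleCollectorSession` only buffers state in memory, as the live path should.

```python
from dataclasses import dataclass, field
from typing import Any

NODE_TYPE = "example_export"  # hypothetical workflow node type string


@dataclass
class GraphNode:  # hypothetical node shape; attribute names are assumptions
    id: str
    type: str
    data: dict[str, Any] = field(default_factory=dict)


@dataclass
class WorkflowGraph:  # hypothetical: nodes keyed by node id
    nodes: dict[str, GraphNode]


@dataclass
class RuntimeContext:  # hypothetical subset of IntegrationRuntimeContext
    workflow_run_id: int
    workflow_graph: WorkflowGraph


class ExampleCollectorSession:
    """One collector per workflow run; it only buffers data in memory."""

    def __init__(self, workflow_run_id: int, node_ids: list[str]) -> None:
        self.workflow_run_id = workflow_run_id
        self.node_ids = node_ids
        self.events: list[dict[str, Any]] = []
        self.task: Any = None

    def attach(self, task: Any) -> None:
        # A real session would register task observers here; no network I/O.
        self.task = task


def create_runtime_sessions(context: RuntimeContext) -> list[ExampleCollectorSession]:
    enabled_ids = [
        node.id
        for node in context.workflow_graph.nodes.values()
        if node.type == NODE_TYPE and node.data.get("enabled", True)
    ]
    if not enabled_ids:
        return []  # the integration is not used in this workflow
    # A single session covers every enabled node of this type.
    return [ExampleCollectorSession(context.workflow_run_id, enabled_ids)]
```
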
## Call-Finish Snapshot Path

`api/services/pipecat/event_handlers.py` finalizes runtime sessions before the
engine is cleaned up.

The generic flow:

1. `on_pipeline_finished` builds `gathered_context`
2. `await session.on_call_finished(...)` is called for each runtime session
3. returned dicts are merged into `integration_logs`
4. those logs are persisted into `workflow_run.logs`

Use `on_call_finished(...)` to emit a compact, serializable snapshot that the
post-call completion handler can consume later. Return `None` if there is nothing
to persist.

This is the handoff between the live call path and the post-call task path.

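Here is a sketch of the snapshot handoff. It assumes `on_call_finished` receives the gathered context as a dict, because the real signature is elided above as `(...)`. The `call_id` key is also an assumption. `TurnCounterSession` and `finalize_sessions` are hypothetical. `finalize_sessions` mimics steps 2 and 3 of the generic flow: it awaits each session and merges the non-`None` results into one `integration_logs` dict. The sketch keys each snapshot by a package-specific name so that sessions do not overwrite each other.

```python
from typing import Any, Iterable, Optional


class TurnCounterSession:
    """Hypothetical runtime session that counted turns during the call."""

    def __init__(self, key: str) -> None:
        self.key = key  # package-specific key so snapshots do not collide
        self.turns = 0  # a live observer would increment this during the call

    async def on_call_finished(self, gathered_context: dict[str, Any]) -> Optional[dict[str, Any]]:
        if self.turns == 0:
            return None  # nothing worth persisting
        # Compact, JSON-serializable snapshot for the post-call handler.
        return {self.key: {"turns": self.turns, "call_id": gathered_context.get("call_id")}}


async def finalize_sessions(
    sessions: Iterable[TurnCounterSession], gathered_context: dict[str, Any]
) -> dict[str, Any]:
    """Await each session and merge non-None snapshots into one integration_logs dict."""
    integration_logs: dict[str, Any] = {}
    for session in sessions:
        snapshot = await session.on_call_finished(gathered_context)
        if snapshot:
            integration_logs.update(snapshot)
    return integration_logs
```
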
## Post-Call Completion Path

If the integration needs durable artifacts, public URLs, retries, or external
delivery, implement `run_completion(nodes, context)` in `completion.py`.

The generic orchestration is already in `api/tasks/run_integrations.py`:

1. load the pinned workflow definition from the workflow run
2. create a public token if post-call work exists
3. run QA nodes first
4. run registered integration completion handlers
5. run webhook nodes last

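The phase ordering can be sketched as a pure planning function. The node type strings `"qa"` and `"webhook"`, the `package_by_type` mapping, and `plan_post_call` itself are hypothetical, and the real `run_integrations.py` may structure this differently. The sketch groups integration nodes per package, so each handler receives only nodes of its own types. It also reports whether any post-call work exists, which is the condition for creating a public token.

```python
from typing import Any

QA_TYPE = "qa"  # hypothetical node type strings
WEBHOOK_TYPE = "webhook"


def plan_post_call(
    nodes: list[dict[str, Any]], package_by_type: dict[str, str]
) -> tuple[bool, list[tuple[str, list[dict[str, Any]]]]]:
    """Return (needs_public_token, ordered steps) for a workflow's nodes."""
    qa = [n for n in nodes if n.get("type") == QA_TYPE]
    webhooks = [n for n in nodes if n.get("type") == WEBHOOK_TYPE]
    # Each integration handler only sees nodes of its own registered types.
    per_package: dict[str, list[dict[str, Any]]] = {}
    for node in nodes:
        package = package_by_type.get(node.get("type"))
        if package is not None:
            per_package.setdefault(package, []).append(node)
    steps = [("qa", qa), *per_package.items(), ("webhook", webhooks)]
    steps = [(name, group) for name, group in steps if group]
    # A public token is only worth creating when some post-call work exists.
    return bool(steps), steps
```

A runner would then create the token when the first value is true and call each step's handler in the returned order.
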
Your handler receives:

- `nodes`: raw workflow node dicts for your node types only
- `context`: an `IntegrationCompletionContext` with:
  - `workflow_run_id`
  - `workflow_run`
  - `workflow_definition`
  - `definition_id`
  - `organization_id`
  - `public_token`

Expected completion handler pattern:

- validate each node with `YourNodeData.model_validate(node.get("data", {}))`
- skip disabled nodes
- read any runtime snapshot from `context.workflow_run.logs`
- build durable URLs using `public_token` when appropriate
- perform external delivery
- return a result dict keyed per node, usually with `node_id` embedded

Returned data is merged into `workflow_run.annotations`.

Do not assume completion runs inside the live pipeline process. Treat it as a
separate post-call worker step.

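Below is a minimal completion handler that follows that pattern. It assumes an async handler, a snapshot stored under `logs["integration_logs"]["example_export"]`, and a public URL of the form `<base>/<public_token>`. All of these are hypothetical, as are `ExampleExportNodeData`, `CompletionContext`, `WorkflowRun`, `deliver`, and `PUBLIC_BASE_URL`. `deliver` only records payloads in memory, at the point where a real integration would make its outbound request.

```python
from dataclasses import dataclass, field
from typing import Any, Optional

from pydantic import BaseModel

PACKAGE_KEY = "example_export"  # hypothetical key used in integration_logs and results
PUBLIC_BASE_URL = "https://app.example.com/public/runs"  # hypothetical public URL base
OUTBOX: list[tuple[str, dict[str, Any]]] = []  # stands in for real outbound delivery


class ExampleExportNodeData(BaseModel):  # hypothetical node model
    enabled: bool = True
    webhook_url: str = ""


@dataclass
class WorkflowRun:  # hypothetical subset of the workflow run record
    logs: dict[str, Any] = field(default_factory=dict)


@dataclass
class CompletionContext:  # hypothetical subset of IntegrationCompletionContext
    workflow_run_id: int
    workflow_run: WorkflowRun
    public_token: Optional[str] = None


async def deliver(url: str, payload: dict[str, Any]) -> None:
    # Placeholder: a real integration would make its outbound request here.
    OUTBOX.append((url, payload))


async def run_completion(nodes: list[dict[str, Any]], context: CompletionContext) -> dict[str, Any]:
    logs = context.workflow_run.logs or {}
    snapshot = logs.get("integration_logs", {}).get(PACKAGE_KEY, {})
    results: dict[str, Any] = {}
    for node in nodes:
        data = ExampleExportNodeData.model_validate(node.get("data", {}))
        if not data.enabled:
            continue  # disabled-node skip path
        report_url = f"{PUBLIC_BASE_URL}/{context.public_token}" if context.public_token else None
        payload = {"workflow_run_id": context.workflow_run_id, "report_url": report_url, "snapshot": snapshot}
        await deliver(data.webhook_url, payload)
        # One result entry per node, with node_id embedded.
        results[f"{PACKAGE_KEY}:{node['id']}"] = {"node_id": node["id"], "delivered": True}
    return results
```

This shape covers both the happy path and the disabled-node skip path listed under testing expectations.
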
## Optional Routes

If an integration exposes HTTP routes, put them in `routes.py` and include the
router in `IntegrationPackageSpec.routers`.

Routers are mounted automatically by `api/routes/main.py` through `all_routers()`.
Do not edit `routes/main.py` for per-integration route wiring.

## Import Discipline

Keep package import side effects light.

The integration loader runs during:

- node-type/spec enumeration
- tests
- route startup
- registry access

Because every integration package is imported in all of these contexts, avoid
top-level imports that require environment variables, network access, or
heavyweight initialization when possible. Prefer lazy imports inside
`run_completion()` / `create_runtime_sessions()` if the dependency is optional or
environment-sensitive.

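Here is a small sketch of the lazy-import pattern. It assumes an async `run_completion`. `load_optional_dependency` is a hypothetical helper, and the standard-library `json` module stands in for a real optional SDK. When the loader imports this module, only `importlib` is imported. The dependency is resolved when the handler runs. A missing dependency then surfaces as a clear error at that point, instead of breaking node-type enumeration or route startup.

```python
import importlib
from types import ModuleType
from typing import Any


def load_optional_dependency(module_name: str) -> ModuleType:
    """Import a dependency at call time instead of at package import time."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise RuntimeError(f"integration dependency {module_name!r} is not installed") from exc


async def run_completion(nodes: list[dict[str, Any]], context: Any) -> dict[str, Any]:
    # Nothing heavy is imported when the loader imports this module; the
    # dependency is resolved only when post-call work actually runs.
    sdk = load_optional_dependency("json")  # stdlib stand-in for a real optional SDK
    return {node["id"]: sdk.dumps({"node_id": node["id"]}) for node in nodes}
```
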
## Testing Expectations

At minimum, new integrations should add coverage for:

- node model validation
- generated spec/example validity
- secret masking + masked round-trip preservation if secrets exist
- runtime snapshot creation if live collectors exist
- completion handler happy path and disabled-node skip path

If you change shared integration machinery, test the framework in the generic
code path, not only the concrete integration.