dograh/api/services/integrations/registry.py

129 lines
3.6 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
from typing import Any
from api.services.integrations.base import (
IntegrationCompletionContext,
IntegrationNodeRegistration,
IntegrationPackageSpec,
IntegrationRuntimeContext,
)
from api.services.workflow.node_data import BaseNodeData
_PACKAGE_REGISTRY: dict[str, IntegrationPackageSpec] = {}
def register_package(spec: IntegrationPackageSpec) -> IntegrationPackageSpec:
existing = _PACKAGE_REGISTRY.get(spec.name)
if existing is not None and existing is not spec:
raise ValueError(
f"Duplicate integration package registration for {spec.name!r}"
)
_PACKAGE_REGISTRY[spec.name] = spec
return spec
def _ensure_loaded() -> None:
from api.services.integrations.loader import ensure_integrations_loaded
ensure_integrations_loaded()
def all_packages() -> list[IntegrationPackageSpec]:
_ensure_loaded()
return [_PACKAGE_REGISTRY[name] for name in sorted(_PACKAGE_REGISTRY)]
def get_package(name: str) -> IntegrationPackageSpec | None:
_ensure_loaded()
return _PACKAGE_REGISTRY.get(name)
def get_node_registration(type_name: str) -> IntegrationNodeRegistration | None:
_ensure_loaded()
for package in _PACKAGE_REGISTRY.values():
for node in package.nodes:
if node.type_name == type_name:
return node
return None
def get_node_data_model(type_name: str) -> type[BaseNodeData] | None:
registration = get_node_registration(type_name)
return registration.data_model if registration else None
def get_node_spec(type_name: str):
registration = get_node_registration(type_name)
return registration.node_spec if registration else None
def get_node_secret_fields(type_name: str) -> tuple[str, ...]:
registration = get_node_registration(type_name)
return registration.sensitive_fields if registration else ()
def all_node_specs():
_ensure_loaded()
specs = []
for package in all_packages():
specs.extend(node.node_spec for node in package.nodes)
return specs
def all_routers():
_ensure_loaded()
routers = []
for package in all_packages():
routers.extend(package.routers)
return routers
def create_runtime_sessions(
context: IntegrationRuntimeContext,
):
_ensure_loaded()
sessions = []
for package in all_packages():
if package.create_runtime_sessions is None:
continue
sessions.extend(package.create_runtime_sessions(context))
return sessions
def iter_completion_packages(
workflow_definition: dict[str, Any],
):
_ensure_loaded()
nodes = workflow_definition.get("nodes", []) if workflow_definition else []
for package in all_packages():
node_types = {node.type_name for node in package.nodes}
package_nodes = [
node
for node in nodes
if isinstance(node, dict) and node.get("type") in node_types
]
if package_nodes:
yield package, package_nodes
def has_completion_handlers(workflow_definition: dict[str, Any]) -> bool:
return any(
package.run_completion is not None
for package, _nodes in iter_completion_packages(workflow_definition)
)
async def run_completion_handlers(
*,
context: IntegrationCompletionContext,
) -> dict[str, Any]:
results: dict[str, Any] = {}
for package, nodes in iter_completion_packages(context.workflow_definition):
if package.run_completion is None:
continue
package_result = await package.run_completion(nodes, context)
if package_result:
results.update(package_result)
return results