dograh/api/services/integrations/base.py

70 lines
1.7 KiB
Python
Raw Normal View History

from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Protocol
from fastapi import APIRouter
from api.services.workflow.node_data import BaseNodeData
from api.services.workflow.node_specs._base import NodeSpec
class IntegrationRuntimeSession(Protocol):
    """Structural interface for an integration's runtime session object.

    Any object exposing ``name``, ``attach`` and ``on_call_finished`` with
    these signatures satisfies the protocol; inheriting from this class is
    not required. ``RuntimeFactory`` callables return lists of such objects.
    """

    # Identifier of the session.
    # NOTE(review): presumably the owning integration's name -- confirm.
    name: str

    def attach(self, task: Any) -> None:
        """Attach this session to ``task``.

        The concrete type of ``task`` is not constrained by this module
        (it is annotated as ``Any``).
        """
        ...

    async def on_call_finished(
        self,
        *,
        gathered_context: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Coroutine hook that receives the gathered context.

        Args:
            gathered_context: Keyword-only mapping of string keys to
                arbitrary values.

        Returns:
            A dict, or ``None``.
            NOTE(review): how callers consume the returned dict is not
            visible in this module -- confirm against the call site.
        """
        ...
@dataclass(frozen=True)
class IntegrationRuntimeContext:
    """Immutable bundle of inputs handed to a ``RuntimeFactory``.

    Instances are frozen: attributes cannot be reassigned after creation.

    Attributes:
        workflow_run_id: Integer id of the workflow run.
        workflow_run: The workflow run object (type left open here).
        workflow_graph: The workflow graph object (type left open here).
        run_definition: The run definition object (type left open here).
        user_config: The user configuration object (type left open here).
        is_realtime: Boolean realtime flag.
            NOTE(review): exact meaning is defined by the caller -- confirm.
        context_messages_provider: Zero-argument callable returning a list
            of dicts with string keys.
    """

    workflow_run_id: int
    workflow_run: Any
    workflow_graph: Any
    run_definition: Any
    user_config: Any
    is_realtime: bool
    context_messages_provider: Callable[[], list[dict[str, Any]]]
@dataclass(frozen=True)
class IntegrationCompletionContext:
    """Immutable bundle of inputs handed to a ``CompletionHandler``.

    Instances are frozen: attributes cannot be reassigned after creation.

    Attributes:
        workflow_run_id: Integer id of the workflow run.
        workflow_run: The workflow run object (type left open here).
        workflow_definition: Workflow definition as a dict with string keys.
        definition_id: Integer id of the definition, or ``None``.
        organization_id: Integer id of the organization.
        public_token: A token string, or ``None``.
            NOTE(review): what the token grants access to is not visible
            in this module -- confirm.
    """

    workflow_run_id: int
    workflow_run: Any
    workflow_definition: dict[str, Any]
    definition_id: int | None
    organization_id: int
    public_token: str | None
# Signature of a runtime-session factory: a callable that takes one
# IntegrationRuntimeContext and returns a list of IntegrationRuntimeSession
# objects.
RuntimeFactory = Callable[[IntegrationRuntimeContext], list[IntegrationRuntimeSession]]
# Signature of a completion handler: a callable that takes a list of
# string-keyed dicts plus an IntegrationCompletionContext and returns an
# awaitable resolving to a string-keyed dict.
# NOTE(review): what the dicts in the first argument represent is not shown
# in this module -- confirm against the caller.
CompletionHandler = Callable[
    [
        list[dict[str, Any]],
        IntegrationCompletionContext,
    ],
    Awaitable[dict[str, Any]],
]
@dataclass(frozen=True)
class IntegrationNodeRegistration:
    """Immutable description of one node type contributed by an integration.

    Instances are frozen: attributes cannot be reassigned after creation.

    Attributes:
        type_name: String name of the node type.
        data_model: A ``BaseNodeData`` subclass (the class itself, not an
            instance).
        node_spec: The ``NodeSpec`` for this node type.
        sensitive_fields: Tuple of field names; defaults to the empty tuple.
            NOTE(review): presumably fields to mask or redact -- confirm
            how consumers treat them.
    """

    type_name: str
    data_model: type[BaseNodeData]
    node_spec: NodeSpec
    sensitive_fields: tuple[str, ...] = ()
@dataclass(frozen=True)
class IntegrationPackageSpec:
    """Immutable description of everything one integration package provides.

    Instances are frozen: attributes cannot be reassigned after creation.
    Only ``name`` is required; every other field has an "empty" default.

    Attributes:
        name: String name of the integration package.
        nodes: Tuple of ``IntegrationNodeRegistration`` entries; defaults to
            the empty tuple.
        routers: Tuple of FastAPI ``APIRouter`` objects; defaults to the
            empty tuple.
        create_runtime_sessions: Optional ``RuntimeFactory``; ``None`` when
            the package supplies none.
        run_completion: Optional ``CompletionHandler``; ``None`` when the
            package supplies none.
    """

    name: str
    nodes: tuple[IntegrationNodeRegistration, ...] = ()
    routers: tuple[APIRouter, ...] = ()
    create_runtime_sessions: RuntimeFactory | None = None
    run_completion: CompletionHandler | None = None