mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
feat: enable duplicate workflow feature
This commit is contained in:
parent
c61a3843a5
commit
93c45580e7
8 changed files with 396 additions and 2 deletions
|
|
@ -21,6 +21,7 @@ from api.services.configuration.masking import (
|
|||
)
|
||||
from api.services.mps_service_key_client import mps_service_key_client
|
||||
from api.services.workflow.dto import ReactFlowDTO
|
||||
from api.services.workflow.duplicate import duplicate_workflow
|
||||
from api.services.workflow.errors import ItemKind, WorkflowError
|
||||
from api.services.workflow.workflow import WorkflowGraph
|
||||
|
||||
|
|
@ -606,6 +607,38 @@ async def update_workflow(
|
|||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.post("/{workflow_id}/duplicate")
|
||||
async def duplicate_workflow_endpoint(
|
||||
workflow_id: int,
|
||||
user: UserModel = Depends(get_user),
|
||||
) -> WorkflowResponse:
|
||||
"""Duplicate a workflow including its definition, configuration, recordings, and triggers."""
|
||||
try:
|
||||
workflow = await duplicate_workflow(
|
||||
workflow_id=workflow_id,
|
||||
organization_id=user.selected_organization_id,
|
||||
user_id=user.id,
|
||||
)
|
||||
return {
|
||||
"id": workflow.id,
|
||||
"name": workflow.name,
|
||||
"status": workflow.status,
|
||||
"created_at": workflow.created_at,
|
||||
"workflow_definition": mask_workflow_definition(
|
||||
workflow.workflow_definition_with_fallback
|
||||
),
|
||||
"current_definition_id": workflow.current_definition_id,
|
||||
"template_context_variables": workflow.template_context_variables,
|
||||
"call_disposition_codes": workflow.call_disposition_codes,
|
||||
"workflow_configurations": workflow.workflow_configurations,
|
||||
}
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=404, detail=str(e))
|
||||
except Exception as e:
|
||||
logger.error(f"Error duplicating workflow {workflow_id}: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.post("/{workflow_id}/runs")
|
||||
async def create_workflow_run(
|
||||
workflow_id: int,
|
||||
|
|
|
|||
|
|
@ -98,3 +98,16 @@ class BaseFileSystem(ABC):
|
|||
bool: True if file was downloaded successfully, False otherwise
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def acopy_file(self, source_path: str, destination_path: str) -> bool:
|
||||
"""Copy a file within storage (server-side copy).
|
||||
|
||||
Args:
|
||||
source_path: Path to the source file
|
||||
destination_path: Path for the copied file
|
||||
|
||||
Returns:
|
||||
bool: True if file was copied successfully, False otherwise
|
||||
"""
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -182,3 +182,20 @@ class MinioFileSystem(BaseFileSystem):
|
|||
return True
|
||||
except S3Error:
|
||||
return False
|
||||
|
||||
async def acopy_file(self, source_path: str, destination_path: str) -> bool:
|
||||
"""Copy a file within MinIO (server-side copy)."""
|
||||
try:
|
||||
from minio.commonconfig import CopySource
|
||||
|
||||
def _copy():
|
||||
self.client.copy_object(
|
||||
self.bucket_name,
|
||||
destination_path,
|
||||
CopySource(self.bucket_name, source_path),
|
||||
)
|
||||
|
||||
await asyncio.to_thread(_copy)
|
||||
return True
|
||||
except S3Error:
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -137,3 +137,18 @@ class S3FileSystem(BaseFileSystem):
|
|||
return True
|
||||
except ClientError:
|
||||
return False
|
||||
|
||||
async def acopy_file(self, source_path: str, destination_path: str) -> bool:
|
||||
"""Copy a file within S3 (server-side copy)."""
|
||||
try:
|
||||
async with self.session.client(
|
||||
"s3", region_name=self.region_name
|
||||
) as s3_client:
|
||||
await s3_client.copy_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=destination_path,
|
||||
CopySource={"Bucket": self.bucket_name, "Key": source_path},
|
||||
)
|
||||
return True
|
||||
except ClientError:
|
||||
return False
|
||||
|
|
|
|||
234
api/services/workflow/duplicate.py
Normal file
234
api/services/workflow/duplicate.py
Normal file
|
|
@ -0,0 +1,234 @@
|
|||
"""Service for duplicating workflows including recordings."""
|
||||
|
||||
import copy
|
||||
import json
|
||||
import posixpath
|
||||
import uuid
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from api.db import db_client
|
||||
from api.db.workflow_recording_client import generate_short_id
|
||||
from api.enums import StorageBackend
|
||||
from api.services.storage import get_storage_for_backend, storage_fs
|
||||
|
||||
|
||||
def _extract_trigger_paths(workflow_definition: dict) -> list[str]:
|
||||
"""Extract trigger UUIDs from workflow definition."""
|
||||
if not workflow_definition:
|
||||
return []
|
||||
nodes = workflow_definition.get("nodes", [])
|
||||
trigger_paths = []
|
||||
for node in nodes:
|
||||
if node.get("type") == "trigger":
|
||||
trigger_path = node.get("data", {}).get("trigger_path")
|
||||
if trigger_path:
|
||||
trigger_paths.append(trigger_path)
|
||||
return trigger_paths
|
||||
|
||||
|
||||
def _regenerate_trigger_uuids(workflow_definition: dict) -> dict:
|
||||
"""Regenerate UUIDs for all trigger nodes to avoid conflicts."""
|
||||
if not workflow_definition:
|
||||
return workflow_definition
|
||||
updated_definition = copy.deepcopy(workflow_definition)
|
||||
nodes = updated_definition.get("nodes", [])
|
||||
for node in nodes:
|
||||
if node.get("type") == "trigger":
|
||||
if "data" not in node:
|
||||
node["data"] = {}
|
||||
node["data"]["trigger_path"] = str(uuid.uuid4())
|
||||
return updated_definition
|
||||
|
||||
|
||||
async def _generate_unique_recording_id() -> str:
|
||||
"""Generate a globally unique short recording ID."""
|
||||
for _ in range(10):
|
||||
rid = generate_short_id(8)
|
||||
exists = await db_client.check_recording_id_exists(rid)
|
||||
if not exists:
|
||||
return rid
|
||||
raise RuntimeError("Failed to generate unique recording ID")
|
||||
|
||||
|
||||
async def duplicate_workflow(
|
||||
workflow_id: int,
|
||||
organization_id: int,
|
||||
user_id: int,
|
||||
):
|
||||
"""Duplicate a workflow including its definition, config, recordings, and triggers.
|
||||
|
||||
Args:
|
||||
workflow_id: The source workflow ID to duplicate
|
||||
organization_id: The organization ID
|
||||
user_id: The user performing the duplication
|
||||
|
||||
Returns:
|
||||
The newly created workflow DB object
|
||||
|
||||
Raises:
|
||||
ValueError: If the source workflow is not found
|
||||
"""
|
||||
# 1. Fetch source workflow
|
||||
source = await db_client.get_workflow(workflow_id, organization_id=organization_id)
|
||||
if source is None:
|
||||
raise ValueError(f"Workflow with id {workflow_id} not found")
|
||||
|
||||
workflow_definition = copy.deepcopy(source.workflow_definition_with_fallback)
|
||||
|
||||
# 2. Regenerate trigger UUIDs to avoid conflicts
|
||||
if workflow_definition:
|
||||
workflow_definition = _regenerate_trigger_uuids(workflow_definition)
|
||||
|
||||
# 3. Create the new workflow
|
||||
new_name = f"{source.name} - Duplicate"
|
||||
new_workflow = await db_client.create_workflow(
|
||||
name=new_name,
|
||||
workflow_definition=workflow_definition,
|
||||
user_id=user_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
# 4. Copy template_context_variables and workflow_configurations
|
||||
has_extra_fields = (
|
||||
source.template_context_variables or source.workflow_configurations
|
||||
)
|
||||
if has_extra_fields:
|
||||
new_workflow = await db_client.update_workflow(
|
||||
workflow_id=new_workflow.id,
|
||||
name=None,
|
||||
workflow_definition=None,
|
||||
template_context_variables=copy.deepcopy(source.template_context_variables),
|
||||
workflow_configurations=copy.deepcopy(source.workflow_configurations),
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
# 5. Copy recordings with new IDs and storage paths scoped to new workflow
|
||||
recording_id_map = await _duplicate_recordings(
|
||||
source_workflow_id=workflow_id,
|
||||
new_workflow_id=new_workflow.id,
|
||||
organization_id=organization_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
# 6. Replace old recording IDs with new ones in the workflow definition
|
||||
if recording_id_map:
|
||||
workflow_definition = _replace_recording_ids(
|
||||
workflow_definition, recording_id_map
|
||||
)
|
||||
new_workflow = await db_client.update_workflow(
|
||||
workflow_id=new_workflow.id,
|
||||
name=None,
|
||||
workflow_definition=workflow_definition,
|
||||
template_context_variables=None,
|
||||
workflow_configurations=None,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
# 7. Sync triggers for the new workflow
|
||||
if workflow_definition:
|
||||
trigger_paths = _extract_trigger_paths(workflow_definition)
|
||||
if trigger_paths:
|
||||
await db_client.sync_triggers_for_workflow(
|
||||
workflow_id=new_workflow.id,
|
||||
organization_id=organization_id,
|
||||
trigger_paths=trigger_paths,
|
||||
)
|
||||
|
||||
return new_workflow
|
||||
|
||||
|
||||
async def _duplicate_recordings(
|
||||
source_workflow_id: int,
|
||||
new_workflow_id: int,
|
||||
organization_id: int,
|
||||
user_id: int,
|
||||
) -> dict[str, str]:
|
||||
"""Duplicate all recordings for a workflow.
|
||||
|
||||
Copies each recording file to a new storage path scoped under the new
|
||||
workflow ID, and creates new DB records pointing to the copied files.
|
||||
|
||||
Returns:
|
||||
Mapping of old_recording_id -> new_recording_id
|
||||
"""
|
||||
recordings = await db_client.get_recordings_for_workflow(
|
||||
workflow_id=source_workflow_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
if not recordings:
|
||||
return {}
|
||||
|
||||
recording_id_map: dict[str, str] = {}
|
||||
|
||||
for rec in recordings:
|
||||
try:
|
||||
new_recording_id = await _generate_unique_recording_id()
|
||||
|
||||
# Build new storage key: recordings/{org_id}/{new_workflow_id}/{new_recording_id}/{filename}
|
||||
filename = posixpath.basename(rec.storage_key)
|
||||
new_storage_key = (
|
||||
f"recordings/{organization_id}"
|
||||
f"/{new_workflow_id}/{new_recording_id}"
|
||||
f"/{filename}"
|
||||
)
|
||||
|
||||
# Copy the file in storage (server-side copy)
|
||||
fs = _get_storage_for_recording(rec.storage_backend)
|
||||
copied = await fs.acopy_file(rec.storage_key, new_storage_key)
|
||||
if not copied:
|
||||
logger.warning(
|
||||
f"Failed to copy recording file {rec.recording_id}, skipping"
|
||||
)
|
||||
continue
|
||||
|
||||
await db_client.create_recording(
|
||||
recording_id=new_recording_id,
|
||||
workflow_id=new_workflow_id,
|
||||
organization_id=organization_id,
|
||||
tts_provider=rec.tts_provider,
|
||||
tts_model=rec.tts_model,
|
||||
tts_voice_id=rec.tts_voice_id,
|
||||
transcript=rec.transcript,
|
||||
storage_key=new_storage_key,
|
||||
storage_backend=rec.storage_backend,
|
||||
created_by=user_id,
|
||||
metadata=copy.deepcopy(rec.recording_metadata),
|
||||
)
|
||||
|
||||
recording_id_map[rec.recording_id] = new_recording_id
|
||||
logger.info(
|
||||
f"Duplicated recording {rec.recording_id} -> {new_recording_id}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error duplicating recording {rec.recording_id}: {e}")
|
||||
continue
|
||||
|
||||
return recording_id_map
|
||||
|
||||
|
||||
def _replace_recording_ids(
|
||||
workflow_definition: dict,
|
||||
recording_id_map: dict[str, str],
|
||||
) -> dict:
|
||||
"""Replace old recording IDs with new ones throughout the workflow definition.
|
||||
|
||||
Uses JSON serialization to do a thorough find-and-replace across all
|
||||
nested fields (node prompts, data, etc.).
|
||||
"""
|
||||
definition_str = json.dumps(workflow_definition)
|
||||
|
||||
for old_id, new_id in recording_id_map.items():
|
||||
definition_str = definition_str.replace(old_id, new_id)
|
||||
|
||||
return json.loads(definition_str)
|
||||
|
||||
|
||||
def _get_storage_for_recording(storage_backend: str):
|
||||
"""Get the appropriate storage filesystem for a recording's backend."""
|
||||
current_backend = StorageBackend.get_current_backend()
|
||||
if storage_backend == current_backend.value:
|
||||
return storage_fs
|
||||
return get_storage_for_backend(storage_backend)
|
||||
|
|
@ -1,10 +1,12 @@
|
|||
"use client";
|
||||
|
||||
import { ReactFlowInstance } from "@xyflow/react";
|
||||
import { AlertCircle, ArrowLeft, ChevronDown, Download, History, LoaderCircle, MoreVertical, Phone } from "lucide-react";
|
||||
import { AlertCircle, ArrowLeft, ChevronDown, Copy, Download, History, LoaderCircle, MoreVertical, Phone } from "lucide-react";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { useState } from "react";
|
||||
import { toast } from "sonner";
|
||||
|
||||
import { duplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePost } from "@/client/sdk.gen";
|
||||
import { WorkflowError } from "@/client/types.gen";
|
||||
import { FlowEdge, FlowNode } from "@/components/flow/types";
|
||||
import { Button } from "@/components/ui/button";
|
||||
|
|
@ -45,6 +47,7 @@ export const WorkflowEditorHeader = ({
|
|||
}: WorkflowEditorHeaderProps) => {
|
||||
const router = useRouter();
|
||||
const [savingWorkflow, setSavingWorkflow] = useState(false);
|
||||
const [duplicating, setDuplicating] = useState(false);
|
||||
|
||||
const hasValidationErrors = workflowValidationErrors.length > 0;
|
||||
const isCallDisabled = isDirty || hasValidationErrors;
|
||||
|
|
@ -59,6 +62,27 @@ export const WorkflowEditorHeader = ({
|
|||
router.push("/workflow");
|
||||
};
|
||||
|
||||
const handleDuplicate = async () => {
|
||||
if (duplicating) return;
|
||||
setDuplicating(true);
|
||||
const promise = duplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePost({
|
||||
path: { workflow_id: workflowId },
|
||||
});
|
||||
toast.promise(promise, {
|
||||
loading: "Duplicating workflow...",
|
||||
success: "Workflow duplicated successfully",
|
||||
error: "Failed to duplicate workflow",
|
||||
});
|
||||
try {
|
||||
const { data } = await promise;
|
||||
if (data?.id) {
|
||||
router.push(`/workflow/${data.id}`);
|
||||
}
|
||||
} finally {
|
||||
setDuplicating(false);
|
||||
}
|
||||
};
|
||||
|
||||
const handleDownloadWorkflow = () => {
|
||||
if (!rfInstance.current) return;
|
||||
|
||||
|
|
@ -224,6 +248,18 @@ export const WorkflowEditorHeader = ({
|
|||
<History className="w-4 h-4 mr-2" />
|
||||
View Runs
|
||||
</DropdownMenuItem>
|
||||
<DropdownMenuItem
|
||||
onClick={handleDuplicate}
|
||||
disabled={duplicating}
|
||||
className="text-white hover:bg-[#2a2a2a] cursor-pointer"
|
||||
>
|
||||
{duplicating ? (
|
||||
<LoaderCircle className="w-4 h-4 mr-2 animate-spin" />
|
||||
) : (
|
||||
<Copy className="w-4 h-4 mr-2" />
|
||||
)}
|
||||
{duplicating ? "Duplicating..." : "Duplicate Workflow"}
|
||||
</DropdownMenuItem>
|
||||
<DropdownMenuItem
|
||||
onClick={handleDownloadWorkflow}
|
||||
className="text-white hover:bg-[#2a2a2a] cursor-pointer"
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -2278,6 +2278,41 @@ export type UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses = {
|
|||
|
||||
export type UpdateWorkflowApiV1WorkflowWorkflowIdPutResponse = UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses[keyof UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses];
|
||||
|
||||
export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostData = {
|
||||
body?: never;
|
||||
headers?: {
|
||||
authorization?: string | null;
|
||||
'X-API-Key'?: string | null;
|
||||
};
|
||||
path: {
|
||||
workflow_id: number;
|
||||
};
|
||||
query?: never;
|
||||
url: '/api/v1/workflow/{workflow_id}/duplicate';
|
||||
};
|
||||
|
||||
export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors = {
|
||||
/**
|
||||
* Not found
|
||||
*/
|
||||
404: unknown;
|
||||
/**
|
||||
* Validation Error
|
||||
*/
|
||||
422: HttpValidationError;
|
||||
};
|
||||
|
||||
export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostError = DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors[keyof DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors];
|
||||
|
||||
export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses = {
|
||||
/**
|
||||
* Successful Response
|
||||
*/
|
||||
200: WorkflowResponse;
|
||||
};
|
||||
|
||||
export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponse = DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses[keyof DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses];
|
||||
|
||||
export type GetWorkflowRunsApiV1WorkflowWorkflowIdRunsGetData = {
|
||||
body?: never;
|
||||
headers?: {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue