diff --git a/api/routes/workflow.py b/api/routes/workflow.py
index 9938e77..87cfe8a 100644
--- a/api/routes/workflow.py
+++ b/api/routes/workflow.py
@@ -21,6 +21,7 @@ from api.services.configuration.masking import (
)
from api.services.mps_service_key_client import mps_service_key_client
from api.services.workflow.dto import ReactFlowDTO
+from api.services.workflow.duplicate import duplicate_workflow
from api.services.workflow.errors import ItemKind, WorkflowError
from api.services.workflow.workflow import WorkflowGraph
@@ -606,6 +607,38 @@ async def update_workflow(
raise HTTPException(status_code=500, detail=str(e))
+@router.post("/{workflow_id}/duplicate")
+async def duplicate_workflow_endpoint(
+ workflow_id: int,
+ user: UserModel = Depends(get_user),
+) -> WorkflowResponse:
+ """Duplicate a workflow including its definition, configuration, recordings, and triggers."""
+ try:
+ workflow = await duplicate_workflow(
+ workflow_id=workflow_id,
+ organization_id=user.selected_organization_id,
+ user_id=user.id,
+ )
+ return {
+ "id": workflow.id,
+ "name": workflow.name,
+ "status": workflow.status,
+ "created_at": workflow.created_at,
+ "workflow_definition": mask_workflow_definition(
+ workflow.workflow_definition_with_fallback
+ ),
+ "current_definition_id": workflow.current_definition_id,
+ "template_context_variables": workflow.template_context_variables,
+ "call_disposition_codes": workflow.call_disposition_codes,
+ "workflow_configurations": workflow.workflow_configurations,
+ }
+ except ValueError as e:
+ raise HTTPException(status_code=404, detail=str(e))
+ except Exception as e:
+ logger.error(f"Error duplicating workflow {workflow_id}: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+
@router.post("/{workflow_id}/runs")
async def create_workflow_run(
workflow_id: int,
diff --git a/api/services/filesystem/base.py b/api/services/filesystem/base.py
index 92fa132..d840d2b 100644
--- a/api/services/filesystem/base.py
+++ b/api/services/filesystem/base.py
@@ -98,3 +98,16 @@ class BaseFileSystem(ABC):
bool: True if file was downloaded successfully, False otherwise
"""
pass
+
+ @abstractmethod
+ async def acopy_file(self, source_path: str, destination_path: str) -> bool:
+ """Copy a file within storage (server-side copy).
+
+ Args:
+ source_path: Path to the source file
+ destination_path: Path for the copied file
+
+ Returns:
+ bool: True if file was copied successfully, False otherwise
+ """
+ pass
diff --git a/api/services/filesystem/minio.py b/api/services/filesystem/minio.py
index faedab6..2e52c66 100644
--- a/api/services/filesystem/minio.py
+++ b/api/services/filesystem/minio.py
@@ -182,3 +182,20 @@ class MinioFileSystem(BaseFileSystem):
return True
except S3Error:
return False
+
+ async def acopy_file(self, source_path: str, destination_path: str) -> bool:
+ """Copy a file within MinIO (server-side copy)."""
+ try:
+ from minio.commonconfig import CopySource
+
+ def _copy():
+ self.client.copy_object(
+ self.bucket_name,
+ destination_path,
+ CopySource(self.bucket_name, source_path),
+ )
+
+ await asyncio.to_thread(_copy)
+ return True
+ except S3Error:
+ return False
diff --git a/api/services/filesystem/s3.py b/api/services/filesystem/s3.py
index 5ae9918..1a10a43 100644
--- a/api/services/filesystem/s3.py
+++ b/api/services/filesystem/s3.py
@@ -137,3 +137,18 @@ class S3FileSystem(BaseFileSystem):
return True
except ClientError:
return False
+
+ async def acopy_file(self, source_path: str, destination_path: str) -> bool:
+ """Copy a file within S3 (server-side copy)."""
+ try:
+ async with self.session.client(
+ "s3", region_name=self.region_name
+ ) as s3_client:
+ await s3_client.copy_object(
+ Bucket=self.bucket_name,
+ Key=destination_path,
+ CopySource={"Bucket": self.bucket_name, "Key": source_path},
+ )
+ return True
+ except ClientError:
+ return False
diff --git a/api/services/workflow/duplicate.py b/api/services/workflow/duplicate.py
new file mode 100644
index 0000000..7f530c6
--- /dev/null
+++ b/api/services/workflow/duplicate.py
@@ -0,0 +1,234 @@
+"""Service for duplicating workflows including recordings."""
+
+import copy
+import json
+import posixpath
+import uuid
+
+from loguru import logger
+
+from api.db import db_client
+from api.db.workflow_recording_client import generate_short_id
+from api.enums import StorageBackend
+from api.services.storage import get_storage_for_backend, storage_fs
+
+
+def _extract_trigger_paths(workflow_definition: dict) -> list[str]:
+ """Extract trigger UUIDs from workflow definition."""
+ if not workflow_definition:
+ return []
+ nodes = workflow_definition.get("nodes", [])
+ trigger_paths = []
+ for node in nodes:
+ if node.get("type") == "trigger":
+ trigger_path = node.get("data", {}).get("trigger_path")
+ if trigger_path:
+ trigger_paths.append(trigger_path)
+ return trigger_paths
+
+
+def _regenerate_trigger_uuids(workflow_definition: dict) -> dict:
+ """Regenerate UUIDs for all trigger nodes to avoid conflicts."""
+ if not workflow_definition:
+ return workflow_definition
+ updated_definition = copy.deepcopy(workflow_definition)
+ nodes = updated_definition.get("nodes", [])
+ for node in nodes:
+ if node.get("type") == "trigger":
+ if "data" not in node:
+ node["data"] = {}
+ node["data"]["trigger_path"] = str(uuid.uuid4())
+ return updated_definition
+
+
+async def _generate_unique_recording_id() -> str:
+ """Generate a globally unique short recording ID."""
+ for _ in range(10):
+ rid = generate_short_id(8)
+ exists = await db_client.check_recording_id_exists(rid)
+ if not exists:
+ return rid
+ raise RuntimeError("Failed to generate unique recording ID")
+
+
+async def duplicate_workflow(
+ workflow_id: int,
+ organization_id: int,
+ user_id: int,
+):
+ """Duplicate a workflow including its definition, config, recordings, and triggers.
+
+ Args:
+ workflow_id: The source workflow ID to duplicate
+ organization_id: The organization ID
+ user_id: The user performing the duplication
+
+ Returns:
+ The newly created workflow DB object
+
+ Raises:
+ ValueError: If the source workflow is not found
+ """
+ # 1. Fetch source workflow
+ source = await db_client.get_workflow(workflow_id, organization_id=organization_id)
+ if source is None:
+ raise ValueError(f"Workflow with id {workflow_id} not found")
+
+ workflow_definition = copy.deepcopy(source.workflow_definition_with_fallback)
+
+ # 2. Regenerate trigger UUIDs to avoid conflicts
+ if workflow_definition:
+ workflow_definition = _regenerate_trigger_uuids(workflow_definition)
+
+ # 3. Create the new workflow
+ new_name = f"{source.name} - Duplicate"
+ new_workflow = await db_client.create_workflow(
+ name=new_name,
+ workflow_definition=workflow_definition,
+ user_id=user_id,
+ organization_id=organization_id,
+ )
+
+ # 4. Copy template_context_variables and workflow_configurations
+ has_extra_fields = (
+ source.template_context_variables or source.workflow_configurations
+ )
+ if has_extra_fields:
+ new_workflow = await db_client.update_workflow(
+ workflow_id=new_workflow.id,
+ name=None,
+ workflow_definition=None,
+ template_context_variables=copy.deepcopy(source.template_context_variables),
+ workflow_configurations=copy.deepcopy(source.workflow_configurations),
+ organization_id=organization_id,
+ )
+
+ # 5. Copy recordings with new IDs and storage paths scoped to new workflow
+ recording_id_map = await _duplicate_recordings(
+ source_workflow_id=workflow_id,
+ new_workflow_id=new_workflow.id,
+ organization_id=organization_id,
+ user_id=user_id,
+ )
+
+ # 6. Replace old recording IDs with new ones in the workflow definition
+ if recording_id_map:
+ workflow_definition = _replace_recording_ids(
+ workflow_definition, recording_id_map
+ )
+ new_workflow = await db_client.update_workflow(
+ workflow_id=new_workflow.id,
+ name=None,
+ workflow_definition=workflow_definition,
+ template_context_variables=None,
+ workflow_configurations=None,
+ organization_id=organization_id,
+ )
+
+ # 7. Sync triggers for the new workflow
+ if workflow_definition:
+ trigger_paths = _extract_trigger_paths(workflow_definition)
+ if trigger_paths:
+ await db_client.sync_triggers_for_workflow(
+ workflow_id=new_workflow.id,
+ organization_id=organization_id,
+ trigger_paths=trigger_paths,
+ )
+
+ return new_workflow
+
+
+async def _duplicate_recordings(
+ source_workflow_id: int,
+ new_workflow_id: int,
+ organization_id: int,
+ user_id: int,
+) -> dict[str, str]:
+ """Duplicate all recordings for a workflow.
+
+ Copies each recording file to a new storage path scoped under the new
+ workflow ID, and creates new DB records pointing to the copied files.
+
+ Returns:
+ Mapping of old_recording_id -> new_recording_id
+ """
+ recordings = await db_client.get_recordings_for_workflow(
+ workflow_id=source_workflow_id,
+ organization_id=organization_id,
+ )
+
+ if not recordings:
+ return {}
+
+ recording_id_map: dict[str, str] = {}
+
+ for rec in recordings:
+ try:
+ new_recording_id = await _generate_unique_recording_id()
+
+ # Build new storage key: recordings/{org_id}/{new_workflow_id}/{new_recording_id}/{filename}
+ filename = posixpath.basename(rec.storage_key)
+ new_storage_key = (
+ f"recordings/{organization_id}"
+ f"/{new_workflow_id}/{new_recording_id}"
+ f"/{filename}"
+ )
+
+ # Copy the file in storage (server-side copy)
+ fs = _get_storage_for_recording(rec.storage_backend)
+ copied = await fs.acopy_file(rec.storage_key, new_storage_key)
+ if not copied:
+ logger.warning(
+ f"Failed to copy recording file {rec.recording_id}, skipping"
+ )
+ continue
+
+ await db_client.create_recording(
+ recording_id=new_recording_id,
+ workflow_id=new_workflow_id,
+ organization_id=organization_id,
+ tts_provider=rec.tts_provider,
+ tts_model=rec.tts_model,
+ tts_voice_id=rec.tts_voice_id,
+ transcript=rec.transcript,
+ storage_key=new_storage_key,
+ storage_backend=rec.storage_backend,
+ created_by=user_id,
+ metadata=copy.deepcopy(rec.recording_metadata),
+ )
+
+ recording_id_map[rec.recording_id] = new_recording_id
+ logger.info(
+ f"Duplicated recording {rec.recording_id} -> {new_recording_id}"
+ )
+
+ except Exception as e:
+ logger.error(f"Error duplicating recording {rec.recording_id}: {e}")
+ continue
+
+ return recording_id_map
+
+
+def _replace_recording_ids(
+ workflow_definition: dict,
+ recording_id_map: dict[str, str],
+) -> dict:
+ """Replace old recording IDs with new ones throughout the workflow definition.
+
+ Uses JSON serialization to do a thorough find-and-replace across all
+ nested fields (node prompts, data, etc.).
+ """
+ definition_str = json.dumps(workflow_definition)
+
+ for old_id, new_id in recording_id_map.items():
+ definition_str = definition_str.replace(old_id, new_id)
+
+ return json.loads(definition_str)
+
+
+def _get_storage_for_recording(storage_backend: str):
+ """Get the appropriate storage filesystem for a recording's backend."""
+ current_backend = StorageBackend.get_current_backend()
+ if storage_backend == current_backend.value:
+ return storage_fs
+ return get_storage_for_backend(storage_backend)
diff --git a/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx b/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx
index 9c1ec09..988683b 100644
--- a/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx
+++ b/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx
@@ -1,10 +1,12 @@
"use client";
import { ReactFlowInstance } from "@xyflow/react";
-import { AlertCircle, ArrowLeft, ChevronDown, Download, History, LoaderCircle, MoreVertical, Phone } from "lucide-react";
+import { AlertCircle, ArrowLeft, ChevronDown, Copy, Download, History, LoaderCircle, MoreVertical, Phone } from "lucide-react";
import { useRouter } from "next/navigation";
import { useState } from "react";
+import { toast } from "sonner";
+import { duplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePost } from "@/client/sdk.gen";
import { WorkflowError } from "@/client/types.gen";
import { FlowEdge, FlowNode } from "@/components/flow/types";
import { Button } from "@/components/ui/button";
@@ -45,6 +47,7 @@ export const WorkflowEditorHeader = ({
}: WorkflowEditorHeaderProps) => {
const router = useRouter();
const [savingWorkflow, setSavingWorkflow] = useState(false);
+ const [duplicating, setDuplicating] = useState(false);
const hasValidationErrors = workflowValidationErrors.length > 0;
const isCallDisabled = isDirty || hasValidationErrors;
@@ -59,6 +62,27 @@ export const WorkflowEditorHeader = ({
router.push("/workflow");
};
+ const handleDuplicate = async () => {
+ if (duplicating) return;
+ setDuplicating(true);
+ const promise = duplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePost({
+ path: { workflow_id: workflowId },
+ });
+ toast.promise(promise, {
+ loading: "Duplicating workflow...",
+ success: "Workflow duplicated successfully",
+ error: "Failed to duplicate workflow",
+ });
+ try {
+ const { data } = await promise;
+ if (data?.id) {
+ router.push(`/workflow/${data.id}`);
+ }
+ } finally {
+ setDuplicating(false);
+ }
+ };
+
const handleDownloadWorkflow = () => {
if (!rfInstance.current) return;
@@ -224,6 +248,18 @@ export const WorkflowEditorHeader = ({
View Runs
+
+ {duplicating ? (
+
+ ) : (
+
+ )}
+ {duplicating ? "Duplicating..." : "Duplicate Workflow"}
+
= ClientOptions & {
/**
@@ -378,6 +378,17 @@ export const updateWorkflowApiV1WorkflowWorkflowIdPut = (options: Options) => {
+ return (options.client ?? _heyApiClient).post({
+ url: '/api/v1/workflow/{workflow_id}/duplicate',
+ ...options
+ });
+};
+
/**
* Get Workflow Runs
* Get workflow runs with optional filtering and sorting.
diff --git a/ui/src/client/types.gen.ts b/ui/src/client/types.gen.ts
index d538f11..ef287f0 100644
--- a/ui/src/client/types.gen.ts
+++ b/ui/src/client/types.gen.ts
@@ -2278,6 +2278,41 @@ export type UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses = {
export type UpdateWorkflowApiV1WorkflowWorkflowIdPutResponse = UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses[keyof UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses];
+export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostData = {
+ body?: never;
+ headers?: {
+ authorization?: string | null;
+ 'X-API-Key'?: string | null;
+ };
+ path: {
+ workflow_id: number;
+ };
+ query?: never;
+ url: '/api/v1/workflow/{workflow_id}/duplicate';
+};
+
+export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors = {
+ /**
+ * Not found
+ */
+ 404: unknown;
+ /**
+ * Validation Error
+ */
+ 422: HttpValidationError;
+};
+
+export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostError = DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors[keyof DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors];
+
+export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses = {
+ /**
+ * Successful Response
+ */
+ 200: WorkflowResponse;
+};
+
+export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponse = DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses[keyof DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses];
+
export type GetWorkflowRunsApiV1WorkflowWorkflowIdRunsGetData = {
body?: never;
headers?: {