From 93c45580e79a9326d91371d6c35c4a2850d0c46f Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Sat, 21 Mar 2026 12:21:40 +0530 Subject: [PATCH] feat: enable duplicate workflow feature --- api/routes/workflow.py | 33 +++ api/services/filesystem/base.py | 13 + api/services/filesystem/minio.py | 17 ++ api/services/filesystem/s3.py | 15 ++ api/services/workflow/duplicate.py | 234 ++++++++++++++++++ .../components/WorkflowEditorHeader.tsx | 38 ++- ui/src/client/sdk.gen.ts | 13 +- ui/src/client/types.gen.ts | 35 +++ 8 files changed, 396 insertions(+), 2 deletions(-) create mode 100644 api/services/workflow/duplicate.py diff --git a/api/routes/workflow.py b/api/routes/workflow.py index 9938e77..87cfe8a 100644 --- a/api/routes/workflow.py +++ b/api/routes/workflow.py @@ -21,6 +21,7 @@ from api.services.configuration.masking import ( ) from api.services.mps_service_key_client import mps_service_key_client from api.services.workflow.dto import ReactFlowDTO +from api.services.workflow.duplicate import duplicate_workflow from api.services.workflow.errors import ItemKind, WorkflowError from api.services.workflow.workflow import WorkflowGraph @@ -606,6 +607,38 @@ async def update_workflow( raise HTTPException(status_code=500, detail=str(e)) +@router.post("/{workflow_id}/duplicate") +async def duplicate_workflow_endpoint( + workflow_id: int, + user: UserModel = Depends(get_user), +) -> WorkflowResponse: + """Duplicate a workflow including its definition, configuration, recordings, and triggers.""" + try: + workflow = await duplicate_workflow( + workflow_id=workflow_id, + organization_id=user.selected_organization_id, + user_id=user.id, + ) + return { + "id": workflow.id, + "name": workflow.name, + "status": workflow.status, + "created_at": workflow.created_at, + "workflow_definition": mask_workflow_definition( + workflow.workflow_definition_with_fallback + ), + "current_definition_id": workflow.current_definition_id, + "template_context_variables": workflow.template_context_variables, + "call_disposition_codes": workflow.call_disposition_codes, + "workflow_configurations": workflow.workflow_configurations, + } + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + logger.error(f"Error duplicating workflow {workflow_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.post("/{workflow_id}/runs") async def create_workflow_run( workflow_id: int, diff --git a/api/services/filesystem/base.py b/api/services/filesystem/base.py index 92fa132..d840d2b 100644 --- a/api/services/filesystem/base.py +++ b/api/services/filesystem/base.py @@ -98,3 +98,16 @@ class BaseFileSystem(ABC): bool: True if file was downloaded successfully, False otherwise """ pass + + @abstractmethod + async def acopy_file(self, source_path: str, destination_path: str) -> bool: + """Copy a file within storage (server-side copy). + + Args: + source_path: Path to the source file + destination_path: Path for the copied file + + Returns: + bool: True if file was copied successfully, False otherwise + """ + pass diff --git a/api/services/filesystem/minio.py b/api/services/filesystem/minio.py index faedab6..2e52c66 100644 --- a/api/services/filesystem/minio.py +++ b/api/services/filesystem/minio.py @@ -182,3 +182,20 @@ class MinioFileSystem(BaseFileSystem): return True except S3Error: return False + + async def acopy_file(self, source_path: str, destination_path: str) -> bool: + """Copy a file within MinIO (server-side copy).""" + try: + from minio.commonconfig import CopySource + + def _copy(): + self.client.copy_object( + self.bucket_name, + destination_path, + CopySource(self.bucket_name, source_path), + ) + + await asyncio.to_thread(_copy) + return True + except S3Error: + return False diff --git a/api/services/filesystem/s3.py b/api/services/filesystem/s3.py index 5ae9918..1a10a43 100644 --- a/api/services/filesystem/s3.py +++ b/api/services/filesystem/s3.py @@ -137,3 +137,18 @@ class S3FileSystem(BaseFileSystem): return True except ClientError: return False + + async def acopy_file(self, source_path: str, destination_path: str) -> bool: + """Copy a file within S3 (server-side copy).""" + try: + async with self.session.client( + "s3", region_name=self.region_name + ) as s3_client: + await s3_client.copy_object( + Bucket=self.bucket_name, + Key=destination_path, + CopySource={"Bucket": self.bucket_name, "Key": source_path}, + ) + return True + except ClientError: + return False diff --git a/api/services/workflow/duplicate.py b/api/services/workflow/duplicate.py new file mode 100644 index 0000000..7f530c6 --- /dev/null +++ b/api/services/workflow/duplicate.py @@ -0,0 +1,234 @@ +"""Service for duplicating workflows including recordings.""" + +import copy +import json +import posixpath +import uuid + +from loguru import logger + +from api.db import db_client +from api.db.workflow_recording_client import generate_short_id +from api.enums import StorageBackend +from api.services.storage import get_storage_for_backend, storage_fs + + +def _extract_trigger_paths(workflow_definition: dict) -> list[str]: + """Extract trigger UUIDs from workflow definition.""" + if not workflow_definition: + return [] + nodes = workflow_definition.get("nodes", []) + trigger_paths = [] + for node in nodes: + if node.get("type") == "trigger": + trigger_path = node.get("data", {}).get("trigger_path") + if trigger_path: + trigger_paths.append(trigger_path) + return trigger_paths + + +def _regenerate_trigger_uuids(workflow_definition: dict) -> dict: + """Regenerate UUIDs for all trigger nodes to avoid conflicts.""" + if not workflow_definition: + return workflow_definition + updated_definition = copy.deepcopy(workflow_definition) + nodes = updated_definition.get("nodes", []) + for node in nodes: + if node.get("type") == "trigger": + if "data" not in node: + node["data"] = {} + node["data"]["trigger_path"] = str(uuid.uuid4()) + return updated_definition + + +async def _generate_unique_recording_id() -> str: + """Generate a globally unique short recording ID.""" + for _ in range(10): + rid = generate_short_id(8) + exists = await db_client.check_recording_id_exists(rid) + if not exists: + return rid + raise RuntimeError("Failed to generate unique recording ID") + + +async def duplicate_workflow( + workflow_id: int, + organization_id: int, + user_id: int, +): + """Duplicate a workflow including its definition, config, recordings, and triggers. + + Args: + workflow_id: The source workflow ID to duplicate + organization_id: The organization ID + user_id: The user performing the duplication + + Returns: + The newly created workflow DB object + + Raises: + ValueError: If the source workflow is not found + """ + # 1. Fetch source workflow + source = await db_client.get_workflow(workflow_id, organization_id=organization_id) + if source is None: + raise ValueError(f"Workflow with id {workflow_id} not found") + + workflow_definition = copy.deepcopy(source.workflow_definition_with_fallback) + + # 2. Regenerate trigger UUIDs to avoid conflicts + if workflow_definition: + workflow_definition = _regenerate_trigger_uuids(workflow_definition) + + # 3. Create the new workflow + new_name = f"{source.name} - Duplicate" + new_workflow = await db_client.create_workflow( + name=new_name, + workflow_definition=workflow_definition, + user_id=user_id, + organization_id=organization_id, + ) + + # 4. Copy template_context_variables and workflow_configurations + has_extra_fields = ( + source.template_context_variables or source.workflow_configurations + ) + if has_extra_fields: + new_workflow = await db_client.update_workflow( + workflow_id=new_workflow.id, + name=None, + workflow_definition=None, + template_context_variables=copy.deepcopy(source.template_context_variables), + workflow_configurations=copy.deepcopy(source.workflow_configurations), + organization_id=organization_id, + ) + + # 5. Copy recordings with new IDs and storage paths scoped to new workflow + recording_id_map = await _duplicate_recordings( + source_workflow_id=workflow_id, + new_workflow_id=new_workflow.id, + organization_id=organization_id, + user_id=user_id, + ) + + # 6. Replace old recording IDs with new ones in the workflow definition + if recording_id_map: + workflow_definition = _replace_recording_ids( + workflow_definition, recording_id_map + ) + new_workflow = await db_client.update_workflow( + workflow_id=new_workflow.id, + name=None, + workflow_definition=workflow_definition, + template_context_variables=None, + workflow_configurations=None, + organization_id=organization_id, + ) + + # 7. Sync triggers for the new workflow + if workflow_definition: + trigger_paths = _extract_trigger_paths(workflow_definition) + if trigger_paths: + await db_client.sync_triggers_for_workflow( + workflow_id=new_workflow.id, + organization_id=organization_id, + trigger_paths=trigger_paths, + ) + + return new_workflow + + +async def _duplicate_recordings( + source_workflow_id: int, + new_workflow_id: int, + organization_id: int, + user_id: int, +) -> dict[str, str]: + """Duplicate all recordings for a workflow. + + Copies each recording file to a new storage path scoped under the new + workflow ID, and creates new DB records pointing to the copied files. + + Returns: + Mapping of old_recording_id -> new_recording_id + """ + recordings = await db_client.get_recordings_for_workflow( + workflow_id=source_workflow_id, + organization_id=organization_id, + ) + + if not recordings: + return {} + + recording_id_map: dict[str, str] = {} + + for rec in recordings: + try: + new_recording_id = await _generate_unique_recording_id() + + # Build new storage key: recordings/{org_id}/{new_workflow_id}/{new_recording_id}/{filename} + filename = posixpath.basename(rec.storage_key) + new_storage_key = ( + f"recordings/{organization_id}" + f"/{new_workflow_id}/{new_recording_id}" + f"/{filename}" + ) + + # Copy the file in storage (server-side copy) + fs = _get_storage_for_recording(rec.storage_backend) + copied = await fs.acopy_file(rec.storage_key, new_storage_key) + if not copied: + logger.warning( + f"Failed to copy recording file {rec.recording_id}, skipping" + ) + continue + + await db_client.create_recording( + recording_id=new_recording_id, + workflow_id=new_workflow_id, + organization_id=organization_id, + tts_provider=rec.tts_provider, + tts_model=rec.tts_model, + tts_voice_id=rec.tts_voice_id, + transcript=rec.transcript, + storage_key=new_storage_key, + storage_backend=rec.storage_backend, + created_by=user_id, + metadata=copy.deepcopy(rec.recording_metadata), + ) + + recording_id_map[rec.recording_id] = new_recording_id + logger.info( + f"Duplicated recording {rec.recording_id} -> {new_recording_id}" + ) + + except Exception as e: + logger.error(f"Error duplicating recording {rec.recording_id}: {e}") + continue + + return recording_id_map + + +def _replace_recording_ids( + workflow_definition: dict, + recording_id_map: dict[str, str], +) -> dict: + """Replace old recording IDs with new ones throughout the workflow definition. + + Uses JSON serialization to do a thorough find-and-replace across all + nested fields (node prompts, data, etc.). + """ + definition_str = json.dumps(workflow_definition) + + for old_id, new_id in recording_id_map.items(): + definition_str = definition_str.replace(old_id, new_id) + + return json.loads(definition_str) + + +def _get_storage_for_recording(storage_backend: str): + """Get the appropriate storage filesystem for a recording's backend.""" + current_backend = StorageBackend.get_current_backend() + if storage_backend == current_backend.value: + return storage_fs + return get_storage_for_backend(storage_backend) diff --git a/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx b/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx index 9c1ec09..988683b 100644 --- a/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx +++ b/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx @@ -1,10 +1,12 @@ "use client"; import { ReactFlowInstance } from "@xyflow/react"; -import { AlertCircle, ArrowLeft, ChevronDown, Download, History, LoaderCircle, MoreVertical, Phone } from "lucide-react"; +import { AlertCircle, ArrowLeft, ChevronDown, Copy, Download, History, LoaderCircle, MoreVertical, Phone } from "lucide-react"; import { useRouter } from "next/navigation"; import { useState } from "react"; +import { toast } from "sonner"; +import { duplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePost } from "@/client/sdk.gen"; import { WorkflowError } from "@/client/types.gen"; import { FlowEdge, FlowNode } from "@/components/flow/types"; import { Button } from "@/components/ui/button"; @@ -45,6 +47,7 @@ export const WorkflowEditorHeader = ({ }: WorkflowEditorHeaderProps) => { const router = useRouter(); const [savingWorkflow, setSavingWorkflow] = useState(false); + const [duplicating, setDuplicating] = useState(false); const hasValidationErrors = workflowValidationErrors.length > 0; const isCallDisabled = isDirty || hasValidationErrors; @@ -59,6 +62,27 @@ export const WorkflowEditorHeader = ({ router.push("/workflow"); }; + const handleDuplicate = async () => { + if (duplicating) return; + setDuplicating(true); + const promise = duplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePost({ + path: { workflow_id: workflowId }, + }); + toast.promise(promise, { + loading: "Duplicating workflow...", + success: "Workflow duplicated successfully", + error: "Failed to duplicate workflow", + }); + try { + const { data } = await promise; + if (data?.id) { + router.push(`/workflow/${data.id}`); + } + } finally { + setDuplicating(false); + } + }; + const handleDownloadWorkflow = () => { if (!rfInstance.current) return; @@ -224,6 +248,18 @@ export const WorkflowEditorHeader = ({ View Runs + + {duplicating ? ( + + ) : ( + + )} + {duplicating ? "Duplicating..." : "Duplicate Workflow"} + = ClientOptions & { /** @@ -378,6 +378,17 @@ export const updateWorkflowApiV1WorkflowWorkflowIdPut = (options: Options) => { + return (options.client ?? _heyApiClient).post({ + url: '/api/v1/workflow/{workflow_id}/duplicate', + ...options + }); +}; + /** * Get Workflow Runs * Get workflow runs with optional filtering and sorting. diff --git a/ui/src/client/types.gen.ts b/ui/src/client/types.gen.ts index d538f11..ef287f0 100644 --- a/ui/src/client/types.gen.ts +++ b/ui/src/client/types.gen.ts @@ -2278,6 +2278,41 @@ export type UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses = { export type UpdateWorkflowApiV1WorkflowWorkflowIdPutResponse = UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses[keyof UpdateWorkflowApiV1WorkflowWorkflowIdPutResponses]; +export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostData = { + body?: never; + headers?: { + authorization?: string | null; + 'X-API-Key'?: string | null; + }; + path: { + workflow_id: number; + }; + query?: never; + url: '/api/v1/workflow/{workflow_id}/duplicate'; +}; + +export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors = { + /** + * Not found + */ + 404: unknown; + /** + * Validation Error + */ + 422: HttpValidationError; +}; + +export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostError = DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors[keyof DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostErrors]; + +export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses = { + /** + * Successful Response + */ + 200: WorkflowResponse; +}; + +export type DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponse = DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses[keyof DuplicateWorkflowEndpointApiV1WorkflowWorkflowIdDuplicatePostResponses]; + export type GetWorkflowRunsApiV1WorkflowWorkflowIdRunsGetData = { body?: never; headers?: {