From e513e563ee24fdb665c41e764cb2d1b7d408718e Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Thu, 19 Mar 2026 17:52:04 +0530 Subject: [PATCH] feat: campaign create error on missing template variables --- api/routes/campaign.py | 36 ++++++++++ api/services/campaign/source_sync.py | 53 ++++++++++++++- api/services/workflow/workflow.py | 70 +++++++++++++++++++- api/utils/template_renderer.py | 7 +- ui/src/app/campaigns/new/page.tsx | 2 + ui/src/components/workflow/WorkflowTable.tsx | 4 ++ 6 files changed, 165 insertions(+), 7 deletions(-) diff --git a/api/routes/campaign.py b/api/routes/campaign.py index 0ff3252..8f8abe2 100644 --- a/api/routes/campaign.py +++ b/api/routes/campaign.py @@ -19,6 +19,7 @@ from api.db.models import UserModel from api.enums import OrganizationConfigurationKey from api.services.auth.depends import get_user from api.services.campaign.runner import campaign_runner_service +from api.services.campaign.source_sync import CampaignSourceSyncService from api.services.campaign.source_sync_factory import get_sync_service from api.services.quota_service import check_dograh_quota from api.services.storage import storage_fs @@ -291,6 +292,41 @@ async def create_campaign( if not validation_result.is_valid: raise HTTPException(status_code=400, detail=validation_result.error.message) + # Validate template variables against source data columns + workflow = await db_client.get_workflow_by_id(request.workflow_id) + if workflow: + from api.services.workflow.dto import ReactFlowDTO + from api.services.workflow.workflow import WorkflowGraph + + workflow_def = workflow.workflow_definition_with_fallback + if workflow_def: + try: + dto = ReactFlowDTO(**workflow_def) + graph = WorkflowGraph(dto) + required_vars = graph.get_required_template_variables() + + if ( + required_vars + and validation_result.headers + and validation_result.rows + ): + template_validation = ( + CampaignSourceSyncService.validate_template_columns( + validation_result.headers, + validation_result.rows, + required_vars, + ) + ) + if not template_validation.is_valid: + raise HTTPException( + status_code=400, + detail=template_validation.error.message, + ) + except HTTPException: + raise + except Exception: + pass # Don't block campaign creation if template extraction fails + if request.max_concurrency is not None: await _validate_max_concurrency( request.max_concurrency, user.selected_organization_id diff --git a/api/services/campaign/source_sync.py b/api/services/campaign/source_sync.py index cbdfdfc..5393b65 100644 --- a/api/services/campaign/source_sync.py +++ b/api/services/campaign/source_sync.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Set from loguru import logger @@ -19,6 +19,8 @@ class ValidationResult: is_valid: bool error: Optional[ValidationError] = None + headers: Optional[List[str]] = field(default=None, repr=False) + rows: Optional[List[List[str]]] = field(default=None, repr=False) class CampaignSourceSyncService(ABC): @@ -113,6 +115,53 @@ class CampaignSourceSyncService(ABC): ), ) + return ValidationResult(is_valid=True, headers=normalized_headers, rows=rows) + + @staticmethod + def validate_template_columns( + headers: List[str], + rows: List[List[str]], + required_columns: Set[str], + ) -> ValidationResult: + """Validate that template variable columns exist and are non-empty in all rows.""" + normalized_headers = CampaignSourceSyncService.normalize_headers(headers) + + # Check for missing columns + missing = required_columns - set(normalized_headers) + if missing: + missing_str = ", ".join(f"'{c}'" for c in sorted(missing)) + return ValidationResult( + is_valid=False, + error=ValidationError( + message=f"Workflow uses template variables that are missing from the source data: {missing_str}. " + "Add the missing columns or remove them from the workflow." + ), + ) + + # Check for empty values in required columns + col_indices = {col: normalized_headers.index(col) for col in required_columns} + + for col, idx in col_indices.items(): + empty_rows = [] + for row_idx, row in enumerate(rows, start=2): + if len(row) <= idx or not row[idx].strip(): + empty_rows.append(row_idx) + + if empty_rows: + if len(empty_rows) > 5: + rows_str = f"{', '.join(map(str, empty_rows[:5]))} and {len(empty_rows) - 5} more" + else: + rows_str = ", ".join(map(str, empty_rows)) + + return ValidationResult( + is_valid=False, + error=ValidationError( + message=f"Template variable '{col}' is empty in rows: {rows_str}. " + "All template variables used in the workflow must have values in every row.", + invalid_rows=empty_rows, + ), + ) + return ValidationResult(is_valid=True) @abstractmethod diff --git a/api/services/workflow/workflow.py b/api/services/workflow/workflow.py index c9a559a..b372a46 100644 --- a/api/services/workflow/workflow.py +++ b/api/services/workflow/workflow.py @@ -1,10 +1,40 @@ import re from collections import Counter -from typing import Dict, List +from typing import Dict, List, Set from api.services.workflow.dto import EdgeDataDTO, NodeDataDTO, NodeType, ReactFlowDTO from api.services.workflow.errors import ItemKind, WorkflowError +# Regex for matching {{ variable }} template placeholders. +# Captures: group(1) = variable path, group(2) = filter name, group(3) = filter value. +# Shared with api.utils.template_renderer via import. +TEMPLATE_VAR_PATTERN = r"\{\{\s*([^|\s}]+)(?:\s*\|\s*([^:}]+)(?::([^}]+))?)?\s*\}\}" + +# Variables injected by the system at runtime, not from source data. +_SYSTEM_VARIABLES = {"campaign_id", "provider", "source_uuid"} + + +def extract_template_variables(text: str) -> Set[str]: + """Extract template variable names from a string, excluding nested paths, + variables with a fallback filter, and system-injected variables.""" + variables: Set[str] = set() + for match in re.finditer(TEMPLATE_VAR_PATTERN, text): + var_name = match.group(1).strip() + filter_name = match.group(2).strip() if match.group(2) else None + + # Skip nested paths (runtime-resolved, e.g. gathered_context.city) + if "." in var_name: + continue + # Skip variables with a fallback (they have a default value) + if filter_name == "fallback": + continue + # Skip system-injected variables + if var_name in _SYSTEM_VARIABLES: + continue + + variables.add(var_name) + return variables + class Edge: def __init__(self, source: str, target: str, data: EdgeDataDTO): @@ -99,6 +129,44 @@ class WorkflowGraph: except IndexError: self.global_node_id = None + # ----------------------------------------------------------- + # template variable extraction + # ----------------------------------------------------------- + def get_required_template_variables(self) -> Set[str]: + """Extract all template variables referenced in node prompts/greetings + and edge transition speeches. + + Scans: + - Start node: prompt, greeting + - Agent / End / Global nodes: prompt + - All edges: transition_speech + + Returns a set of top-level variable names that the workflow expects + from the source data (excluding nested paths, fallback vars, and + system-injected vars). + """ + variables: Set[str] = set() + + for node in self.nodes.values(): + if node.node_type in ( + NodeType.startNode, + NodeType.agentNode, + NodeType.endNode, + NodeType.globalNode, + ): + if node.prompt: + variables |= extract_template_variables(node.prompt) + + # greeting is only relevant on the start node + if node.node_type == NodeType.startNode and node.greeting: + variables |= extract_template_variables(node.greeting) + + for edge in self.edges: + if edge.transition_speech: + variables |= extract_template_variables(edge.transition_speech) + + return variables + # ----------------------------------------------------------- # validators # ----------------------------------------------------------- diff --git a/api/utils/template_renderer.py b/api/utils/template_renderer.py index 9e42fd6..982eca9 100644 --- a/api/utils/template_renderer.py +++ b/api/utils/template_renderer.py @@ -4,6 +4,8 @@ import json import re from typing import Any, Dict, Union +from api.services.workflow.workflow import TEMPLATE_VAR_PATTERN + def get_nested_value(obj: Any, path: str) -> Any: """ @@ -97,9 +99,6 @@ def _render_string(template_str: str, context: Dict[str, Any]) -> str: if not template_str: return template_str - # Pattern: {{ path }} or {{ path | filter }} or {{ path | filter:default }} - pattern = r"\{\{\s*([^|\s}]+)(?:\s*\|\s*([^:}]+)(?::([^}]+))?)?\s*\}\}" - def _replace(match: re.Match[str]) -> str: # type: ignore[type-arg] variable_path = match.group(1).strip() filter_name = match.group(2).strip() if match.group(2) else None @@ -123,7 +122,7 @@ def _render_string(template_str: str, context: Dict[str, Any]) -> str: return str(value) # Replace template variables - result = re.sub(pattern, _replace, template_str) + result = re.sub(TEMPLATE_VAR_PATTERN, _replace, template_str) # Handle line breaks (convert literal \n to actual newlines) result = result.replace("\\n", "\n") diff --git a/ui/src/app/campaigns/new/page.tsx b/ui/src/app/campaigns/new/page.tsx index 0d22407..7318a25 100644 --- a/ui/src/app/campaigns/new/page.tsx +++ b/ui/src/app/campaigns/new/page.tsx @@ -298,12 +298,14 @@ export default function NewCampaignPage() { // Handle sheet selection const handleSheetSelected = (sheetUrl: string) => { setSourceId(sheetUrl); + setCreateError(null); }; // Handle CSV file upload const handleFileUploaded = (fileKey: string, fileName: string) => { setSourceId(fileKey); setSelectedFileName(fileName); + setCreateError(null); }; return ( diff --git a/ui/src/components/workflow/WorkflowTable.tsx b/ui/src/components/workflow/WorkflowTable.tsx index 30c8081..1cc7faf 100644 --- a/ui/src/components/workflow/WorkflowTable.tsx +++ b/ui/src/components/workflow/WorkflowTable.tsx @@ -72,6 +72,7 @@ export function WorkflowTable({ workflows, showArchived }: WorkflowTableProps) { + ID Agent Name Created At Total Runs @@ -84,6 +85,9 @@ export function WorkflowTable({ workflows, showArchived }: WorkflowTableProps) { key={workflow.id} className={`hover:bg-accent transition-colors ${showArchived ? 'opacity-60' : ''}`} > + + {workflow.id} + {workflow.name}