feat: campaign create error on missing template variables

This commit is contained in:
Abhishek Kumar 2026-03-19 17:52:04 +05:30
parent d8942dffb1
commit e513e563ee
6 changed files with 165 additions and 7 deletions

View file

@ -19,6 +19,7 @@ from api.db.models import UserModel
from api.enums import OrganizationConfigurationKey
from api.services.auth.depends import get_user
from api.services.campaign.runner import campaign_runner_service
from api.services.campaign.source_sync import CampaignSourceSyncService
from api.services.campaign.source_sync_factory import get_sync_service
from api.services.quota_service import check_dograh_quota
from api.services.storage import storage_fs
@ -291,6 +292,41 @@ async def create_campaign(
if not validation_result.is_valid:
raise HTTPException(status_code=400, detail=validation_result.error.message)
# Validate template variables against source data columns
workflow = await db_client.get_workflow_by_id(request.workflow_id)
if workflow:
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.workflow import WorkflowGraph
workflow_def = workflow.workflow_definition_with_fallback
if workflow_def:
try:
dto = ReactFlowDTO(**workflow_def)
graph = WorkflowGraph(dto)
required_vars = graph.get_required_template_variables()
if (
required_vars
and validation_result.headers
and validation_result.rows
):
template_validation = (
CampaignSourceSyncService.validate_template_columns(
validation_result.headers,
validation_result.rows,
required_vars,
)
)
if not template_validation.is_valid:
raise HTTPException(
status_code=400,
detail=template_validation.error.message,
)
except HTTPException:
raise
except Exception:
pass # Don't block campaign creation if template extraction fails
if request.max_concurrency is not None:
await _validate_max_concurrency(
request.max_concurrency, user.selected_organization_id

View file

@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
from loguru import logger
@ -19,6 +19,8 @@ class ValidationResult:
is_valid: bool
error: Optional[ValidationError] = None
headers: Optional[List[str]] = field(default=None, repr=False)
rows: Optional[List[List[str]]] = field(default=None, repr=False)
class CampaignSourceSyncService(ABC):
@ -113,6 +115,53 @@ class CampaignSourceSyncService(ABC):
),
)
return ValidationResult(is_valid=True, headers=normalized_headers, rows=rows)
@staticmethod
def validate_template_columns(
headers: List[str],
rows: List[List[str]],
required_columns: Set[str],
) -> ValidationResult:
"""Validate that template variable columns exist and are non-empty in all rows."""
normalized_headers = CampaignSourceSyncService.normalize_headers(headers)
# Check for missing columns
missing = required_columns - set(normalized_headers)
if missing:
missing_str = ", ".join(f"'{c}'" for c in sorted(missing))
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Workflow uses template variables that are missing from the source data: {missing_str}. "
"Add the missing columns or remove them from the workflow."
),
)
# Check for empty values in required columns
col_indices = {col: normalized_headers.index(col) for col in required_columns}
for col, idx in col_indices.items():
empty_rows = []
for row_idx, row in enumerate(rows, start=2):
if len(row) <= idx or not row[idx].strip():
empty_rows.append(row_idx)
if empty_rows:
if len(empty_rows) > 5:
rows_str = f"{', '.join(map(str, empty_rows[:5]))} and {len(empty_rows) - 5} more"
else:
rows_str = ", ".join(map(str, empty_rows))
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Template variable '{col}' is empty in rows: {rows_str}. "
"All template variables used in the workflow must have values in every row.",
invalid_rows=empty_rows,
),
)
return ValidationResult(is_valid=True)
@abstractmethod

View file

@ -1,10 +1,40 @@
import re
from collections import Counter
from typing import Dict, List
from typing import Dict, List, Set
from api.services.workflow.dto import EdgeDataDTO, NodeDataDTO, NodeType, ReactFlowDTO
from api.services.workflow.errors import ItemKind, WorkflowError
# Regex for matching {{ variable }} template placeholders.
# Captures: group(1) = variable path, group(2) = filter name, group(3) = filter value.
# Shared with api.utils.template_renderer via import.
TEMPLATE_VAR_PATTERN = r"\{\{\s*([^|\s}]+)(?:\s*\|\s*([^:}]+)(?::([^}]+))?)?\s*\}\}"
# Variables injected by the system at runtime, not from source data.
_SYSTEM_VARIABLES = {"campaign_id", "provider", "source_uuid"}
def extract_template_variables(text: str) -> Set[str]:
"""Extract template variable names from a string, excluding nested paths,
variables with a fallback filter, and system-injected variables."""
variables: Set[str] = set()
for match in re.finditer(TEMPLATE_VAR_PATTERN, text):
var_name = match.group(1).strip()
filter_name = match.group(2).strip() if match.group(2) else None
# Skip nested paths (runtime-resolved, e.g. gathered_context.city)
if "." in var_name:
continue
# Skip variables with a fallback (they have a default value)
if filter_name == "fallback":
continue
# Skip system-injected variables
if var_name in _SYSTEM_VARIABLES:
continue
variables.add(var_name)
return variables
class Edge:
def __init__(self, source: str, target: str, data: EdgeDataDTO):
@ -99,6 +129,44 @@ class WorkflowGraph:
except IndexError:
self.global_node_id = None
# -----------------------------------------------------------
# template variable extraction
# -----------------------------------------------------------
def get_required_template_variables(self) -> Set[str]:
"""Extract all template variables referenced in node prompts/greetings
and edge transition speeches.
Scans:
- Start node: prompt, greeting
- Agent / End / Global nodes: prompt
- All edges: transition_speech
Returns a set of top-level variable names that the workflow expects
from the source data (excluding nested paths, fallback vars, and
system-injected vars).
"""
variables: Set[str] = set()
for node in self.nodes.values():
if node.node_type in (
NodeType.startNode,
NodeType.agentNode,
NodeType.endNode,
NodeType.globalNode,
):
if node.prompt:
variables |= extract_template_variables(node.prompt)
# greeting is only relevant on the start node
if node.node_type == NodeType.startNode and node.greeting:
variables |= extract_template_variables(node.greeting)
for edge in self.edges:
if edge.transition_speech:
variables |= extract_template_variables(edge.transition_speech)
return variables
# -----------------------------------------------------------
# validators
# -----------------------------------------------------------

View file

@ -4,6 +4,8 @@ import json
import re
from typing import Any, Dict, Union
from api.services.workflow.workflow import TEMPLATE_VAR_PATTERN
def get_nested_value(obj: Any, path: str) -> Any:
"""
@ -97,9 +99,6 @@ def _render_string(template_str: str, context: Dict[str, Any]) -> str:
if not template_str:
return template_str
# Pattern: {{ path }} or {{ path | filter }} or {{ path | filter:default }}
pattern = r"\{\{\s*([^|\s}]+)(?:\s*\|\s*([^:}]+)(?::([^}]+))?)?\s*\}\}"
def _replace(match: re.Match[str]) -> str: # type: ignore[type-arg]
variable_path = match.group(1).strip()
filter_name = match.group(2).strip() if match.group(2) else None
@ -123,7 +122,7 @@ def _render_string(template_str: str, context: Dict[str, Any]) -> str:
return str(value)
# Replace template variables
result = re.sub(pattern, _replace, template_str)
result = re.sub(TEMPLATE_VAR_PATTERN, _replace, template_str)
# Handle line breaks (convert literal \n to actual newlines)
result = result.replace("\\n", "\n")

View file

@ -298,12 +298,14 @@ export default function NewCampaignPage() {
// Handle sheet selection
const handleSheetSelected = (sheetUrl: string) => {
setSourceId(sheetUrl);
setCreateError(null);
};
// Handle CSV file upload
const handleFileUploaded = (fileKey: string, fileName: string) => {
setSourceId(fileKey);
setSelectedFileName(fileName);
setCreateError(null);
};
return (

View file

@ -72,6 +72,7 @@ export function WorkflowTable({ workflows, showArchived }: WorkflowTableProps) {
<Table>
<TableHeader>
<TableRow>
<TableHead className="font-semibold">ID</TableHead>
<TableHead className="font-semibold">Agent Name</TableHead>
<TableHead className="font-semibold">Created At</TableHead>
<TableHead className="font-semibold text-center">Total Runs</TableHead>
@ -84,6 +85,9 @@ export function WorkflowTable({ workflows, showArchived }: WorkflowTableProps) {
key={workflow.id}
className={`hover:bg-accent transition-colors ${showArchived ? 'opacity-60' : ''}`}
>
<TableCell className="text-muted-foreground">
{workflow.id}
</TableCell>
<TableCell className="font-medium">
{workflow.name}
</TableCell>