dograh/api/tests/test_workflow_routes.py
Abhishek Kumar 4f2a629340 Initial Commit 🚀 🚀
2025-09-09 14:37:32 +05:30

667 lines
25 KiB
Python

"""
Tests for workflow API routes.
This module tests the create, update, get, and validate workflow endpoints.
The fixtures for database setup, test client, and utilities are in conftest.py.
"""
import pytest
from fastapi import status
@pytest.fixture
def sample_workflow_definition():
"""Sample workflow definition for testing."""
return {
"nodes": [
{
"id": "6581",
"type": "startCall",
"position": {"x": 427, "y": 23},
"data": {
"prompt": "Hello, I am Abhishek from Dograh. ",
"is_static": True,
"name": "Start Call",
"is_start": True,
"invalid": False,
"validationMessage": None,
"allow_interrupt": False,
},
"measured": {"width": 300, "height": 100},
"selected": True,
"dragging": False,
},
{
"id": "915",
"type": "agentNode",
"position": {"x": 305, "y": 340},
"data": {
"prompt": "You are a voice agent whose mode of speaking is voice. Ask the user whether they want to talk to a sales guy or a customer service agent.",
"name": "Agent",
"invalid": False,
"validationMessage": None,
"allow_interrupt": False,
},
"measured": {"width": 300, "height": 100},
"selected": False,
"dragging": False,
},
{
"id": "7598",
"type": "agentNode",
"position": {"x": 90, "y": 650},
"data": {
"prompt": "You are a customer service agent whose mode of communication with the user is voice. Tell them that someone from our team will reach out to them soon",
"name": "Agent",
"invalid": False,
"validationMessage": None,
"allow_interrupt": True,
},
"measured": {"width": 300, "height": 100},
"selected": False,
"dragging": False,
},
{
"id": "6919",
"type": "agentNode",
"position": {"x": 520, "y": 650},
"data": {
"prompt": "You are a sales representative whose mode of communication with the user is voice. Tell the user that someone from our team will reach out to you soon",
"name": "Agent",
"invalid": False,
"validationMessage": None,
"allow_interrupt": True,
},
"measured": {"width": 300, "height": 100},
"selected": False,
"dragging": False,
},
{
"id": "1802",
"type": "endCall",
"position": {"x": 305, "y": 960},
"data": {
"prompt": "Thank you!",
"invalid": False,
"validationMessage": None,
"is_static": True,
"name": "End Call",
"is_end": True,
"allow_interrupt": False,
},
"measured": {"width": 300, "height": 100},
"selected": False,
"dragging": False,
},
],
"edges": [
{
"animated": True,
"type": "custom",
"source": "915",
"target": "7598",
"id": "xy-edge__915-7598",
"selected": False,
"data": {
"condition": "The customer wants to talk to a customer service agent",
"label": "customer service agent",
"invalid": False,
"validationMessage": None,
},
},
{
"animated": True,
"type": "custom",
"source": "915",
"target": "6919",
"id": "xy-edge__915-6919",
"selected": False,
"data": {
"condition": "customer wants to talk to a sales representative",
"label": "sales representative",
"invalid": False,
"validationMessage": None,
},
},
{
"animated": True,
"type": "custom",
"source": "6581",
"target": "915",
"id": "xy-edge__6581-915",
"selected": False,
"data": {
"condition": "Always take this route",
"label": "Always take this route",
"invalid": False,
"validationMessage": None,
},
},
{
"animated": True,
"type": "custom",
"source": "7598",
"target": "1802",
"id": "xy-edge__7598-1802",
"selected": False,
"data": {
"condition": "end call",
"label": "end call",
"invalid": False,
"validationMessage": None,
},
},
{
"animated": True,
"type": "custom",
"source": "6919",
"target": "1802",
"id": "xy-edge__6919-1802",
"selected": False,
"data": {
"condition": "end call",
"label": "end call",
"invalid": False,
"validationMessage": None,
},
},
],
"viewport": {"x": 0, "y": 0, "zoom": 1},
}
class TestCreateWorkflow:
"""Test cases for creating workflows."""
async def test_create_workflow_success(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test successful workflow creation."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_create_success"
)
request_data = {
"name": "Test Workflow",
"workflow_definition": sample_workflow_definition,
}
async with test_client_factory(test_user) as client:
response = await client.post("/api/v1/workflow/create", json=request_data)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "id" in data
assert data["name"] == "Test Workflow"
assert data["workflow_definition"] == sample_workflow_definition
assert "created_at" in data
assert "current_definition_id" in data
async def test_create_workflow_invalid_definition(
self, test_client_factory, db_session
):
"""Test workflow creation with invalid definition."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_invalid_def"
)
request_data = {
"name": "Invalid Workflow",
"workflow_definition": {"invalid": "structure"},
}
async with test_client_factory(test_user) as client:
response = await client.post("/api/v1/workflow/create", json=request_data)
# The API should still create the workflow even with invalid definition
# Validation happens in the validate endpoint
assert response.status_code == status.HTTP_200_OK
@pytest.mark.asyncio
async def test_create_workflow_missing_name(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test workflow creation without name."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_missing_name"
)
request_data = {"workflow_definition": sample_workflow_definition}
async with test_client_factory(test_user) as client:
response = await client.post("/api/v1/workflow/create", json=request_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_create_workflow_missing_definition(
self, test_client_factory, db_session
):
"""Test workflow creation without workflow definition."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_missing_definition"
)
request_data = {"name": "Test Workflow"}
async with test_client_factory(test_user) as client:
response = await client.post("/api/v1/workflow/create", json=request_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
class TestGetWorkflows:
"""Test cases for fetching workflows."""
@pytest.mark.asyncio
async def test_get_all_workflows_empty(self, test_client_factory, db_session):
"""Test getting all workflows when none exist."""
# Create a test user within the test function
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_empty_workflows"
)
# Create a test client for this specific user
async with test_client_factory(test_user) as client:
response = await client.get("/api/v1/workflow/fetch")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert isinstance(data, list)
assert len(data) == 0
@pytest.mark.asyncio
async def test_get_all_workflows_with_data(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test getting all workflows when some exist."""
# Create a test user within the test function
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_with_workflows"
)
# Create a test client for this specific user
async with test_client_factory(test_user) as client:
# Create a workflow first
create_response = await client.post(
"/api/v1/workflow/create",
json={
"name": "Test Workflow 1",
"workflow_definition": sample_workflow_definition,
},
)
assert create_response.status_code == status.HTTP_200_OK
# Create another workflow
create_response2 = await client.post(
"/api/v1/workflow/create",
json={
"name": "Test Workflow 2",
"workflow_definition": sample_workflow_definition,
},
)
assert create_response2.status_code == status.HTTP_200_OK
# Get all workflows
response = await client.get("/api/v1/workflow/fetch")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert isinstance(data, list)
assert len(data) == 2
# Check that both workflows are returned
workflow_names = [w["name"] for w in data]
assert "Test Workflow 1" in workflow_names
assert "Test Workflow 2" in workflow_names
@pytest.mark.asyncio
async def test_get_specific_workflow(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test getting a specific workflow by ID."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_specific_workflow"
)
async with test_client_factory(test_user) as client:
# Create a workflow first
create_response = await client.post(
"/api/v1/workflow/create",
json={
"name": "Specific Workflow",
"workflow_definition": sample_workflow_definition,
},
)
assert create_response.status_code == status.HTTP_200_OK
created_workflow = create_response.json()
workflow_id = created_workflow["id"]
# Get the specific workflow
response = await client.get(
f"/api/v1/workflow/fetch?workflow_id={workflow_id}"
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == workflow_id
assert data["name"] == "Specific Workflow"
assert data["workflow_definition"] == sample_workflow_definition
@pytest.mark.asyncio
async def test_get_nonexistent_workflow(self, test_client_factory, db_session):
"""Test getting a workflow that doesn't exist."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_nonexistent"
)
async with test_client_factory(test_user) as client:
response = await client.get("/api/v1/workflow/fetch?workflow_id=99999")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "not found" in response.json()["detail"].lower()
class TestUpdateWorkflow:
"""Test cases for updating workflows."""
@pytest.mark.asyncio
async def test_update_workflow_name_only(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test updating only the workflow name."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_update_name"
)
async with test_client_factory(test_user) as client:
# Create a workflow first
create_response = await client.post(
"/api/v1/workflow/create",
json={
"name": "Original Name",
"workflow_definition": sample_workflow_definition,
},
)
assert create_response.status_code == status.HTTP_200_OK
workflow_id = create_response.json()["id"]
# Update the workflow name
update_data = {"name": "Updated Name"}
response = await client.put(
f"/api/v1/workflow/{workflow_id}", json=update_data
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == workflow_id
assert data["name"] == "Updated Name"
assert (
data["workflow_definition"] == sample_workflow_definition
) # Should remain unchanged
@pytest.mark.asyncio
async def test_update_workflow_name_and_definition(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test updating both workflow name and definition."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_update_both"
)
async with test_client_factory(test_user) as client:
# Create a workflow first
create_response = await client.post(
"/api/v1/workflow/create",
json={
"name": "Original Name",
"workflow_definition": sample_workflow_definition,
},
)
assert create_response.status_code == status.HTTP_200_OK
workflow_id = create_response.json()["id"]
# Create new workflow definition
new_definition = {
"nodes": [
{
"id": "start",
"type": "start",
"position": {"x": 50, "y": 50},
"data": {"label": "New Start"},
}
],
"edges": [],
}
# Update the workflow
update_data = {
"name": "Updated Name",
"workflow_definition": new_definition,
}
response = await client.put(
f"/api/v1/workflow/{workflow_id}", json=update_data
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == workflow_id
assert data["name"] == "Updated Name"
assert data["workflow_definition"] == new_definition
@pytest.mark.asyncio
async def test_update_nonexistent_workflow(self, test_client_factory, db_session):
"""Test updating a workflow that doesn't exist."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_update_nonexistent"
)
update_data = {"name": "Updated Name"}
async with test_client_factory(test_user) as client:
response = await client.put("/api/v1/workflow/99999", json=update_data)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "not found" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_update_workflow_missing_name(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test updating a workflow without providing a name."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_update_missing_name"
)
async with test_client_factory(test_user) as client:
# Create a workflow first
create_response = await client.post(
"/api/v1/workflow/create",
json={
"name": "Original Name",
"workflow_definition": sample_workflow_definition,
},
)
assert create_response.status_code == status.HTTP_200_OK
workflow_id = create_response.json()["id"]
# Try to update without providing name
update_data = {"workflow_definition": sample_workflow_definition}
response = await client.put(
f"/api/v1/workflow/{workflow_id}", json=update_data
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
class TestWorkflowValidation:
"""Test cases for workflow validation endpoint."""
@pytest.mark.asyncio
async def test_validate_workflow_success(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test successful workflow validation."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_validate_success"
)
async with test_client_factory(test_user) as client:
# Create a workflow first
create_response = await client.post(
"/api/v1/workflow/create",
json={
"name": "Valid Workflow",
"workflow_definition": sample_workflow_definition,
},
)
assert create_response.status_code == status.HTTP_200_OK
workflow_id = create_response.json()["id"]
# Validate the workflow
response = await client.post(f"/api/v1/workflow/{workflow_id}/validate")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["is_valid"] is True
assert data["errors"] == []
@pytest.mark.asyncio
async def test_validate_nonexistent_workflow(self, test_client_factory, db_session):
"""Test validating a workflow that doesn't exist."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_validate_nonexistent"
)
async with test_client_factory(test_user) as client:
response = await client.post("/api/v1/workflow/99999/validate")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "not found" in response.json()["detail"].lower()
class TestWorkflowIntegration:
"""Integration tests for workflow operations."""
@pytest.mark.asyncio
async def test_full_workflow_lifecycle(
self, test_client_factory, db_session, sample_workflow_definition
):
"""Test the complete lifecycle of a workflow: create, get, update, validate."""
# Create a test user for this test
test_user = await db_session.get_or_create_user_by_provider_id(
"test_user_lifecycle"
)
async with test_client_factory(test_user) as client:
# 1. Create workflow
create_response = await client.post(
"/api/v1/workflow/create",
json={
"name": "Lifecycle Test Workflow",
"workflow_definition": sample_workflow_definition,
},
)
assert create_response.status_code == status.HTTP_200_OK
workflow_id = create_response.json()["id"]
# 2. Get the created workflow
get_response = await client.get(
f"/api/v1/workflow/fetch?workflow_id={workflow_id}"
)
assert get_response.status_code == status.HTTP_200_OK
workflow_data = get_response.json()
assert workflow_data["name"] == "Lifecycle Test Workflow"
# 3. Add a new node in the workflow definition
new_node = {
"id": "6919_new",
"type": "agentNode",
"position": {"x": 520, "y": 650},
"data": {
"prompt": "Something new",
"name": "Agent",
"invalid": False,
"validationMessage": None,
"allow_interrupt": True,
},
"measured": {"width": 300, "height": 100},
"selected": False,
"dragging": False,
}
new_edges = [
{
"source": "6919",
"target": "6919_new",
"id": "xy-edge__6919-6919_new",
"data": {
"condition": "Always take this route",
"label": "Always take this route",
"invalid": False,
"validationMessage": None,
},
},
{
"source": "6919_new",
"target": "1802",
"id": "xy-edge__6919_new-1802",
"data": {
"condition": "Always take this route",
"label": "Always take this route",
"invalid": False,
"validationMessage": None,
},
},
]
new_definition = {
"nodes": [
*sample_workflow_definition["nodes"],
new_node,
],
"edges": [
*sample_workflow_definition["edges"],
*new_edges,
],
}
update_response = await client.put(
f"/api/v1/workflow/{workflow_id}",
json={
"name": "Updated Lifecycle Workflow",
"workflow_definition": new_definition,
},
)
assert update_response.status_code == status.HTTP_200_OK
assert update_response.json()["name"] == "Updated Lifecycle Workflow"
# 4. Validate the updated workflow
validate_response = await client.post(
f"/api/v1/workflow/{workflow_id}/validate"
)
assert validate_response.status_code == status.HTTP_200_OK
assert validate_response.json()["is_valid"] is True
# 5. Verify the update by getting the workflow again
final_get_response = await client.get(
f"/api/v1/workflow/fetch?workflow_id={workflow_id}"
)
assert final_get_response.status_code == status.HTTP_200_OK
final_data = final_get_response.json()
assert final_data["name"] == "Updated Lifecycle Workflow"
assert final_data["workflow_definition"] == new_definition