mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-25 08:48:13 +02:00
Initial Commit 🚀 🚀
This commit is contained in:
commit
4f2a629340
444 changed files with 76863 additions and 0 deletions
122
api/tests/test_disposition_mapper.py
Normal file
122
api/tests/test_disposition_mapper.py
Normal file
|
|
@ -0,0 +1,122 @@
|
|||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from api.services.workflow.disposition_mapper import (
|
||||
apply_disposition_mapping,
|
||||
get_organization_id_from_workflow_run,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_apply_disposition_mapping_with_valid_mapping():
|
||||
"""Test disposition mapping with valid configuration."""
|
||||
with patch("api.services.workflow.disposition_mapper.db_client") as mock_db_client:
|
||||
# Mock disposition mapping configuration
|
||||
mock_db_client.get_configuration_value = AsyncMock(
|
||||
return_value={
|
||||
"XFER": "TRANSFERRED",
|
||||
"ND": "NOT_QUALIFIED",
|
||||
"user_hangup": "HANGUP",
|
||||
}
|
||||
)
|
||||
|
||||
# Test mapping exists
|
||||
result = await apply_disposition_mapping("XFER", 1)
|
||||
assert result == "TRANSFERRED"
|
||||
|
||||
# Test mapping doesn't exist
|
||||
result = await apply_disposition_mapping("UNKNOWN", 1)
|
||||
assert result == "UNKNOWN"
|
||||
|
||||
# Verify db_client was called correctly
|
||||
mock_db_client.get_configuration_value.assert_called_with(
|
||||
1, "DISPOSITION_CODE_MAPPING", default={}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_apply_disposition_mapping_no_organization_id():
|
||||
"""Test disposition mapping with no organization ID."""
|
||||
# Should return original value
|
||||
result = await apply_disposition_mapping("XFER", None)
|
||||
assert result == "XFER"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_apply_disposition_mapping_empty_value():
|
||||
"""Test disposition mapping with empty value."""
|
||||
# Should return original empty value
|
||||
result = await apply_disposition_mapping("", 1)
|
||||
assert result == ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_apply_disposition_mapping_error_handling():
|
||||
"""Test disposition mapping handles errors gracefully."""
|
||||
with patch("api.services.workflow.disposition_mapper.db_client") as mock_db_client:
|
||||
# Mock database error
|
||||
mock_db_client.get_configuration_value = AsyncMock(
|
||||
side_effect=Exception("Database error")
|
||||
)
|
||||
|
||||
# Should return original value on error
|
||||
result = await apply_disposition_mapping("XFER", 1)
|
||||
assert result == "XFER"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_organization_id_from_workflow_run():
|
||||
"""Test getting organization ID from workflow run ID."""
|
||||
with patch("api.services.workflow.disposition_mapper.db_client") as mock_db_client:
|
||||
# Mock workflow run with organization
|
||||
mock_workflow_run = MagicMock()
|
||||
mock_workflow_run.workflow.user.selected_organization_id = 123
|
||||
mock_db_client.get_workflow_run_by_id = AsyncMock(
|
||||
return_value=mock_workflow_run
|
||||
)
|
||||
|
||||
result = await get_organization_id_from_workflow_run(1)
|
||||
assert result == 123
|
||||
|
||||
# Verify db_client was called correctly
|
||||
mock_db_client.get_workflow_run_by_id.assert_called_once_with(1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_organization_id_no_workflow_run():
|
||||
"""Test getting organization ID when workflow run doesn't exist."""
|
||||
with patch("api.services.workflow.disposition_mapper.db_client") as mock_db_client:
|
||||
# Mock no workflow run found
|
||||
mock_db_client.get_workflow_run_by_id = AsyncMock(return_value=None)
|
||||
|
||||
result = await get_organization_id_from_workflow_run(1)
|
||||
assert result is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_organization_id_no_user():
|
||||
"""Test getting organization ID when workflow has no user."""
|
||||
with patch("api.services.workflow.disposition_mapper.db_client") as mock_db_client:
|
||||
# Mock workflow run with no user
|
||||
mock_workflow_run = MagicMock()
|
||||
mock_workflow_run.workflow.user = None
|
||||
mock_db_client.get_workflow_run_by_id = AsyncMock(
|
||||
return_value=mock_workflow_run
|
||||
)
|
||||
|
||||
result = await get_organization_id_from_workflow_run(1)
|
||||
assert result is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_organization_id_error_handling():
|
||||
"""Test getting organization ID handles errors gracefully."""
|
||||
with patch("api.services.workflow.disposition_mapper.db_client") as mock_db_client:
|
||||
# Mock database error
|
||||
mock_db_client.get_workflow_run_by_id = AsyncMock(
|
||||
side_effect=Exception("Database error")
|
||||
)
|
||||
|
||||
result = await get_organization_id_from_workflow_run(1)
|
||||
assert result is None
|
||||
Loading…
Add table
Add a link
Reference in a new issue