mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-27 01:16:22 +02:00
MCP auth for the simple case (#557)
* MCP auth token header * Mention limitations * Fix AgentStep schema error by converting argument values to strings. * Added tests for MCP auth and agent step parsing
This commit is contained in:
parent
d9d4c91363
commit
4c3db4dbbe
8 changed files with 1361 additions and 56 deletions
376
tests/unit/test_agent/test_agent_step_arguments.py
Normal file
376
tests/unit/test_agent/test_agent_step_arguments.py
Normal file
|
|
@ -0,0 +1,376 @@
|
|||
"""
|
||||
Unit tests for AgentStep arguments type conversion
|
||||
|
||||
Tests the fix for converting agent tool arguments to strings when creating
|
||||
AgentStep records, ensuring compatibility with Pulsar schema that requires
|
||||
Map(String()) for the arguments field.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, AsyncMock
|
||||
from trustgraph.schema import AgentStep
|
||||
from trustgraph.agent.react.types import Action
|
||||
|
||||
|
||||
class TestAgentStepArgumentsConversion:
|
||||
"""Test cases for AgentStep arguments string conversion"""
|
||||
|
||||
def test_agent_step_with_integer_arguments(self):
|
||||
"""Test that integer arguments are converted to strings"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Set volume to 10",
|
||||
name="set_volume",
|
||||
arguments={"volume_level": 10, "device": "speaker"},
|
||||
observation="Volume set successfully"
|
||||
)
|
||||
|
||||
# Act - simulate the conversion that happens in service.py
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["volume_level"] == "10"
|
||||
assert isinstance(agent_step.arguments["volume_level"], str)
|
||||
assert agent_step.arguments["device"] == "speaker"
|
||||
assert isinstance(agent_step.arguments["device"], str)
|
||||
|
||||
def test_agent_step_with_float_arguments(self):
|
||||
"""Test that float arguments are converted to strings"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Set temperature",
|
||||
name="set_temperature",
|
||||
arguments={"temperature": 23.5, "unit": "celsius"},
|
||||
observation="Temperature set"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["temperature"] == "23.5"
|
||||
assert isinstance(agent_step.arguments["temperature"], str)
|
||||
assert agent_step.arguments["unit"] == "celsius"
|
||||
|
||||
def test_agent_step_with_boolean_arguments(self):
|
||||
"""Test that boolean arguments are converted to strings"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Enable feature",
|
||||
name="toggle_feature",
|
||||
arguments={"enabled": True, "feature_name": "dark_mode"},
|
||||
observation="Feature toggled"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["enabled"] == "True"
|
||||
assert isinstance(agent_step.arguments["enabled"], str)
|
||||
assert agent_step.arguments["feature_name"] == "dark_mode"
|
||||
|
||||
def test_agent_step_with_none_arguments(self):
|
||||
"""Test that None arguments are converted to strings"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Check status",
|
||||
name="get_status",
|
||||
arguments={"filter": None, "category": "all"},
|
||||
observation="Status retrieved"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["filter"] == "None"
|
||||
assert isinstance(agent_step.arguments["filter"], str)
|
||||
assert agent_step.arguments["category"] == "all"
|
||||
|
||||
def test_agent_step_with_mixed_type_arguments(self):
|
||||
"""Test that mixed type arguments are all converted to strings"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Configure device",
|
||||
name="configure_device",
|
||||
arguments={
|
||||
"name": "Hifi",
|
||||
"volume_level": 10,
|
||||
"bass_boost": 1.5,
|
||||
"enabled": True,
|
||||
"preset": None
|
||||
},
|
||||
observation="Device configured"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert - all values should be strings
|
||||
assert all(isinstance(v, str) for v in agent_step.arguments.values())
|
||||
assert agent_step.arguments["name"] == "Hifi"
|
||||
assert agent_step.arguments["volume_level"] == "10"
|
||||
assert agent_step.arguments["bass_boost"] == "1.5"
|
||||
assert agent_step.arguments["enabled"] == "True"
|
||||
assert agent_step.arguments["preset"] == "None"
|
||||
|
||||
def test_agent_step_with_string_arguments(self):
|
||||
"""Test that string arguments remain strings (no double conversion)"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Search for information",
|
||||
name="search",
|
||||
arguments={"query": "test query", "limit": "10"},
|
||||
observation="Search completed"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["query"] == "test query"
|
||||
assert agent_step.arguments["limit"] == "10"
|
||||
assert all(isinstance(v, str) for v in agent_step.arguments.values())
|
||||
|
||||
def test_agent_step_with_empty_arguments(self):
|
||||
"""Test that empty arguments dict works correctly"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Perform action",
|
||||
name="do_something",
|
||||
arguments={},
|
||||
observation="Action completed"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments == {}
|
||||
assert isinstance(agent_step.arguments, dict)
|
||||
|
||||
def test_agent_step_with_numeric_string_values(self):
|
||||
"""Test arguments that are already strings containing numbers"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Process order",
|
||||
name="process_order",
|
||||
arguments={"order_id": "12345", "quantity": 10},
|
||||
observation="Order processed"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["order_id"] == "12345"
|
||||
assert agent_step.arguments["quantity"] == "10"
|
||||
assert all(isinstance(v, str) for v in agent_step.arguments.values())
|
||||
|
||||
def test_agent_step_conversion_preserves_keys(self):
|
||||
"""Test that argument keys are preserved during conversion"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Test",
|
||||
name="test_action",
|
||||
arguments={
|
||||
"param1": 1,
|
||||
"param_two": 2,
|
||||
"PARAM_THREE": 3,
|
||||
"param-four": 4
|
||||
},
|
||||
observation="Done"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert - verify all keys are preserved
|
||||
assert set(agent_step.arguments.keys()) == {
|
||||
"param1", "param_two", "PARAM_THREE", "param-four"
|
||||
}
|
||||
# Verify values are converted
|
||||
assert agent_step.arguments["param1"] == "1"
|
||||
assert agent_step.arguments["param_two"] == "2"
|
||||
assert agent_step.arguments["PARAM_THREE"] == "3"
|
||||
assert agent_step.arguments["param-four"] == "4"
|
||||
|
||||
def test_real_world_home_assistant_example(self):
|
||||
"""Test with real-world Home Assistant volume control example"""
|
||||
# Arrange - this is the exact scenario from the bug report
|
||||
action = Action(
|
||||
thought='The user wants to set the volume of the Hifi. The `set_device_volume` tool can be used for this purpose. The device name is "Hifi" and the desired volume level is 10.',
|
||||
name='set_device_volume',
|
||||
arguments={'name': 'Hifi', 'volume_level': 10},
|
||||
observation='{"speech": {}, "response_type": "action_done", "data": {"targets": [], "success": [{"name": "Hifi", "type": "entity", "id": "media_player.hifi"}], "failed": []}}'
|
||||
)
|
||||
|
||||
# Act - this should not raise TypeError
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["name"] == "Hifi"
|
||||
assert agent_step.arguments["volume_level"] == "10"
|
||||
assert isinstance(agent_step.arguments["volume_level"], str)
|
||||
|
||||
def test_multiple_actions_in_history(self):
|
||||
"""Test converting multiple actions in history (as done in service.py)"""
|
||||
# Arrange
|
||||
history = [
|
||||
Action(
|
||||
thought="First action",
|
||||
name="action1",
|
||||
arguments={"count": 5},
|
||||
observation="Done 1"
|
||||
),
|
||||
Action(
|
||||
thought="Second action",
|
||||
name="action2",
|
||||
arguments={"enabled": True, "name": "test"},
|
||||
observation="Done 2"
|
||||
),
|
||||
Action(
|
||||
thought="Third action",
|
||||
name="action3",
|
||||
arguments={"value": 3.14},
|
||||
observation="Done 3"
|
||||
)
|
||||
]
|
||||
|
||||
# Act - simulate the list comprehension in service.py
|
||||
agent_steps = [
|
||||
AgentStep(
|
||||
thought=h.thought,
|
||||
action=h.name,
|
||||
arguments={k: str(v) for k, v in h.arguments.items()},
|
||||
observation=h.observation
|
||||
)
|
||||
for h in history
|
||||
]
|
||||
|
||||
# Assert
|
||||
assert len(agent_steps) == 3
|
||||
|
||||
# First action
|
||||
assert agent_steps[0].arguments["count"] == "5"
|
||||
assert isinstance(agent_steps[0].arguments["count"], str)
|
||||
|
||||
# Second action
|
||||
assert agent_steps[1].arguments["enabled"] == "True"
|
||||
assert agent_steps[1].arguments["name"] == "test"
|
||||
assert all(isinstance(v, str) for v in agent_steps[1].arguments.values())
|
||||
|
||||
# Third action
|
||||
assert agent_steps[2].arguments["value"] == "3.14"
|
||||
assert isinstance(agent_steps[2].arguments["value"], str)
|
||||
|
||||
def test_arguments_with_special_characters(self):
|
||||
"""Test arguments containing special characters are properly converted"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Process data",
|
||||
name="process",
|
||||
arguments={
|
||||
"text": "Hello, World!",
|
||||
"path": "/home/user/file.txt",
|
||||
"pattern": "test-*-pattern",
|
||||
"count": 42
|
||||
},
|
||||
observation="Processed"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["text"] == "Hello, World!"
|
||||
assert agent_step.arguments["path"] == "/home/user/file.txt"
|
||||
assert agent_step.arguments["pattern"] == "test-*-pattern"
|
||||
assert agent_step.arguments["count"] == "42"
|
||||
assert all(isinstance(v, str) for v in agent_step.arguments.values())
|
||||
|
||||
def test_zero_and_negative_numbers(self):
|
||||
"""Test that zero and negative numbers are converted correctly"""
|
||||
# Arrange
|
||||
action = Action(
|
||||
thought="Test edge cases",
|
||||
name="edge_test",
|
||||
arguments={
|
||||
"zero": 0,
|
||||
"negative": -5,
|
||||
"negative_float": -3.14,
|
||||
"positive": 10
|
||||
},
|
||||
observation="Done"
|
||||
)
|
||||
|
||||
# Act
|
||||
agent_step = AgentStep(
|
||||
thought=action.thought,
|
||||
action=action.name,
|
||||
arguments={k: str(v) for k, v in action.arguments.items()},
|
||||
observation=action.observation
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert agent_step.arguments["zero"] == "0"
|
||||
assert agent_step.arguments["negative"] == "-5"
|
||||
assert agent_step.arguments["negative_float"] == "-3.14"
|
||||
assert agent_step.arguments["positive"] == "10"
|
||||
assert all(isinstance(v, str) for v in agent_step.arguments.values())
|
||||
233
tests/unit/test_agent/test_mcp_tool_auth.py
Normal file
233
tests/unit/test_agent/test_mcp_tool_auth.py
Normal file
|
|
@ -0,0 +1,233 @@
|
|||
"""
|
||||
Unit tests for MCP tool bearer token authentication
|
||||
|
||||
Tests the authentication feature added to MCP tool service that allows
|
||||
configuring optional bearer tokens for MCP server connections.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, AsyncMock, patch, MagicMock
|
||||
import json
|
||||
|
||||
|
||||
class TestMcpToolAuthentication:
|
||||
"""Test cases for MCP tool bearer token authentication"""
|
||||
|
||||
def test_mcp_tool_with_auth_token_header_building(self):
|
||||
"""Test that auth token is correctly formatted in headers"""
|
||||
# Arrange
|
||||
mcp_config = {
|
||||
"url": "https://secure.example.com/mcp",
|
||||
"remote-name": "secure-tool",
|
||||
"auth-token": "test-token-12345"
|
||||
}
|
||||
|
||||
# Act - simulate header building logic from service.py
|
||||
headers = {}
|
||||
if "auth-token" in mcp_config and mcp_config["auth-token"]:
|
||||
token = mcp_config["auth-token"]
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
|
||||
# Assert
|
||||
assert "Authorization" in headers
|
||||
assert headers["Authorization"] == "Bearer test-token-12345"
|
||||
assert headers["Authorization"].startswith("Bearer ")
|
||||
|
||||
def test_mcp_tool_without_auth_token_header_building(self):
|
||||
"""Test that no auth header is added when token is not present (backward compatibility)"""
|
||||
# Arrange
|
||||
mcp_config = {
|
||||
"url": "http://public.example.com/mcp",
|
||||
"remote-name": "public-tool"
|
||||
# No auth-token field
|
||||
}
|
||||
|
||||
# Act - simulate header building logic from service.py
|
||||
headers = {}
|
||||
if "auth-token" in mcp_config and mcp_config["auth-token"]:
|
||||
token = mcp_config["auth-token"]
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
|
||||
# Assert
|
||||
assert headers == {}
|
||||
assert "Authorization" not in headers
|
||||
|
||||
def test_mcp_config_with_auth_token(self):
|
||||
"""Test MCP configuration parsing with auth-token"""
|
||||
# Arrange
|
||||
config = {
|
||||
"mcp": {
|
||||
"secure-tool": json.dumps({
|
||||
"url": "https://secure.example.com/mcp",
|
||||
"remote-name": "secure-tool",
|
||||
"auth-token": "test-token-xyz"
|
||||
}),
|
||||
"public-tool": json.dumps({
|
||||
"url": "http://public.example.com/mcp",
|
||||
"remote-name": "public-tool"
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
# Act - simulate on_mcp_config
|
||||
mcp_services = {
|
||||
k: json.loads(v)
|
||||
for k, v in config["mcp"].items()
|
||||
}
|
||||
|
||||
# Assert
|
||||
assert "secure-tool" in mcp_services
|
||||
assert mcp_services["secure-tool"]["auth-token"] == "test-token-xyz"
|
||||
assert mcp_services["secure-tool"]["url"] == "https://secure.example.com/mcp"
|
||||
|
||||
assert "public-tool" in mcp_services
|
||||
assert "auth-token" not in mcp_services["public-tool"]
|
||||
assert mcp_services["public-tool"]["url"] == "http://public.example.com/mcp"
|
||||
|
||||
def test_auth_token_with_empty_string(self):
|
||||
"""Test that empty auth-token string is treated as no auth"""
|
||||
# Arrange
|
||||
config_data = {
|
||||
"url": "https://example.com/mcp",
|
||||
"remote-name": "test-tool",
|
||||
"auth-token": ""
|
||||
}
|
||||
|
||||
# Act - simulate header building logic
|
||||
headers = {}
|
||||
if "auth-token" in config_data and config_data["auth-token"]:
|
||||
headers["Authorization"] = f"Bearer {config_data['auth-token']}"
|
||||
|
||||
# Assert
|
||||
assert headers == {}, "Empty auth-token should not add Authorization header"
|
||||
|
||||
def test_auth_token_with_special_characters(self):
|
||||
"""Test auth token with special characters (JWT-like)"""
|
||||
# Arrange
|
||||
jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
|
||||
|
||||
config_data = {
|
||||
"url": "https://example.com/mcp",
|
||||
"auth-token": jwt_token
|
||||
}
|
||||
|
||||
# Act - simulate header building
|
||||
headers = {}
|
||||
if "auth-token" in config_data and config_data["auth-token"]:
|
||||
token = config_data["auth-token"]
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
|
||||
# Assert
|
||||
assert headers["Authorization"] == f"Bearer {jwt_token}"
|
||||
assert "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" in headers["Authorization"]
|
||||
|
||||
def test_multiple_tools_with_different_auth_configs(self):
|
||||
"""Test handling multiple MCP tools with mixed auth configurations"""
|
||||
# Arrange
|
||||
mcp_services = {
|
||||
"tool-a": {
|
||||
"url": "https://a.example.com/mcp",
|
||||
"auth-token": "token-a"
|
||||
},
|
||||
"tool-b": {
|
||||
"url": "https://b.example.com/mcp",
|
||||
"auth-token": "token-b"
|
||||
},
|
||||
"tool-c": {
|
||||
"url": "http://c.example.com/mcp"
|
||||
# No auth-token
|
||||
}
|
||||
}
|
||||
|
||||
# Act - simulate header building for each tool
|
||||
headers_a = {}
|
||||
if "auth-token" in mcp_services["tool-a"] and mcp_services["tool-a"]["auth-token"]:
|
||||
headers_a["Authorization"] = f"Bearer {mcp_services['tool-a']['auth-token']}"
|
||||
|
||||
headers_b = {}
|
||||
if "auth-token" in mcp_services["tool-b"] and mcp_services["tool-b"]["auth-token"]:
|
||||
headers_b["Authorization"] = f"Bearer {mcp_services['tool-b']['auth-token']}"
|
||||
|
||||
headers_c = {}
|
||||
if "auth-token" in mcp_services["tool-c"] and mcp_services["tool-c"]["auth-token"]:
|
||||
headers_c["Authorization"] = f"Bearer {mcp_services['tool-c']['auth-token']}"
|
||||
|
||||
# Assert
|
||||
assert headers_a == {"Authorization": "Bearer token-a"}
|
||||
assert headers_b == {"Authorization": "Bearer token-b"}
|
||||
assert headers_c == {}
|
||||
|
||||
def test_auth_token_not_logged(self):
|
||||
"""Test that auth tokens are not exposed in logs"""
|
||||
# This is more of a guideline test - in real implementation,
|
||||
# we should ensure tokens are never logged
|
||||
|
||||
# Arrange
|
||||
auth_token = "super-secret-token-123"
|
||||
config = {
|
||||
"url": "https://secure.example.com/mcp",
|
||||
"auth-token": auth_token
|
||||
}
|
||||
|
||||
# Act - simulate log-safe representation
|
||||
def get_log_safe_config(cfg):
|
||||
"""Return config with sensitive data masked"""
|
||||
safe_config = cfg.copy()
|
||||
if "auth-token" in safe_config and safe_config["auth-token"]:
|
||||
safe_config["auth-token"] = "****"
|
||||
return safe_config
|
||||
|
||||
log_safe = get_log_safe_config(config)
|
||||
|
||||
# Assert
|
||||
assert log_safe["auth-token"] == "****"
|
||||
assert auth_token not in str(log_safe)
|
||||
assert "url" in log_safe
|
||||
assert log_safe["url"] == "https://secure.example.com/mcp"
|
||||
|
||||
def test_auth_token_with_remote_name_configuration(self):
|
||||
"""Test MCP tool configuration with both auth-token and remote-name"""
|
||||
# Arrange
|
||||
mcp_config = {
|
||||
"url": "https://server.example.com/mcp",
|
||||
"remote-name": "actual_tool_name",
|
||||
"auth-token": "my-token-456"
|
||||
}
|
||||
|
||||
# Act - simulate header building and remote name extraction
|
||||
headers = {}
|
||||
if "auth-token" in mcp_config and mcp_config["auth-token"]:
|
||||
token = mcp_config["auth-token"]
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
|
||||
remote_name = mcp_config.get("remote-name", "default-name")
|
||||
|
||||
# Assert
|
||||
assert headers["Authorization"] == "Bearer my-token-456"
|
||||
assert remote_name == "actual_tool_name"
|
||||
assert "url" in mcp_config
|
||||
assert mcp_config["url"] == "https://server.example.com/mcp"
|
||||
|
||||
def test_bearer_token_format(self):
|
||||
"""Test that Bearer token format is correct"""
|
||||
# Arrange
|
||||
tokens = [
|
||||
"simple-token",
|
||||
"token_with_underscore",
|
||||
"token-with-dash",
|
||||
"TokenWithMixedCase123",
|
||||
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.payload.signature"
|
||||
]
|
||||
|
||||
# Act & Assert
|
||||
for token in tokens:
|
||||
headers = {}
|
||||
if token:
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
|
||||
# Verify format is "Bearer <token>" with single space
|
||||
assert headers["Authorization"].startswith("Bearer ")
|
||||
assert headers["Authorization"] == f"Bearer {token}"
|
||||
# Verify no extra spaces
|
||||
assert headers["Authorization"].count("Bearer") == 1
|
||||
assert headers["Authorization"].split("Bearer ")[1] == token
|
||||
Loading…
Add table
Add a link
Reference in a new issue