# trustgraph/tests/unit/test_config/test_config_logic.py
# (421 lines, 15 KiB, Python)

"""
Standalone unit tests for Configuration Service Logic
Tests core configuration logic without requiring full package imports.
This focuses on testing the business logic that would be used by the
configuration service components.
"""
import pytest
import json
from unittest.mock import Mock, AsyncMock
from typing import Dict, Any
class MockConfigurationLogic:
"""Mock implementation of configuration logic for testing"""
def __init__(self):
self.data = {}
def parse_key(self, full_key: str) -> tuple[str, str]:
"""Parse 'type.key' format into (type, key)"""
if '.' not in full_key:
raise ValueError(f"Invalid key format: {full_key}")
type_name, key = full_key.split('.', 1)
return type_name, key
def validate_schema_json(self, schema_json: str) -> bool:
"""Validate that schema JSON is properly formatted"""
try:
schema = json.loads(schema_json)
# Check required fields
if "fields" not in schema:
return False
for field in schema["fields"]:
if "name" not in field or "type" not in field:
return False
# Validate field type
valid_types = ["string", "integer", "float", "boolean", "timestamp", "date", "time", "uuid"]
if field["type"] not in valid_types:
return False
return True
except (json.JSONDecodeError, KeyError):
return False
def put_values(self, values: Dict[str, str]) -> Dict[str, bool]:
"""Store configuration values, return success status for each"""
results = {}
for full_key, value in values.items():
try:
type_name, key = self.parse_key(full_key)
# Validate schema if it's a schema type
if type_name == "schema" and not self.validate_schema_json(value):
results[full_key] = False
continue
# Store the value
if type_name not in self.data:
self.data[type_name] = {}
self.data[type_name][key] = value
results[full_key] = True
except Exception:
results[full_key] = False
return results
def get_values(self, keys: list[str]) -> Dict[str, str | None]:
"""Retrieve configuration values"""
results = {}
for full_key in keys:
try:
type_name, key = self.parse_key(full_key)
value = self.data.get(type_name, {}).get(key)
results[full_key] = value
except Exception:
results[full_key] = None
return results
def delete_values(self, keys: list[str]) -> Dict[str, bool]:
"""Delete configuration values"""
results = {}
for full_key in keys:
try:
type_name, key = self.parse_key(full_key)
if type_name in self.data and key in self.data[type_name]:
del self.data[type_name][key]
results[full_key] = True
else:
results[full_key] = False
except Exception:
results[full_key] = False
return results
def list_keys(self, type_name: str) -> list[str]:
"""List all keys for a given type"""
return list(self.data.get(type_name, {}).keys())
def get_type_values(self, type_name: str) -> Dict[str, str]:
"""Get all key-value pairs for a type"""
return dict(self.data.get(type_name, {}))
def get_all_data(self) -> Dict[str, Dict[str, str]]:
"""Get all configuration data"""
return dict(self.data)
class TestConfigurationLogic:
    """Covers MockConfigurationLogic: key parsing, schema validation and storage."""

    @pytest.fixture
    def config_logic(self):
        """Provide a fresh, empty MockConfigurationLogic."""
        return MockConfigurationLogic()

    @pytest.fixture
    def sample_schema_json(self):
        """Provide a valid three-field customer schema serialised as JSON."""
        customer_id_field = {
            "name": "customer_id",
            "type": "string",
            "primary_key": True,
            "required": True,
            "indexed": True,
            "description": "Unique customer identifier"
        }
        name_field = {
            "name": "name",
            "type": "string",
            "required": True,
            "description": "Customer full name"
        }
        email_field = {
            "name": "email",
            "type": "string",
            "required": True,
            "indexed": True,
            "description": "Customer email address"
        }
        schema = {
            "name": "customer_records",
            "description": "Customer information schema",
            "fields": [customer_id_field, name_field, email_field]
        }
        return json.dumps(schema)

    def test_parse_key_valid(self, config_logic):
        """Well-formed 'type.key' strings split into their two parts."""
        cases = [
            ("schema.customer_records", "schema", "customer_records"),
            ("flows.processing_flow", "flows", "processing_flow"),
        ]
        for full_key, expected_type, expected_key in cases:
            parsed_type, parsed_key = config_logic.parse_key(full_key)
            assert parsed_type == expected_type
            assert parsed_key == expected_key

    def test_parse_key_invalid(self, config_logic):
        """A key without a dot is rejected with ValueError."""
        with pytest.raises(ValueError):
            config_logic.parse_key("invalid_key")

    def test_validate_schema_json_valid(self, config_logic, sample_schema_json):
        """The sample customer schema passes validation."""
        outcome = config_logic.validate_schema_json(sample_schema_json)
        assert outcome is True

    def test_validate_schema_json_invalid(self, config_logic):
        """Each kind of malformed schema is reported as invalid."""
        rejected_payloads = [
            # Not JSON at all
            "not json",
            # No "fields" entry
            '{"name": "test"}',
            # Unsupported field type
            json.dumps({
                "fields": [{"name": "test", "type": "invalid_type"}]
            }),
            # Field without a name
            json.dumps({
                "fields": [{"type": "string"}]
            }),
        ]
        for payload in rejected_payloads:
            assert config_logic.validate_schema_json(payload) is False

    def test_put_values_success(self, config_logic, sample_schema_json):
        """A batch of valid values is stored and every entry reports success."""
        product_schema = json.dumps({
            "fields": [{"name": "sku", "type": "string"}]
        })
        batch = {
            "schema.customer_records": sample_schema_json,
            "flows.test_flow": '{"steps": []}',
            "schema.product_catalog": product_schema
        }

        statuses = config_logic.put_values(batch)

        # Every entry succeeded
        assert all(statuses.values())
        assert len(statuses) == 3

        # The schema landed in the store unchanged
        stored = config_logic.data
        assert "schema" in stored
        assert "customer_records" in stored["schema"]
        assert stored["schema"]["customer_records"] == sample_schema_json

    def test_put_values_with_invalid_schema(self, config_logic):
        """An invalid schema is rejected without affecting the other entries."""
        batch = {
            "schema.valid": json.dumps({"fields": [{"name": "id", "type": "string"}]}),
            "schema.invalid": "not valid json",
            # Non-schema types are stored without validation
            "flows.test": '{"steps": []}'
        }

        statuses = config_logic.put_values(batch)

        assert statuses["schema.valid"] is True
        assert statuses["schema.invalid"] is False
        assert statuses["flows.test"] is True

        # Only the accepted entries were stored
        stored_schemas = config_logic.data.get("schema", {})
        stored_flows = config_logic.data.get("flows", {})
        assert "valid" in stored_schemas
        assert "invalid" not in stored_schemas
        assert "test" in stored_flows

    def test_get_values(self, config_logic, sample_schema_json):
        """Stored values are returned; an unknown key maps to None."""
        flow_json = '{"steps": []}'
        config_logic.data = {
            "schema": {"customer_records": sample_schema_json},
            "flows": {"test_flow": flow_json}
        }

        fetched = config_logic.get_values(
            ["schema.customer_records", "schema.nonexistent", "flows.test_flow"]
        )

        assert fetched["schema.customer_records"] == sample_schema_json
        assert fetched["schema.nonexistent"] is None
        assert fetched["flows.test_flow"] == '{"steps": []}'

    def test_delete_values(self, config_logic, sample_schema_json):
        """Deleting reports per-key status and leaves other entries in place."""
        config_logic.data = {
            "schema": {
                "customer_records": sample_schema_json,
                "product_catalog": '{"fields": []}'
            }
        }

        statuses = config_logic.delete_values(
            ["schema.customer_records", "schema.nonexistent"]
        )

        assert statuses["schema.customer_records"] is True
        assert statuses["schema.nonexistent"] is False

        remaining = config_logic.data["schema"]
        assert "customer_records" not in remaining
        # The entry that was not targeted is still there
        assert "product_catalog" in remaining

    def test_list_keys(self, config_logic):
        """Keys are listed per type; an unknown type yields an empty list."""
        config_logic.data = {
            "schema": {"customer_records": "...", "product_catalog": "..."},
            "flows": {"flow1": "...", "flow2": "..."}
        }

        listed_schema = config_logic.list_keys("schema")
        listed_flows = config_logic.list_keys("flows")
        listed_unknown = config_logic.list_keys("nonexistent")

        assert set(listed_schema) == {"customer_records", "product_catalog"}
        assert set(listed_flows) == {"flow1", "flow2"}
        assert listed_unknown == []

    def test_get_type_values(self, config_logic, sample_schema_json):
        """All key-value pairs of one type are returned together."""
        catalog_json = '{"fields": []}'
        config_logic.data = {
            "schema": {
                "customer_records": sample_schema_json,
                "product_catalog": catalog_json
            }
        }

        by_key = config_logic.get_type_values("schema")

        assert len(by_key) == 2
        assert by_key["customer_records"] == sample_schema_json
        assert by_key["product_catalog"] == '{"fields": []}'

    def test_get_all_data(self, config_logic):
        """The full data set comes back equal to, but distinct from, the store."""
        seeded = {
            "schema": {"test_schema": "{}"},
            "flows": {"test_flow": "{}"}
        }
        config_logic.data = seeded

        snapshot = config_logic.get_all_data()

        assert snapshot == seeded
        # The returned mapping must be a different object from the store
        assert snapshot is not config_logic.data
class TestSchemaValidationLogic:
    """Checks schema validation against a range of field definitions."""

    def test_valid_schema_all_field_types(self):
        """A schema using every supported field type is accepted."""
        field_specs = [
            ("text_field", "string", {"required": True}),
            ("int_field", "integer", {"size": 4}),
            ("bigint_field", "integer", {"size": 8}),
            ("float_field", "float", {"size": 4}),
            ("double_field", "float", {"size": 8}),
            ("bool_field", "boolean", {}),
            ("timestamp_field", "timestamp", {}),
            ("date_field", "date", {}),
            ("time_field", "time", {}),
            ("uuid_field", "uuid", {}),
            ("primary_field", "string", {"primary_key": True}),
            ("indexed_field", "string", {"indexed": True}),
            ("enum_field", "string", {"enum": ["active", "inactive"]}),
        ]
        document = {
            "name": "all_types_schema",
            "description": "Schema with all field types",
            "fields": [
                {"name": field_name, "type": field_type, **extras}
                for field_name, field_type, extras in field_specs
            ]
        }

        validator = MockConfigurationLogic()
        assert validator.validate_schema_json(json.dumps(document)) is True

    def test_schema_field_constraints(self):
        """Optional per-field attributes do not affect schema validity."""
        validator = MockConfigurationLogic()

        constraint_cases = [
            # Required vs optional fields
            {
                "fields": [
                    {"name": "required_field", "type": "string", "required": True},
                    {"name": "optional_field", "type": "string", "required": False}
                ]
            },
            # Primary key fields
            {
                "fields": [
                    {"name": "id", "type": "string", "primary_key": True},
                    {"name": "data", "type": "string"}
                ]
            },
            # Indexed fields
            {
                "fields": [
                    {"name": "searchable", "type": "string", "indexed": True},
                    {"name": "non_searchable", "type": "string", "indexed": False}
                ]
            },
        ]
        for case in constraint_cases:
            assert validator.validate_schema_json(json.dumps(case)) is True

    def test_configuration_versioning_logic(self):
        """Version numbers increase by one and the latest is the maximum seen."""
        # Self-contained model of version bookkeeping; it does not touch
        # MockConfigurationLogic.
        recorded = []

        def increment_version(current_version: int) -> int:
            bumped = current_version + 1
            recorded.append(bumped)
            return bumped

        def get_latest_version() -> int:
            return max(recorded, default=0)

        # Nothing recorded yet
        assert get_latest_version() == 0

        first = increment_version(0)
        assert first == 1
        assert get_latest_version() == 1

        second = increment_version(first)
        assert second == 2
        assert get_latest_version() == 2

        assert len(recorded) == 2