""" Standalone unit tests for Configuration Service Logic Tests core configuration logic without requiring full package imports. This focuses on testing the business logic that would be used by the configuration service components. """ import pytest import json from unittest.mock import Mock, AsyncMock from typing import Dict, Any class MockConfigurationLogic: """Mock implementation of configuration logic for testing""" def __init__(self): self.data = {} def parse_key(self, full_key: str) -> tuple[str, str]: """Parse 'type.key' format into (type, key)""" if '.' not in full_key: raise ValueError(f"Invalid key format: {full_key}") type_name, key = full_key.split('.', 1) return type_name, key def validate_schema_json(self, schema_json: str) -> bool: """Validate that schema JSON is properly formatted""" try: schema = json.loads(schema_json) # Check required fields if "fields" not in schema: return False for field in schema["fields"]: if "name" not in field or "type" not in field: return False # Validate field type valid_types = ["string", "integer", "float", "boolean", "timestamp", "date", "time", "uuid"] if field["type"] not in valid_types: return False return True except (json.JSONDecodeError, KeyError): return False def put_values(self, values: Dict[str, str]) -> Dict[str, bool]: """Store configuration values, return success status for each""" results = {} for full_key, value in values.items(): try: type_name, key = self.parse_key(full_key) # Validate schema if it's a schema type if type_name == "schema" and not self.validate_schema_json(value): results[full_key] = False continue # Store the value if type_name not in self.data: self.data[type_name] = {} self.data[type_name][key] = value results[full_key] = True except Exception: results[full_key] = False return results def get_values(self, keys: list[str]) -> Dict[str, str | None]: """Retrieve configuration values""" results = {} for full_key in keys: try: type_name, key = self.parse_key(full_key) value = self.data.get(type_name, {}).get(key) results[full_key] = value except Exception: results[full_key] = None return results def delete_values(self, keys: list[str]) -> Dict[str, bool]: """Delete configuration values""" results = {} for full_key in keys: try: type_name, key = self.parse_key(full_key) if type_name in self.data and key in self.data[type_name]: del self.data[type_name][key] results[full_key] = True else: results[full_key] = False except Exception: results[full_key] = False return results def list_keys(self, type_name: str) -> list[str]: """List all keys for a given type""" return list(self.data.get(type_name, {}).keys()) def get_type_values(self, type_name: str) -> Dict[str, str]: """Get all key-value pairs for a type""" return dict(self.data.get(type_name, {})) def get_all_data(self) -> Dict[str, Dict[str, str]]: """Get all configuration data""" return dict(self.data) class TestConfigurationLogic: """Test cases for configuration business logic""" @pytest.fixture def config_logic(self): return MockConfigurationLogic() @pytest.fixture def sample_schema_json(self): return json.dumps({ "name": "customer_records", "description": "Customer information schema", "fields": [ { "name": "customer_id", "type": "string", "primary_key": True, "required": True, "indexed": True, "description": "Unique customer identifier" }, { "name": "name", "type": "string", "required": True, "description": "Customer full name" }, { "name": "email", "type": "string", "required": True, "indexed": True, "description": "Customer email address" } ] }) def test_parse_key_valid(self, config_logic): """Test parsing valid configuration keys""" # Act & Assert type_name, key = config_logic.parse_key("schema.customer_records") assert type_name == "schema" assert key == "customer_records" type_name, key = config_logic.parse_key("flows.processing_flow") assert type_name == "flows" assert key == "processing_flow" def test_parse_key_invalid(self, config_logic): """Test parsing invalid configuration keys""" with pytest.raises(ValueError): config_logic.parse_key("invalid_key") def test_validate_schema_json_valid(self, config_logic, sample_schema_json): """Test validation of valid schema JSON""" assert config_logic.validate_schema_json(sample_schema_json) is True def test_validate_schema_json_invalid(self, config_logic): """Test validation of invalid schema JSON""" # Invalid JSON assert config_logic.validate_schema_json("not json") is False # Missing fields assert config_logic.validate_schema_json('{"name": "test"}') is False # Invalid field type invalid_schema = json.dumps({ "fields": [{"name": "test", "type": "invalid_type"}] }) assert config_logic.validate_schema_json(invalid_schema) is False # Missing field name invalid_schema2 = json.dumps({ "fields": [{"type": "string"}] }) assert config_logic.validate_schema_json(invalid_schema2) is False def test_put_values_success(self, config_logic, sample_schema_json): """Test storing configuration values successfully""" # Arrange values = { "schema.customer_records": sample_schema_json, "flows.test_flow": '{"steps": []}', "schema.product_catalog": json.dumps({ "fields": [{"name": "sku", "type": "string"}] }) } # Act results = config_logic.put_values(values) # Assert assert all(results.values()) # All should succeed assert len(results) == 3 # Verify data was stored assert "schema" in config_logic.data assert "customer_records" in config_logic.data["schema"] assert config_logic.data["schema"]["customer_records"] == sample_schema_json def test_put_values_with_invalid_schema(self, config_logic): """Test storing values with invalid schema""" # Arrange values = { "schema.valid": json.dumps({"fields": [{"name": "id", "type": "string"}]}), "schema.invalid": "not valid json", "flows.test": '{"steps": []}' # Non-schema should still work } # Act results = config_logic.put_values(values) # Assert assert results["schema.valid"] is True assert results["schema.invalid"] is False assert results["flows.test"] is True # Only valid values should be stored assert "valid" in config_logic.data.get("schema", {}) assert "invalid" not in config_logic.data.get("schema", {}) assert "test" in config_logic.data.get("flows", {}) def test_get_values(self, config_logic, sample_schema_json): """Test retrieving configuration values""" # Arrange config_logic.data = { "schema": {"customer_records": sample_schema_json}, "flows": {"test_flow": '{"steps": []}'} } keys = ["schema.customer_records", "schema.nonexistent", "flows.test_flow"] # Act results = config_logic.get_values(keys) # Assert assert results["schema.customer_records"] == sample_schema_json assert results["schema.nonexistent"] is None assert results["flows.test_flow"] == '{"steps": []}' def test_delete_values(self, config_logic, sample_schema_json): """Test deleting configuration values""" # Arrange config_logic.data = { "schema": { "customer_records": sample_schema_json, "product_catalog": '{"fields": []}' } } keys = ["schema.customer_records", "schema.nonexistent"] # Act results = config_logic.delete_values(keys) # Assert assert results["schema.customer_records"] is True assert results["schema.nonexistent"] is False # Verify deletion assert "customer_records" not in config_logic.data["schema"] assert "product_catalog" in config_logic.data["schema"] # Should remain def test_list_keys(self, config_logic): """Test listing keys for a type""" # Arrange config_logic.data = { "schema": {"customer_records": "...", "product_catalog": "..."}, "flows": {"flow1": "...", "flow2": "..."} } # Act schema_keys = config_logic.list_keys("schema") flow_keys = config_logic.list_keys("flows") empty_keys = config_logic.list_keys("nonexistent") # Assert assert set(schema_keys) == {"customer_records", "product_catalog"} assert set(flow_keys) == {"flow1", "flow2"} assert empty_keys == [] def test_get_type_values(self, config_logic, sample_schema_json): """Test getting all values for a type""" # Arrange config_logic.data = { "schema": { "customer_records": sample_schema_json, "product_catalog": '{"fields": []}' } } # Act schema_values = config_logic.get_type_values("schema") # Assert assert len(schema_values) == 2 assert schema_values["customer_records"] == sample_schema_json assert schema_values["product_catalog"] == '{"fields": []}' def test_get_all_data(self, config_logic): """Test getting all configuration data""" # Arrange test_data = { "schema": {"test_schema": "{}"}, "flows": {"test_flow": "{}"} } config_logic.data = test_data # Act all_data = config_logic.get_all_data() # Assert assert all_data == test_data assert all_data is not config_logic.data # Should be a copy class TestSchemaValidationLogic: """Test schema validation business logic""" def test_valid_schema_all_field_types(self): """Test schema with all supported field types""" schema = { "name": "all_types_schema", "description": "Schema with all field types", "fields": [ {"name": "text_field", "type": "string", "required": True}, {"name": "int_field", "type": "integer", "size": 4}, {"name": "bigint_field", "type": "integer", "size": 8}, {"name": "float_field", "type": "float", "size": 4}, {"name": "double_field", "type": "float", "size": 8}, {"name": "bool_field", "type": "boolean"}, {"name": "timestamp_field", "type": "timestamp"}, {"name": "date_field", "type": "date"}, {"name": "time_field", "type": "time"}, {"name": "uuid_field", "type": "uuid"}, {"name": "primary_field", "type": "string", "primary_key": True}, {"name": "indexed_field", "type": "string", "indexed": True}, {"name": "enum_field", "type": "string", "enum": ["active", "inactive"]} ] } schema_json = json.dumps(schema) logic = MockConfigurationLogic() assert logic.validate_schema_json(schema_json) is True def test_schema_field_constraints(self): """Test various schema field constraint scenarios""" logic = MockConfigurationLogic() # Test required vs optional fields schema_with_required = { "fields": [ {"name": "required_field", "type": "string", "required": True}, {"name": "optional_field", "type": "string", "required": False} ] } assert logic.validate_schema_json(json.dumps(schema_with_required)) is True # Test primary key fields schema_with_primary = { "fields": [ {"name": "id", "type": "string", "primary_key": True}, {"name": "data", "type": "string"} ] } assert logic.validate_schema_json(json.dumps(schema_with_primary)) is True # Test indexed fields schema_with_indexes = { "fields": [ {"name": "searchable", "type": "string", "indexed": True}, {"name": "non_searchable", "type": "string", "indexed": False} ] } assert logic.validate_schema_json(json.dumps(schema_with_indexes)) is True def test_configuration_versioning_logic(self): """Test configuration versioning concepts""" # This tests the logical concepts around versioning # that would be used in the actual implementation version_history = [] def increment_version(current_version: int) -> int: new_version = current_version + 1 version_history.append(new_version) return new_version def get_latest_version() -> int: return max(version_history) if version_history else 0 # Test version progression assert get_latest_version() == 0 v1 = increment_version(0) assert v1 == 1 assert get_latest_version() == 1 v2 = increment_version(v1) assert v2 == 2 assert get_latest_version() == 2 assert len(version_history) == 2