mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 16:36:21 +02:00
* Structured data tech spec * Architecture principles * New schemas * Updated schemas and specs * Object extractor * Add .coveragerc * New tests * Cassandra object storage * Trying to object extraction working, issues exist
421 lines
No EOL
15 KiB
Python
421 lines
No EOL
15 KiB
Python
"""
|
|
Standalone unit tests for Configuration Service Logic
|
|
|
|
Tests core configuration logic without requiring full package imports.
|
|
This focuses on testing the business logic that would be used by the
|
|
configuration service components.
|
|
"""
|
|
|
|
import pytest
|
|
import json
|
|
from unittest.mock import Mock, AsyncMock
|
|
from typing import Dict, Any
|
|
|
|
|
|
class MockConfigurationLogic:
|
|
"""Mock implementation of configuration logic for testing"""
|
|
|
|
def __init__(self):
|
|
self.data = {}
|
|
|
|
def parse_key(self, full_key: str) -> tuple[str, str]:
|
|
"""Parse 'type.key' format into (type, key)"""
|
|
if '.' not in full_key:
|
|
raise ValueError(f"Invalid key format: {full_key}")
|
|
type_name, key = full_key.split('.', 1)
|
|
return type_name, key
|
|
|
|
def validate_schema_json(self, schema_json: str) -> bool:
|
|
"""Validate that schema JSON is properly formatted"""
|
|
try:
|
|
schema = json.loads(schema_json)
|
|
|
|
# Check required fields
|
|
if "fields" not in schema:
|
|
return False
|
|
|
|
for field in schema["fields"]:
|
|
if "name" not in field or "type" not in field:
|
|
return False
|
|
|
|
# Validate field type
|
|
valid_types = ["string", "integer", "float", "boolean", "timestamp", "date", "time", "uuid"]
|
|
if field["type"] not in valid_types:
|
|
return False
|
|
|
|
return True
|
|
except (json.JSONDecodeError, KeyError):
|
|
return False
|
|
|
|
def put_values(self, values: Dict[str, str]) -> Dict[str, bool]:
|
|
"""Store configuration values, return success status for each"""
|
|
results = {}
|
|
|
|
for full_key, value in values.items():
|
|
try:
|
|
type_name, key = self.parse_key(full_key)
|
|
|
|
# Validate schema if it's a schema type
|
|
if type_name == "schema" and not self.validate_schema_json(value):
|
|
results[full_key] = False
|
|
continue
|
|
|
|
# Store the value
|
|
if type_name not in self.data:
|
|
self.data[type_name] = {}
|
|
self.data[type_name][key] = value
|
|
results[full_key] = True
|
|
|
|
except Exception:
|
|
results[full_key] = False
|
|
|
|
return results
|
|
|
|
def get_values(self, keys: list[str]) -> Dict[str, str | None]:
|
|
"""Retrieve configuration values"""
|
|
results = {}
|
|
|
|
for full_key in keys:
|
|
try:
|
|
type_name, key = self.parse_key(full_key)
|
|
value = self.data.get(type_name, {}).get(key)
|
|
results[full_key] = value
|
|
except Exception:
|
|
results[full_key] = None
|
|
|
|
return results
|
|
|
|
def delete_values(self, keys: list[str]) -> Dict[str, bool]:
|
|
"""Delete configuration values"""
|
|
results = {}
|
|
|
|
for full_key in keys:
|
|
try:
|
|
type_name, key = self.parse_key(full_key)
|
|
if type_name in self.data and key in self.data[type_name]:
|
|
del self.data[type_name][key]
|
|
results[full_key] = True
|
|
else:
|
|
results[full_key] = False
|
|
except Exception:
|
|
results[full_key] = False
|
|
|
|
return results
|
|
|
|
def list_keys(self, type_name: str) -> list[str]:
|
|
"""List all keys for a given type"""
|
|
return list(self.data.get(type_name, {}).keys())
|
|
|
|
def get_type_values(self, type_name: str) -> Dict[str, str]:
|
|
"""Get all key-value pairs for a type"""
|
|
return dict(self.data.get(type_name, {}))
|
|
|
|
def get_all_data(self) -> Dict[str, Dict[str, str]]:
|
|
"""Get all configuration data"""
|
|
return dict(self.data)
|
|
|
|
|
|
class TestConfigurationLogic:
|
|
"""Test cases for configuration business logic"""
|
|
|
|
@pytest.fixture
|
|
def config_logic(self):
|
|
return MockConfigurationLogic()
|
|
|
|
@pytest.fixture
|
|
def sample_schema_json(self):
|
|
return json.dumps({
|
|
"name": "customer_records",
|
|
"description": "Customer information schema",
|
|
"fields": [
|
|
{
|
|
"name": "customer_id",
|
|
"type": "string",
|
|
"primary_key": True,
|
|
"required": True,
|
|
"indexed": True,
|
|
"description": "Unique customer identifier"
|
|
},
|
|
{
|
|
"name": "name",
|
|
"type": "string",
|
|
"required": True,
|
|
"description": "Customer full name"
|
|
},
|
|
{
|
|
"name": "email",
|
|
"type": "string",
|
|
"required": True,
|
|
"indexed": True,
|
|
"description": "Customer email address"
|
|
}
|
|
]
|
|
})
|
|
|
|
def test_parse_key_valid(self, config_logic):
|
|
"""Test parsing valid configuration keys"""
|
|
# Act & Assert
|
|
type_name, key = config_logic.parse_key("schema.customer_records")
|
|
assert type_name == "schema"
|
|
assert key == "customer_records"
|
|
|
|
type_name, key = config_logic.parse_key("flows.processing_flow")
|
|
assert type_name == "flows"
|
|
assert key == "processing_flow"
|
|
|
|
def test_parse_key_invalid(self, config_logic):
|
|
"""Test parsing invalid configuration keys"""
|
|
with pytest.raises(ValueError):
|
|
config_logic.parse_key("invalid_key")
|
|
|
|
def test_validate_schema_json_valid(self, config_logic, sample_schema_json):
|
|
"""Test validation of valid schema JSON"""
|
|
assert config_logic.validate_schema_json(sample_schema_json) is True
|
|
|
|
def test_validate_schema_json_invalid(self, config_logic):
|
|
"""Test validation of invalid schema JSON"""
|
|
# Invalid JSON
|
|
assert config_logic.validate_schema_json("not json") is False
|
|
|
|
# Missing fields
|
|
assert config_logic.validate_schema_json('{"name": "test"}') is False
|
|
|
|
# Invalid field type
|
|
invalid_schema = json.dumps({
|
|
"fields": [{"name": "test", "type": "invalid_type"}]
|
|
})
|
|
assert config_logic.validate_schema_json(invalid_schema) is False
|
|
|
|
# Missing field name
|
|
invalid_schema2 = json.dumps({
|
|
"fields": [{"type": "string"}]
|
|
})
|
|
assert config_logic.validate_schema_json(invalid_schema2) is False
|
|
|
|
def test_put_values_success(self, config_logic, sample_schema_json):
|
|
"""Test storing configuration values successfully"""
|
|
# Arrange
|
|
values = {
|
|
"schema.customer_records": sample_schema_json,
|
|
"flows.test_flow": '{"steps": []}',
|
|
"schema.product_catalog": json.dumps({
|
|
"fields": [{"name": "sku", "type": "string"}]
|
|
})
|
|
}
|
|
|
|
# Act
|
|
results = config_logic.put_values(values)
|
|
|
|
# Assert
|
|
assert all(results.values()) # All should succeed
|
|
assert len(results) == 3
|
|
|
|
# Verify data was stored
|
|
assert "schema" in config_logic.data
|
|
assert "customer_records" in config_logic.data["schema"]
|
|
assert config_logic.data["schema"]["customer_records"] == sample_schema_json
|
|
|
|
def test_put_values_with_invalid_schema(self, config_logic):
|
|
"""Test storing values with invalid schema"""
|
|
# Arrange
|
|
values = {
|
|
"schema.valid": json.dumps({"fields": [{"name": "id", "type": "string"}]}),
|
|
"schema.invalid": "not valid json",
|
|
"flows.test": '{"steps": []}' # Non-schema should still work
|
|
}
|
|
|
|
# Act
|
|
results = config_logic.put_values(values)
|
|
|
|
# Assert
|
|
assert results["schema.valid"] is True
|
|
assert results["schema.invalid"] is False
|
|
assert results["flows.test"] is True
|
|
|
|
# Only valid values should be stored
|
|
assert "valid" in config_logic.data.get("schema", {})
|
|
assert "invalid" not in config_logic.data.get("schema", {})
|
|
assert "test" in config_logic.data.get("flows", {})
|
|
|
|
def test_get_values(self, config_logic, sample_schema_json):
|
|
"""Test retrieving configuration values"""
|
|
# Arrange
|
|
config_logic.data = {
|
|
"schema": {"customer_records": sample_schema_json},
|
|
"flows": {"test_flow": '{"steps": []}'}
|
|
}
|
|
|
|
keys = ["schema.customer_records", "schema.nonexistent", "flows.test_flow"]
|
|
|
|
# Act
|
|
results = config_logic.get_values(keys)
|
|
|
|
# Assert
|
|
assert results["schema.customer_records"] == sample_schema_json
|
|
assert results["schema.nonexistent"] is None
|
|
assert results["flows.test_flow"] == '{"steps": []}'
|
|
|
|
def test_delete_values(self, config_logic, sample_schema_json):
|
|
"""Test deleting configuration values"""
|
|
# Arrange
|
|
config_logic.data = {
|
|
"schema": {
|
|
"customer_records": sample_schema_json,
|
|
"product_catalog": '{"fields": []}'
|
|
}
|
|
}
|
|
|
|
keys = ["schema.customer_records", "schema.nonexistent"]
|
|
|
|
# Act
|
|
results = config_logic.delete_values(keys)
|
|
|
|
# Assert
|
|
assert results["schema.customer_records"] is True
|
|
assert results["schema.nonexistent"] is False
|
|
|
|
# Verify deletion
|
|
assert "customer_records" not in config_logic.data["schema"]
|
|
assert "product_catalog" in config_logic.data["schema"] # Should remain
|
|
|
|
def test_list_keys(self, config_logic):
|
|
"""Test listing keys for a type"""
|
|
# Arrange
|
|
config_logic.data = {
|
|
"schema": {"customer_records": "...", "product_catalog": "..."},
|
|
"flows": {"flow1": "...", "flow2": "..."}
|
|
}
|
|
|
|
# Act
|
|
schema_keys = config_logic.list_keys("schema")
|
|
flow_keys = config_logic.list_keys("flows")
|
|
empty_keys = config_logic.list_keys("nonexistent")
|
|
|
|
# Assert
|
|
assert set(schema_keys) == {"customer_records", "product_catalog"}
|
|
assert set(flow_keys) == {"flow1", "flow2"}
|
|
assert empty_keys == []
|
|
|
|
def test_get_type_values(self, config_logic, sample_schema_json):
|
|
"""Test getting all values for a type"""
|
|
# Arrange
|
|
config_logic.data = {
|
|
"schema": {
|
|
"customer_records": sample_schema_json,
|
|
"product_catalog": '{"fields": []}'
|
|
}
|
|
}
|
|
|
|
# Act
|
|
schema_values = config_logic.get_type_values("schema")
|
|
|
|
# Assert
|
|
assert len(schema_values) == 2
|
|
assert schema_values["customer_records"] == sample_schema_json
|
|
assert schema_values["product_catalog"] == '{"fields": []}'
|
|
|
|
def test_get_all_data(self, config_logic):
|
|
"""Test getting all configuration data"""
|
|
# Arrange
|
|
test_data = {
|
|
"schema": {"test_schema": "{}"},
|
|
"flows": {"test_flow": "{}"}
|
|
}
|
|
config_logic.data = test_data
|
|
|
|
# Act
|
|
all_data = config_logic.get_all_data()
|
|
|
|
# Assert
|
|
assert all_data == test_data
|
|
assert all_data is not config_logic.data # Should be a copy
|
|
|
|
|
|
class TestSchemaValidationLogic:
|
|
"""Test schema validation business logic"""
|
|
|
|
def test_valid_schema_all_field_types(self):
|
|
"""Test schema with all supported field types"""
|
|
schema = {
|
|
"name": "all_types_schema",
|
|
"description": "Schema with all field types",
|
|
"fields": [
|
|
{"name": "text_field", "type": "string", "required": True},
|
|
{"name": "int_field", "type": "integer", "size": 4},
|
|
{"name": "bigint_field", "type": "integer", "size": 8},
|
|
{"name": "float_field", "type": "float", "size": 4},
|
|
{"name": "double_field", "type": "float", "size": 8},
|
|
{"name": "bool_field", "type": "boolean"},
|
|
{"name": "timestamp_field", "type": "timestamp"},
|
|
{"name": "date_field", "type": "date"},
|
|
{"name": "time_field", "type": "time"},
|
|
{"name": "uuid_field", "type": "uuid"},
|
|
{"name": "primary_field", "type": "string", "primary_key": True},
|
|
{"name": "indexed_field", "type": "string", "indexed": True},
|
|
{"name": "enum_field", "type": "string", "enum": ["active", "inactive"]}
|
|
]
|
|
}
|
|
|
|
schema_json = json.dumps(schema)
|
|
logic = MockConfigurationLogic()
|
|
|
|
assert logic.validate_schema_json(schema_json) is True
|
|
|
|
def test_schema_field_constraints(self):
|
|
"""Test various schema field constraint scenarios"""
|
|
logic = MockConfigurationLogic()
|
|
|
|
# Test required vs optional fields
|
|
schema_with_required = {
|
|
"fields": [
|
|
{"name": "required_field", "type": "string", "required": True},
|
|
{"name": "optional_field", "type": "string", "required": False}
|
|
]
|
|
}
|
|
assert logic.validate_schema_json(json.dumps(schema_with_required)) is True
|
|
|
|
# Test primary key fields
|
|
schema_with_primary = {
|
|
"fields": [
|
|
{"name": "id", "type": "string", "primary_key": True},
|
|
{"name": "data", "type": "string"}
|
|
]
|
|
}
|
|
assert logic.validate_schema_json(json.dumps(schema_with_primary)) is True
|
|
|
|
# Test indexed fields
|
|
schema_with_indexes = {
|
|
"fields": [
|
|
{"name": "searchable", "type": "string", "indexed": True},
|
|
{"name": "non_searchable", "type": "string", "indexed": False}
|
|
]
|
|
}
|
|
assert logic.validate_schema_json(json.dumps(schema_with_indexes)) is True
|
|
|
|
def test_configuration_versioning_logic(self):
|
|
"""Test configuration versioning concepts"""
|
|
# This tests the logical concepts around versioning
|
|
# that would be used in the actual implementation
|
|
|
|
version_history = []
|
|
|
|
def increment_version(current_version: int) -> int:
|
|
new_version = current_version + 1
|
|
version_history.append(new_version)
|
|
return new_version
|
|
|
|
def get_latest_version() -> int:
|
|
return max(version_history) if version_history else 0
|
|
|
|
# Test version progression
|
|
assert get_latest_version() == 0
|
|
|
|
v1 = increment_version(0)
|
|
assert v1 == 1
|
|
assert get_latest_version() == 1
|
|
|
|
v2 = increment_version(v1)
|
|
assert v2 == 2
|
|
assert get_latest_version() == 2
|
|
|
|
assert len(version_history) == 2 |