Add unit tests for base helper modules

This commit is contained in:
Zeel 2026-04-13 15:34:54 -04:00
parent ec8f740de3
commit 4e63dbda4f
4 changed files with 338 additions and 2 deletions

View file

@ -0,0 +1,80 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock
from trustgraph.base.flow import Flow
from trustgraph.base.parameter_spec import Parameter, ParameterSpec
from trustgraph.base.spec import Spec
def test_parameter_spec_is_a_spec_and_adds_parameter_value():
spec = ParameterSpec("temperature")
flow = MagicMock(parameter={})
processor = MagicMock()
spec.add(flow, processor, {"temperature": 0.7})
assert isinstance(spec, Spec)
assert "temperature" in flow.parameter
assert isinstance(flow.parameter["temperature"], Parameter)
assert flow.parameter["temperature"].value == 0.7
def test_parameter_spec_defaults_missing_values_to_none():
spec = ParameterSpec("model")
flow = MagicMock(parameter={})
spec.add(flow, MagicMock(), {})
assert flow.parameter["model"].value is None
def test_parameter_start_and_stop_are_awaitable():
parameter = Parameter("value")
assert asyncio.run(parameter.start()) is None
assert asyncio.run(parameter.stop()) is None
def test_flow_initialization_calls_registered_specs():
spec_one = MagicMock()
spec_two = MagicMock()
processor = MagicMock(specifications=[spec_one, spec_two])
flow = Flow("processor-1", "flow-a", processor, {"answer": 42})
assert flow.id == "processor-1"
assert flow.name == "flow-a"
assert flow.producer == {}
assert flow.consumer == {}
assert flow.parameter == {}
spec_one.add.assert_called_once_with(flow, processor, {"answer": 42})
spec_two.add.assert_called_once_with(flow, processor, {"answer": 42})
def test_flow_start_and_stop_visit_all_consumers():
consumer_one = AsyncMock()
consumer_two = AsyncMock()
flow = Flow("processor-1", "flow-a", MagicMock(specifications=[]), {})
flow.consumer = {"one": consumer_one, "two": consumer_two}
asyncio.run(flow.start())
asyncio.run(flow.stop())
consumer_one.start.assert_called_once_with()
consumer_two.start.assert_called_once_with()
consumer_one.stop.assert_called_once_with()
consumer_two.stop.assert_called_once_with()
def test_flow_call_returns_values_in_priority_order():
flow = Flow("processor-1", "flow-a", MagicMock(specifications=[]), {})
flow.producer["shared"] = "producer-value"
flow.consumer["consumer-only"] = "consumer-value"
flow.consumer["shared"] = "consumer-value"
flow.parameter["parameter-only"] = Parameter("parameter-value")
flow.parameter["shared"] = Parameter("parameter-value")
assert flow("shared") == "producer-value"
assert flow("consumer-only") == "consumer-value"
assert flow("parameter-only") == "parameter-value"
assert flow("missing") is None

View file

@ -0,0 +1,127 @@
import argparse
import logging
import sys
from types import SimpleNamespace
from unittest.mock import MagicMock
from trustgraph.base.logging import add_logging_args, setup_logging
def test_add_logging_args_uses_environment_defaults(monkeypatch):
monkeypatch.setenv("LOKI_URL", "http://example.test/loki")
monkeypatch.setenv("LOKI_USERNAME", "user")
monkeypatch.setenv("LOKI_PASSWORD", "pass")
parser = argparse.ArgumentParser()
add_logging_args(parser)
args = parser.parse_args([])
assert args.log_level == "INFO"
assert args.loki_enabled is True
assert args.loki_url == "http://example.test/loki"
assert args.loki_username == "user"
assert args.loki_password == "pass"
def test_add_logging_args_supports_disabling_loki():
parser = argparse.ArgumentParser()
add_logging_args(parser)
args = parser.parse_args(["--no-loki-enabled"])
assert args.loki_enabled is False
def test_setup_logging_without_loki_configures_console(monkeypatch):
basic_config = MagicMock()
logger = MagicMock()
monkeypatch.setattr(logging, "basicConfig", basic_config)
monkeypatch.setattr(logging, "getLogger", lambda name=None: logger)
setup_logging({"log_level": "debug", "loki_enabled": False, "id": "processor-1"})
kwargs = basic_config.call_args.kwargs
assert kwargs["level"] == logging.DEBUG
assert kwargs["force"] is True
assert "processor-1" in kwargs["format"]
assert len(kwargs["handlers"]) == 1
logger.info.assert_called_once_with("Logging configured with level: debug")
def test_setup_logging_with_loki_enables_queue_listener(monkeypatch):
basic_config = MagicMock()
root_logger = MagicMock()
module_logger = MagicMock()
urllib3_logger = MagicMock()
connectionpool_logger = MagicMock()
queue_handler = MagicMock()
queue_listener = MagicMock()
loki_handler = MagicMock()
logger_map = {
None: root_logger,
"trustgraph.base.logging": module_logger,
"urllib3": urllib3_logger,
"urllib3.connectionpool": connectionpool_logger,
}
monkeypatch.setattr(logging, "basicConfig", basic_config)
monkeypatch.setattr(logging, "getLogger", lambda name=None: logger_map[name])
monkeypatch.setattr(
logging.handlers,
"QueueHandler",
MagicMock(return_value=queue_handler),
)
monkeypatch.setattr(
logging.handlers,
"QueueListener",
MagicMock(return_value=queue_listener),
)
monkeypatch.setitem(
sys.modules,
"logging_loki",
SimpleNamespace(LokiHandler=MagicMock(return_value=loki_handler)),
)
setup_logging(
{
"log_level": "INFO",
"loki_enabled": True,
"loki_url": "http://loki.test/push",
"loki_username": "user",
"loki_password": "pass",
"id": "processor-1",
}
)
assert root_logger.loki_queue_listener is queue_listener
queue_listener.start.assert_called_once_with()
urllib3_logger.setLevel.assert_called_once_with(logging.WARNING)
connectionpool_logger.setLevel.assert_called_once_with(logging.WARNING)
module_logger.info.assert_any_call("Logging configured with level: INFO")
module_logger.info.assert_any_call("Loki logging enabled: http://loki.test/push")
def test_setup_logging_falls_back_when_loki_module_missing(monkeypatch, capsys):
basic_config = MagicMock()
logger = MagicMock()
monkeypatch.setattr(logging, "basicConfig", basic_config)
monkeypatch.setattr(logging, "getLogger", lambda name=None: logger)
monkeypatch.delitem(sys.modules, "logging_loki", raising=False)
real_import = __import__
def fake_import(name, *args, **kwargs):
if name == "logging_loki":
raise ImportError("missing")
return real_import(name, *args, **kwargs)
monkeypatch.setattr("builtins.__import__", fake_import)
setup_logging({"log_level": "INFO", "loki_enabled": True, "id": "processor-1"})
output = capsys.readouterr().out
assert "python-logging-loki not installed" in output
logger.warning.assert_called_once_with("Loki logging requested but not available")

View file

@ -0,0 +1,129 @@
from unittest.mock import MagicMock
import pytest
from trustgraph.base import metrics
@pytest.fixture(autouse=True)
def reset_metric_singletons():
classes_and_attrs = {
metrics.ConsumerMetrics: [
"state_metric",
"request_metric",
"processing_metric",
"rate_limit_metric",
],
metrics.ProducerMetrics: ["producer_metric"],
metrics.ProcessorMetrics: ["processor_metric"],
metrics.SubscriberMetrics: [
"state_metric",
"received_metric",
"dropped_metric",
],
}
for cls, attrs in classes_and_attrs.items():
for attr in attrs:
if hasattr(cls, attr):
delattr(cls, attr)
yield
for cls, attrs in classes_and_attrs.items():
for attr in attrs:
if hasattr(cls, attr):
delattr(cls, attr)
def test_consumer_metrics_reuses_singletons_and_records_events(monkeypatch):
enum_factory = MagicMock()
histogram_factory = MagicMock()
counter_factory = MagicMock()
state_labels = MagicMock()
request_labels = MagicMock()
processing_labels = MagicMock()
rate_limit_labels = MagicMock()
timer = MagicMock()
enum_factory.return_value.labels.return_value = state_labels
histogram_factory.return_value.labels.return_value = request_labels
request_labels.time.return_value = timer
counter_factory.side_effect = [
MagicMock(labels=MagicMock(return_value=processing_labels)),
MagicMock(labels=MagicMock(return_value=rate_limit_labels)),
]
monkeypatch.setattr(metrics, "Enum", enum_factory)
monkeypatch.setattr(metrics, "Histogram", histogram_factory)
monkeypatch.setattr(metrics, "Counter", counter_factory)
first = metrics.ConsumerMetrics("proc", "flow", "name")
second = metrics.ConsumerMetrics("proc-2", "flow-2", "name-2")
assert enum_factory.call_count == 1
assert histogram_factory.call_count == 1
assert counter_factory.call_count == 2
first.process("ok")
first.rate_limit()
first.state("running")
assert first.record_time() is timer
processing_labels.inc.assert_called_once_with()
rate_limit_labels.inc.assert_called_once_with()
state_labels.state.assert_called_once_with("running")
def test_producer_metrics_increments_counter_once(monkeypatch):
counter_factory = MagicMock()
labels = MagicMock()
counter_factory.return_value.labels.return_value = labels
monkeypatch.setattr(metrics, "Counter", counter_factory)
producer_metrics = metrics.ProducerMetrics("proc", "flow", "output")
producer_metrics.inc()
counter_factory.assert_called_once()
labels.inc.assert_called_once_with()
def test_processor_metrics_reports_info(monkeypatch):
info_factory = MagicMock()
labels = MagicMock()
info_factory.return_value.labels.return_value = labels
monkeypatch.setattr(metrics, "Info", info_factory)
processor_metrics = metrics.ProcessorMetrics("proc")
processor_metrics.info({"kind": "test"})
info_factory.assert_called_once()
labels.info.assert_called_once_with({"kind": "test"})
def test_subscriber_metrics_tracks_received_state_and_dropped(monkeypatch):
enum_factory = MagicMock()
counter_factory = MagicMock()
state_labels = MagicMock()
received_labels = MagicMock()
dropped_labels = MagicMock()
enum_factory.return_value.labels.return_value = state_labels
counter_factory.side_effect = [
MagicMock(labels=MagicMock(return_value=received_labels)),
MagicMock(labels=MagicMock(return_value=dropped_labels)),
]
monkeypatch.setattr(metrics, "Enum", enum_factory)
monkeypatch.setattr(metrics, "Counter", counter_factory)
subscriber_metrics = metrics.SubscriberMetrics("proc", "flow", "input")
subscriber_metrics.received()
subscriber_metrics.state("running")
subscriber_metrics.dropped("ignored")
received_labels.inc.assert_called_once_with()
dropped_labels.inc.assert_called_once_with()
state_labels.state.assert_called_once_with("running")

View file

@ -4,9 +4,9 @@ from . spec import Spec
class Parameter:
def __init__(self, value):
self.value = value
async def start():
async def start(self):
pass
async def stop():
async def stop(self):
pass
class ParameterSpec(Spec):