Merge remote-tracking branch 'origin/master' into ts-port

This commit is contained in:
elpresidank 2026-04-26 20:07:57 -05:00
commit f8252ecd54
1038 changed files with 253274 additions and 8466 deletions

View file

@ -17,6 +17,12 @@ _real_config_loader = ConfigReceiver.config_loader
ConfigReceiver.config_loader = Mock()
def _notify(version, changes):
msg = Mock()
msg.value.return_value = Mock(version=version, changes=changes)
return msg
class TestConfigReceiver:
"""Test cases for ConfigReceiver class"""
@ -47,98 +53,70 @@ class TestConfigReceiver:
assert handler2 in config_receiver.flow_handlers
@pytest.mark.asyncio
async def test_on_config_notify_new_version(self):
"""Test on_config_notify triggers fetch for newer version"""
async def test_on_config_notify_new_version_fetches_per_workspace(self):
"""Notify with newer version fetches each affected workspace."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
config_receiver.config_version = 1
# Mock fetch_and_apply
fetch_calls = []
async def mock_fetch(**kwargs):
fetch_calls.append(kwargs)
config_receiver.fetch_and_apply = mock_fetch
# Create notify message with newer version
mock_msg = Mock()
mock_msg.value.return_value = Mock(version=2, types=["flow"])
async def mock_fetch(workspace, retry=False):
fetch_calls.append(workspace)
await config_receiver.on_config_notify(mock_msg, None, None)
config_receiver.fetch_and_apply_workspace = mock_fetch
assert len(fetch_calls) == 1
msg = _notify(2, {"flow": ["ws1", "ws2"]})
await config_receiver.on_config_notify(msg, None, None)
assert set(fetch_calls) == {"ws1", "ws2"}
assert config_receiver.config_version == 2
@pytest.mark.asyncio
async def test_on_config_notify_old_version_ignored(self):
"""Test on_config_notify ignores older versions"""
"""Older-version notifies are ignored."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
config_receiver.config_version = 5
fetch_calls = []
async def mock_fetch(**kwargs):
fetch_calls.append(kwargs)
config_receiver.fetch_and_apply = mock_fetch
# Create notify message with older version
mock_msg = Mock()
mock_msg.value.return_value = Mock(version=3, types=["flow"])
async def mock_fetch(workspace, retry=False):
fetch_calls.append(workspace)
await config_receiver.on_config_notify(mock_msg, None, None)
config_receiver.fetch_and_apply_workspace = mock_fetch
assert len(fetch_calls) == 0
msg = _notify(3, {"flow": ["ws1"]})
await config_receiver.on_config_notify(msg, None, None)
assert fetch_calls == []
@pytest.mark.asyncio
async def test_on_config_notify_irrelevant_types_ignored(self):
"""Test on_config_notify ignores types the gateway doesn't care about"""
"""Notifies without flow changes advance version but skip fetch."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
config_receiver.config_version = 1
fetch_calls = []
async def mock_fetch(**kwargs):
fetch_calls.append(kwargs)
config_receiver.fetch_and_apply = mock_fetch
# Create notify message with non-flow type
mock_msg = Mock()
mock_msg.value.return_value = Mock(version=2, types=["prompt"])
async def mock_fetch(workspace, retry=False):
fetch_calls.append(workspace)
await config_receiver.on_config_notify(mock_msg, None, None)
config_receiver.fetch_and_apply_workspace = mock_fetch
# Version should be updated but no fetch
assert len(fetch_calls) == 0
msg = _notify(2, {"prompt": ["ws1"]})
await config_receiver.on_config_notify(msg, None, None)
assert fetch_calls == []
assert config_receiver.config_version == 2
@pytest.mark.asyncio
async def test_on_config_notify_flow_type_triggers_fetch(self):
"""Test on_config_notify fetches for flow-related types"""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
config_receiver.config_version = 1
fetch_calls = []
async def mock_fetch(**kwargs):
fetch_calls.append(kwargs)
config_receiver.fetch_and_apply = mock_fetch
for type_name in ["flow", "active-flow"]:
fetch_calls.clear()
config_receiver.config_version = 1
mock_msg = Mock()
mock_msg.value.return_value = Mock(version=2, types=[type_name])
await config_receiver.on_config_notify(mock_msg, None, None)
assert len(fetch_calls) == 1, f"Expected fetch for type {type_name}"
@pytest.mark.asyncio
async def test_on_config_notify_exception_handling(self):
"""Test on_config_notify handles exceptions gracefully"""
"""on_config_notify swallows exceptions from message decode."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
# Create notify message that causes an exception
mock_msg = Mock()
mock_msg.value.side_effect = Exception("Test exception")
@ -146,19 +124,18 @@ class TestConfigReceiver:
await config_receiver.on_config_notify(mock_msg, None, None)
@pytest.mark.asyncio
async def test_fetch_and_apply_with_new_flows(self):
"""Test fetch_and_apply starts new flows"""
async def test_fetch_and_apply_workspace_starts_new_flows(self):
"""fetch_and_apply_workspace starts newly-configured flows."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
# Mock _create_config_client to return a mock client
mock_resp = Mock()
mock_resp.error = None
mock_resp.version = 5
mock_resp.config = {
"flow": {
"flow1": '{"name": "test_flow_1"}',
"flow2": '{"name": "test_flow_2"}'
"flow2": '{"name": "test_flow_2"}',
}
}
@ -167,36 +144,39 @@ class TestConfigReceiver:
config_receiver._create_config_client = Mock(return_value=mock_client)
start_flow_calls = []
async def mock_start_flow(id, flow):
start_flow_calls.append((id, flow))
async def mock_start_flow(workspace, id, flow):
start_flow_calls.append((workspace, id, flow))
config_receiver.start_flow = mock_start_flow
await config_receiver.fetch_and_apply()
await config_receiver.fetch_and_apply_workspace("default")
assert config_receiver.config_version == 5
assert "flow1" in config_receiver.flows
assert "flow2" in config_receiver.flows
assert "flow1" in config_receiver.flows["default"]
assert "flow2" in config_receiver.flows["default"]
assert len(start_flow_calls) == 2
assert all(c[0] == "default" for c in start_flow_calls)
@pytest.mark.asyncio
async def test_fetch_and_apply_with_removed_flows(self):
"""Test fetch_and_apply stops removed flows"""
async def test_fetch_and_apply_workspace_stops_removed_flows(self):
"""fetch_and_apply_workspace stops flows no longer configured."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
# Pre-populate with existing flows
config_receiver.flows = {
"flow1": {"name": "test_flow_1"},
"flow2": {"name": "test_flow_2"}
"default": {
"flow1": {"name": "test_flow_1"},
"flow2": {"name": "test_flow_2"},
}
}
# Config now only has flow1
mock_resp = Mock()
mock_resp.error = None
mock_resp.version = 5
mock_resp.config = {
"flow": {
"flow1": '{"name": "test_flow_1"}'
"flow1": '{"name": "test_flow_1"}',
}
}
@ -205,20 +185,22 @@ class TestConfigReceiver:
config_receiver._create_config_client = Mock(return_value=mock_client)
stop_flow_calls = []
async def mock_stop_flow(id, flow):
stop_flow_calls.append((id, flow))
async def mock_stop_flow(workspace, id, flow):
stop_flow_calls.append((workspace, id, flow))
config_receiver.stop_flow = mock_stop_flow
await config_receiver.fetch_and_apply()
await config_receiver.fetch_and_apply_workspace("default")
assert "flow1" in config_receiver.flows
assert "flow2" not in config_receiver.flows
assert "flow1" in config_receiver.flows["default"]
assert "flow2" not in config_receiver.flows["default"]
assert len(stop_flow_calls) == 1
assert stop_flow_calls[0][0] == "flow2"
assert stop_flow_calls[0][:2] == ("default", "flow2")
@pytest.mark.asyncio
async def test_fetch_and_apply_with_no_flows(self):
"""Test fetch_and_apply with empty config"""
async def test_fetch_and_apply_workspace_with_no_flows(self):
"""Empty workspace config clears any local flow state."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
@ -231,88 +213,100 @@ class TestConfigReceiver:
mock_client.request.return_value = mock_resp
config_receiver._create_config_client = Mock(return_value=mock_client)
await config_receiver.fetch_and_apply()
await config_receiver.fetch_and_apply_workspace("default")
assert config_receiver.flows == {}
assert config_receiver.flows.get("default", {}) == {}
assert config_receiver.config_version == 1
@pytest.mark.asyncio
async def test_start_flow_with_handlers(self):
"""Test start_flow method with multiple handlers"""
"""start_flow fans out to every registered flow handler."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
handler1 = Mock()
handler1.start_flow = Mock()
handler1.start_flow = AsyncMock()
handler2 = Mock()
handler2.start_flow = Mock()
handler2.start_flow = AsyncMock()
config_receiver.add_handler(handler1)
config_receiver.add_handler(handler2)
flow_data = {"name": "test_flow", "steps": []}
await config_receiver.start_flow("flow1", flow_data)
await config_receiver.start_flow("default", "flow1", flow_data)
handler1.start_flow.assert_called_once_with("flow1", flow_data)
handler2.start_flow.assert_called_once_with("flow1", flow_data)
handler1.start_flow.assert_awaited_once_with(
"default", "flow1", flow_data
)
handler2.start_flow.assert_awaited_once_with(
"default", "flow1", flow_data
)
@pytest.mark.asyncio
async def test_start_flow_with_handler_exception(self):
"""Test start_flow method handles handler exceptions"""
"""Handler exceptions in start_flow do not propagate."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
handler = Mock()
handler.start_flow = Mock(side_effect=Exception("Handler error"))
handler.start_flow = AsyncMock(side_effect=Exception("Handler error"))
config_receiver.add_handler(handler)
flow_data = {"name": "test_flow", "steps": []}
# Should not raise
await config_receiver.start_flow("flow1", flow_data)
await config_receiver.start_flow("default", "flow1", flow_data)
handler.start_flow.assert_called_once_with("flow1", flow_data)
handler.start_flow.assert_awaited_once_with(
"default", "flow1", flow_data
)
@pytest.mark.asyncio
async def test_stop_flow_with_handlers(self):
"""Test stop_flow method with multiple handlers"""
"""stop_flow fans out to every registered flow handler."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
handler1 = Mock()
handler1.stop_flow = Mock()
handler1.stop_flow = AsyncMock()
handler2 = Mock()
handler2.stop_flow = Mock()
handler2.stop_flow = AsyncMock()
config_receiver.add_handler(handler1)
config_receiver.add_handler(handler2)
flow_data = {"name": "test_flow", "steps": []}
await config_receiver.stop_flow("flow1", flow_data)
await config_receiver.stop_flow("default", "flow1", flow_data)
handler1.stop_flow.assert_called_once_with("flow1", flow_data)
handler2.stop_flow.assert_called_once_with("flow1", flow_data)
handler1.stop_flow.assert_awaited_once_with(
"default", "flow1", flow_data
)
handler2.stop_flow.assert_awaited_once_with(
"default", "flow1", flow_data
)
@pytest.mark.asyncio
async def test_stop_flow_with_handler_exception(self):
"""Test stop_flow method handles handler exceptions"""
"""Handler exceptions in stop_flow do not propagate."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
handler = Mock()
handler.stop_flow = Mock(side_effect=Exception("Handler error"))
handler.stop_flow = AsyncMock(side_effect=Exception("Handler error"))
config_receiver.add_handler(handler)
flow_data = {"name": "test_flow", "steps": []}
# Should not raise
await config_receiver.stop_flow("flow1", flow_data)
await config_receiver.stop_flow("default", "flow1", flow_data)
handler.stop_flow.assert_called_once_with("flow1", flow_data)
handler.stop_flow.assert_awaited_once_with(
"default", "flow1", flow_data
)
@patch('asyncio.create_task')
@pytest.mark.asyncio
@ -329,25 +323,25 @@ class TestConfigReceiver:
mock_create_task.assert_called_once()
@pytest.mark.asyncio
async def test_fetch_and_apply_mixed_flow_operations(self):
"""Test fetch_and_apply with mixed add/remove operations"""
async def test_fetch_and_apply_workspace_mixed_flow_operations(self):
"""fetch_and_apply_workspace adds, keeps and removes flows in one pass."""
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
# Pre-populate
config_receiver.flows = {
"flow1": {"name": "test_flow_1"},
"flow2": {"name": "test_flow_2"}
"default": {
"flow1": {"name": "test_flow_1"},
"flow2": {"name": "test_flow_2"},
}
}
# Config removes flow1, keeps flow2, adds flow3
mock_resp = Mock()
mock_resp.error = None
mock_resp.version = 5
mock_resp.config = {
"flow": {
"flow2": '{"name": "test_flow_2"}',
"flow3": '{"name": "test_flow_3"}'
"flow3": '{"name": "test_flow_3"}',
}
}
@ -358,20 +352,22 @@ class TestConfigReceiver:
start_calls = []
stop_calls = []
async def mock_start_flow(id, flow):
start_calls.append((id, flow))
async def mock_stop_flow(id, flow):
stop_calls.append((id, flow))
async def mock_start_flow(workspace, id, flow):
start_calls.append((workspace, id, flow))
async def mock_stop_flow(workspace, id, flow):
stop_calls.append((workspace, id, flow))
config_receiver.start_flow = mock_start_flow
config_receiver.stop_flow = mock_stop_flow
await config_receiver.fetch_and_apply()
await config_receiver.fetch_and_apply_workspace("default")
assert "flow1" not in config_receiver.flows
assert "flow2" in config_receiver.flows
assert "flow3" in config_receiver.flows
ws_flows = config_receiver.flows["default"]
assert "flow1" not in ws_flows
assert "flow2" in ws_flows
assert "flow3" in ws_flows
assert len(start_calls) == 1
assert start_calls[0][0] == "flow3"
assert start_calls[0][:2] == ("default", "flow3")
assert len(stop_calls) == 1
assert stop_calls[0][0] == "flow1"
assert stop_calls[0][:2] == ("default", "flow1")

View file

@ -0,0 +1,406 @@
"""
Round-trip unit tests for the core msgpack import/export gateway endpoints.
The kg-core export endpoint receives KnowledgeResponse-shaped dicts from
the responder callback and packs them into msgpack tuples. The kg-core
import endpoint takes msgpack tuples back off the wire and rebuilds
KnowledgeRequest-shaped dicts which it then hands to KnowledgeRequestor
(whose translator decodes them into real dataclasses).
Regression coverage: the previous wire format used `"vectors"` (plural)
in the entity blobs and embedded a stale `"m"` field that referenced the
removed `Metadata.metadata` triples-list field. The export side hit a
KeyError on first message; the import side built dicts that the
KnowledgeRequestTranslator (separately fixed) couldn't decode. These
tests pin both halves of the wire protocol.
"""
import msgpack
import pytest
from unittest.mock import AsyncMock, Mock, patch
from trustgraph.gateway.dispatch.core_export import CoreExport
from trustgraph.gateway.dispatch.core_import import CoreImport
# ---------------------------------------------------------------------------
# Helpers — sample translator-shaped dicts (as KnowledgeResponseTranslator
# would emit). The vector wire key is *singular* on purpose; the export
# side previously read the wrong key and crashed.
# ---------------------------------------------------------------------------
def _ge_response_dict():
return {
"graph-embeddings": {
"metadata": {
"id": "doc-1",
"root": "",
"collection": "testcoll",
},
"entities": [
{
"entity": {"t": "i", "i": "http://example.org/alice"},
"vector": [0.1, 0.2, 0.3],
},
{
"entity": {"t": "i", "i": "http://example.org/bob"},
"vector": [0.4, 0.5, 0.6],
},
],
}
}
def _triples_response_dict():
return {
"triples": {
"metadata": {
"id": "doc-1",
"root": "",
"collection": "testcoll",
},
"triples": [
{
"s": {"t": "i", "i": "http://example.org/alice"},
"p": {"t": "i", "i": "http://example.org/knows"},
"o": {"t": "i", "i": "http://example.org/bob"},
},
],
}
}
def _make_request(id_="doc-1", workspace="alice"):
request = Mock()
request.query = {"id": id_, "workspace": workspace}
return request
def _make_data_reader(payload: bytes):
"""Mock the aiohttp StreamReader: returns payload once, then EOF."""
chunks = [payload, b""]
data = Mock()
async def fake_read(n):
return chunks.pop(0) if chunks else b""
data.read = fake_read
return data
# ---------------------------------------------------------------------------
# Export side: translator-shaped dict -> msgpack bytes
# ---------------------------------------------------------------------------
class TestCoreExportWireFormat:
@pytest.mark.asyncio
@patch("trustgraph.gateway.dispatch.core_export.KnowledgeRequestor")
async def test_export_packs_graph_embeddings_with_singular_vector(
self, mock_kr_class,
):
"""The export side must read `ent["vector"]` and emit `v`. The
previous bug was reading `ent["vectors"]` which KeyErrored against
the translator output."""
captured = []
async def fake_kr_process(req_dict, responder):
await responder(_ge_response_dict(), True)
mock_kr = AsyncMock()
mock_kr.start = AsyncMock()
mock_kr.stop = AsyncMock()
mock_kr.process = fake_kr_process
mock_kr_class.return_value = mock_kr
response = AsyncMock()
async def fake_write(b):
captured.append(b)
response.write = fake_write
response.write_eof = AsyncMock()
ok = AsyncMock(return_value=response)
error = AsyncMock()
exporter = CoreExport(backend=Mock())
await exporter.process(
data=Mock(),
error=error,
ok=ok,
request=_make_request(),
)
# Did not raise, did not call error()
error.assert_not_called()
assert len(captured) == 1
unpacker = msgpack.Unpacker()
unpacker.feed(captured[0])
items = list(unpacker)
assert len(items) == 1
msg_type, payload = items[0]
assert msg_type == "ge"
# Metadata envelope: only id/collection — no stale `m["m"]`.
assert payload["m"] == {"i": "doc-1", "c": "testcoll"}
# Entities: each carries the *singular* `v` and the term envelope
assert len(payload["e"]) == 2
assert payload["e"][0]["v"] == [0.1, 0.2, 0.3]
assert payload["e"][1]["v"] == [0.4, 0.5, 0.6]
assert payload["e"][0]["e"]["i"] == "http://example.org/alice"
@pytest.mark.asyncio
@patch("trustgraph.gateway.dispatch.core_export.KnowledgeRequestor")
async def test_export_packs_triples(self, mock_kr_class):
captured = []
async def fake_kr_process(req_dict, responder):
await responder(_triples_response_dict(), True)
mock_kr = AsyncMock()
mock_kr.start = AsyncMock()
mock_kr.stop = AsyncMock()
mock_kr.process = fake_kr_process
mock_kr_class.return_value = mock_kr
response = AsyncMock()
async def fake_write(b):
captured.append(b)
response.write = fake_write
response.write_eof = AsyncMock()
ok = AsyncMock(return_value=response)
error = AsyncMock()
exporter = CoreExport(backend=Mock())
await exporter.process(
data=Mock(), error=error, ok=ok, request=_make_request(),
)
error.assert_not_called()
assert len(captured) == 1
unpacker = msgpack.Unpacker()
unpacker.feed(captured[0])
items = list(unpacker)
assert len(items) == 1
msg_type, payload = items[0]
assert msg_type == "t"
assert payload["m"] == {"i": "doc-1", "c": "testcoll"}
assert len(payload["t"]) == 1
# ---------------------------------------------------------------------------
# Import side: msgpack bytes -> translator-shaped dict
# ---------------------------------------------------------------------------
class TestCoreImportWireFormat:
@pytest.mark.asyncio
@patch("trustgraph.gateway.dispatch.core_import.KnowledgeRequestor")
async def test_import_unpacks_graph_embeddings_to_singular_vector(
self, mock_kr_class,
):
"""The import side must build dicts whose entity blobs have the
singular `vector` key that's what the KnowledgeRequestTranslator
decode side reads. Previous bug emitted `vectors`."""
captured = []
async def fake_kr_process(req_dict):
captured.append(req_dict)
mock_kr = AsyncMock()
mock_kr.start = AsyncMock()
mock_kr.stop = AsyncMock()
mock_kr.process = fake_kr_process
mock_kr_class.return_value = mock_kr
# Build a msgpack tuple matching the new wire format
payload = msgpack.packb((
"ge",
{
"m": {"i": "doc-1", "c": "testcoll"},
"e": [
{
"e": {"t": "i", "i": "http://example.org/alice"},
"v": [0.1, 0.2, 0.3],
},
],
},
))
ok = AsyncMock(return_value=AsyncMock(write_eof=AsyncMock()))
error = AsyncMock()
importer = CoreImport(backend=Mock())
await importer.process(
data=_make_data_reader(payload),
error=error,
ok=ok,
request=_make_request(),
)
error.assert_not_called()
assert len(captured) == 1
req = captured[0]
assert req["operation"] == "put-kg-core"
assert req["workspace"] == "alice"
assert req["id"] == "doc-1"
ge = req["graph-embeddings"]
# Metadata envelope must NOT contain a stale `metadata` key
# referencing the removed Metadata.metadata field.
assert "metadata" not in ge["metadata"]
assert ge["metadata"] == {
"id": "doc-1",
"collection": "default",
}
# Entity blob carries the singular `vector` key
assert len(ge["entities"]) == 1
ent = ge["entities"][0]
assert ent["vector"] == [0.1, 0.2, 0.3]
assert "vectors" not in ent
@pytest.mark.asyncio
@patch("trustgraph.gateway.dispatch.core_import.KnowledgeRequestor")
async def test_import_unpacks_triples(self, mock_kr_class):
captured = []
async def fake_kr_process(req_dict):
captured.append(req_dict)
mock_kr = AsyncMock()
mock_kr.start = AsyncMock()
mock_kr.stop = AsyncMock()
mock_kr.process = fake_kr_process
mock_kr_class.return_value = mock_kr
payload = msgpack.packb((
"t",
{
"m": {"i": "doc-1", "c": "testcoll"},
"t": [
{
"s": {"t": "i", "i": "http://example.org/alice"},
"p": {"t": "i", "i": "http://example.org/knows"},
"o": {"t": "i", "i": "http://example.org/bob"},
},
],
},
))
ok = AsyncMock(return_value=AsyncMock(write_eof=AsyncMock()))
error = AsyncMock()
importer = CoreImport(backend=Mock())
await importer.process(
data=_make_data_reader(payload),
error=error,
ok=ok,
request=_make_request(),
)
error.assert_not_called()
assert len(captured) == 1
req = captured[0]
triples = req["triples"]
assert "metadata" not in triples["metadata"] # no stale field
assert len(triples["triples"]) == 1
# ---------------------------------------------------------------------------
# Full round-trip: export bytes feed directly into import
# ---------------------------------------------------------------------------
class TestCoreImportExportRoundTrip:
"""End-to-end: produce bytes via core_export, consume them via
core_import, and verify the dict that lands at the import-side
translator is structurally equivalent to what went in. This is the
test that catches asymmetries between the two halves."""
@pytest.mark.asyncio
@patch("trustgraph.gateway.dispatch.core_import.KnowledgeRequestor")
@patch("trustgraph.gateway.dispatch.core_export.KnowledgeRequestor")
async def test_graph_embeddings_round_trip(
self, mock_export_kr_class, mock_import_kr_class,
):
# ----- export side: capture bytes -----
export_bytes = []
async def fake_export_process(req_dict, responder):
await responder(_ge_response_dict(), True)
export_kr = AsyncMock()
export_kr.start = AsyncMock()
export_kr.stop = AsyncMock()
export_kr.process = fake_export_process
mock_export_kr_class.return_value = export_kr
response = AsyncMock()
async def fake_write(b):
export_bytes.append(b)
response.write = fake_write
response.write_eof = AsyncMock()
exporter = CoreExport(backend=Mock())
await exporter.process(
data=Mock(),
error=AsyncMock(),
ok=AsyncMock(return_value=response),
request=_make_request(),
)
assert len(export_bytes) == 1
# ----- import side: feed those bytes back in -----
import_captured = []
async def fake_import_process(req_dict):
import_captured.append(req_dict)
import_kr = AsyncMock()
import_kr.start = AsyncMock()
import_kr.stop = AsyncMock()
import_kr.process = fake_import_process
mock_import_kr_class.return_value = import_kr
importer = CoreImport(backend=Mock())
await importer.process(
data=_make_data_reader(export_bytes[0]),
error=AsyncMock(),
ok=AsyncMock(return_value=AsyncMock(write_eof=AsyncMock())),
request=_make_request(),
)
# ----- verify the dict the importer would hand to the translator -----
assert len(import_captured) == 1
req = import_captured[0]
original = _ge_response_dict()["graph-embeddings"]
ge = req["graph-embeddings"]
# The import side overrides id from the URL query (intentional),
# so we only round-trip the entity payload itself.
assert ge["metadata"]["id"] == original["metadata"]["id"]
assert len(ge["entities"]) == len(original["entities"])
for got, want in zip(ge["entities"], original["entities"]):
assert got["vector"] == want["vector"]
assert got["entity"] == want["entity"]

View file

@ -72,10 +72,10 @@ class TestDispatcherManager:
flow_data = {"name": "test_flow", "steps": []}
await manager.start_flow("flow1", flow_data)
assert "flow1" in manager.flows
assert manager.flows["flow1"] == flow_data
await manager.start_flow("default", "flow1", flow_data)
assert ("default", "flow1") in manager.flows
assert manager.flows[("default", "flow1")] == flow_data
@pytest.mark.asyncio
async def test_stop_flow(self):
@ -86,11 +86,11 @@ class TestDispatcherManager:
# Pre-populate with a flow
flow_data = {"name": "test_flow", "steps": []}
manager.flows["flow1"] = flow_data
await manager.stop_flow("flow1", flow_data)
assert "flow1" not in manager.flows
manager.flows[("default", "flow1")] = flow_data
await manager.stop_flow("default", "flow1", flow_data)
assert ("default", "flow1") not in manager.flows
def test_dispatch_global_service_returns_wrapper(self):
"""Test dispatch_global_service returns DispatcherWrapper"""
@ -275,12 +275,12 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Setup test flow
manager.flows["test_flow"] = {
manager.flows[("default", "test_flow")] = {
"interfaces": {
"triples-store": {"queue": "test_queue"}
"triples-store": {"flow": "test_queue"}
}
}
with patch('trustgraph.gateway.dispatch.manager.import_dispatchers') as mock_dispatchers, \
patch('uuid.uuid4') as mock_uuid:
mock_uuid.return_value = "test-uuid"
@ -290,7 +290,7 @@ class TestDispatcherManager:
mock_dispatcher_class.return_value = mock_dispatcher
mock_dispatchers.__getitem__.return_value = mock_dispatcher_class
mock_dispatchers.__contains__.return_value = True
params = {"flow": "test_flow", "kind": "triples"}
result = await manager.process_flow_import("ws", "running", params)
@ -298,7 +298,7 @@ class TestDispatcherManager:
backend=mock_backend,
ws="ws",
running="running",
queue={"queue": "test_queue"}
queue="test_queue"
)
mock_dispatcher.start.assert_called_once()
assert result == mock_dispatcher
@ -326,12 +326,12 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Setup test flow
manager.flows["test_flow"] = {
manager.flows[("default", "test_flow")] = {
"interfaces": {
"triples-store": {"queue": "test_queue"}
"triples-store": {"flow": "test_queue"}
}
}
with patch('trustgraph.gateway.dispatch.manager.import_dispatchers') as mock_dispatchers:
mock_dispatchers.__contains__.return_value = False
@ -348,12 +348,12 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Setup test flow
manager.flows["test_flow"] = {
manager.flows[("default", "test_flow")] = {
"interfaces": {
"triples-store": {"queue": "test_queue"}
"triples-store": {"flow": "test_queue"}
}
}
with patch('trustgraph.gateway.dispatch.manager.export_dispatchers') as mock_dispatchers, \
patch('uuid.uuid4') as mock_uuid:
mock_uuid.return_value = "test-uuid"
@ -370,7 +370,7 @@ class TestDispatcherManager:
backend=mock_backend,
ws="ws",
running="running",
queue={"queue": "test_queue"},
queue="test_queue",
consumer="api-gateway-test-uuid",
subscriber="api-gateway-test-uuid"
)
@ -404,7 +404,7 @@ class TestDispatcherManager:
params = {"flow": "test_flow", "kind": "agent"}
result = await manager.process_flow_service("data", "responder", params)
manager.invoke_flow_service.assert_called_once_with("data", "responder", "test_flow", "agent")
manager.invoke_flow_service.assert_called_once_with("data", "responder", "default", "test_flow", "agent")
assert result == "flow_result"
@pytest.mark.asyncio
@ -415,14 +415,14 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Add flow to the flows dictionary
manager.flows["test_flow"] = {"services": {"agent": {}}}
manager.flows[("default", "test_flow")] = {"services": {"agent": {}}}
# Pre-populate with existing dispatcher
mock_dispatcher = Mock()
mock_dispatcher.process = AsyncMock(return_value="cached_result")
manager.dispatchers[("test_flow", "agent")] = mock_dispatcher
result = await manager.invoke_flow_service("data", "responder", "test_flow", "agent")
manager.dispatchers[("default", "test_flow", "agent")] = mock_dispatcher
result = await manager.invoke_flow_service("data", "responder", "default", "test_flow", "agent")
mock_dispatcher.process.assert_called_once_with("data", "responder")
assert result == "cached_result"
@ -435,7 +435,7 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Setup test flow
manager.flows["test_flow"] = {
manager.flows[("default", "test_flow")] = {
"interfaces": {
"agent": {
"request": "agent_request_queue",
@ -443,7 +443,7 @@ class TestDispatcherManager:
}
}
}
with patch('trustgraph.gateway.dispatch.manager.request_response_dispatchers') as mock_dispatchers:
mock_dispatcher_class = Mock()
mock_dispatcher = Mock()
@ -452,23 +452,23 @@ class TestDispatcherManager:
mock_dispatcher_class.return_value = mock_dispatcher
mock_dispatchers.__getitem__.return_value = mock_dispatcher_class
mock_dispatchers.__contains__.return_value = True
result = await manager.invoke_flow_service("data", "responder", "test_flow", "agent")
result = await manager.invoke_flow_service("data", "responder", "default", "test_flow", "agent")
# Verify dispatcher was created with correct parameters
mock_dispatcher_class.assert_called_once_with(
backend=mock_backend,
request_queue="agent_request_queue",
response_queue="agent_response_queue",
timeout=120,
consumer="api-gateway-test_flow-agent-request",
subscriber="api-gateway-test_flow-agent-request"
consumer="api-gateway-default-test_flow-agent-request",
subscriber="api-gateway-default-test_flow-agent-request"
)
mock_dispatcher.start.assert_called_once()
mock_dispatcher.process.assert_called_once_with("data", "responder")
# Verify dispatcher was cached
assert manager.dispatchers[("test_flow", "agent")] == mock_dispatcher
assert manager.dispatchers[("default", "test_flow", "agent")] == mock_dispatcher
assert result == "new_result"
@pytest.mark.asyncio
@ -479,36 +479,36 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Setup test flow
manager.flows["test_flow"] = {
manager.flows[("default", "test_flow")] = {
"interfaces": {
"text-load": {"queue": "text_load_queue"}
"text-load": {"flow": "text_load_queue"}
}
}
with patch('trustgraph.gateway.dispatch.manager.request_response_dispatchers') as mock_rr_dispatchers, \
patch('trustgraph.gateway.dispatch.manager.sender_dispatchers') as mock_sender_dispatchers:
mock_rr_dispatchers.__contains__.return_value = False
mock_sender_dispatchers.__contains__.return_value = True
mock_dispatcher_class = Mock()
mock_dispatcher = Mock()
mock_dispatcher.start = AsyncMock()
mock_dispatcher.process = AsyncMock(return_value="sender_result")
mock_dispatcher_class.return_value = mock_dispatcher
mock_sender_dispatchers.__getitem__.return_value = mock_dispatcher_class
result = await manager.invoke_flow_service("data", "responder", "test_flow", "text-load")
result = await manager.invoke_flow_service("data", "responder", "default", "test_flow", "text-load")
# Verify dispatcher was created with correct parameters
mock_dispatcher_class.assert_called_once_with(
backend=mock_backend,
queue={"queue": "text_load_queue"}
queue="text_load_queue"
)
mock_dispatcher.start.assert_called_once()
mock_dispatcher.process.assert_called_once_with("data", "responder")
# Verify dispatcher was cached
assert manager.dispatchers[("test_flow", "text-load")] == mock_dispatcher
assert manager.dispatchers[("default", "test_flow", "text-load")] == mock_dispatcher
assert result == "sender_result"
@pytest.mark.asyncio
@ -519,7 +519,7 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
with pytest.raises(RuntimeError, match="Invalid flow"):
await manager.invoke_flow_service("data", "responder", "invalid_flow", "agent")
await manager.invoke_flow_service("data", "responder", "default", "invalid_flow", "agent")
@pytest.mark.asyncio
async def test_invoke_flow_service_unsupported_kind_by_flow(self):
@ -529,14 +529,14 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Setup test flow without agent interface
manager.flows["test_flow"] = {
manager.flows[("default", "test_flow")] = {
"interfaces": {
"text-completion": {"request": "req", "response": "resp"}
}
}
with pytest.raises(RuntimeError, match="This kind not supported by flow"):
await manager.invoke_flow_service("data", "responder", "test_flow", "agent")
await manager.invoke_flow_service("data", "responder", "default", "test_flow", "agent")
@pytest.mark.asyncio
async def test_invoke_flow_service_invalid_kind(self):
@ -546,7 +546,7 @@ class TestDispatcherManager:
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Setup test flow with interface but unsupported kind
manager.flows["test_flow"] = {
manager.flows[("default", "test_flow")] = {
"interfaces": {
"invalid-kind": {"request": "req", "response": "resp"}
}
@ -558,7 +558,7 @@ class TestDispatcherManager:
mock_sender_dispatchers.__contains__.return_value = False
with pytest.raises(RuntimeError, match="Invalid kind"):
await manager.invoke_flow_service("data", "responder", "test_flow", "invalid-kind")
await manager.invoke_flow_service("data", "responder", "default", "test_flow", "invalid-kind")
@pytest.mark.asyncio
async def test_invoke_global_service_concurrent_calls_create_single_dispatcher(self):
@ -608,7 +608,7 @@ class TestDispatcherManager:
mock_config_receiver = Mock()
manager = DispatcherManager(mock_backend, mock_config_receiver)
manager.flows["test_flow"] = {
manager.flows[("default", "test_flow")] = {
"interfaces": {
"agent": {
"request": "agent_request_queue",
@ -630,7 +630,7 @@ class TestDispatcherManager:
mock_rr_dispatchers.__contains__.return_value = True
results = await asyncio.gather(*[
manager.invoke_flow_service("data", "responder", "test_flow", "agent")
manager.invoke_flow_service("data", "responder", "default", "test_flow", "agent")
for _ in range(5)
])
@ -638,5 +638,5 @@ class TestDispatcherManager:
"Dispatcher class instantiated more than once — duplicate consumer bug"
)
assert mock_dispatcher.start.call_count == 1
assert manager.dispatchers[("test_flow", "agent")] is mock_dispatcher
assert manager.dispatchers[("default", "test_flow", "agent")] is mock_dispatcher
assert all(r == "result" for r in results)

View file

@ -0,0 +1,75 @@
"""Tests for Gateway i18n pack endpoint."""
import json
from unittest.mock import MagicMock
import pytest
from aiohttp import web
from trustgraph.gateway.endpoint.i18n import I18nPackEndpoint
class TestI18nPackEndpoint:
def test_i18n_endpoint_initialization(self):
mock_auth = MagicMock()
endpoint = I18nPackEndpoint(
endpoint_path="/api/v1/i18n/packs/{lang}",
auth=mock_auth,
)
assert endpoint.path == "/api/v1/i18n/packs/{lang}"
assert endpoint.auth == mock_auth
assert endpoint.operation == "service"
@pytest.mark.asyncio
async def test_i18n_endpoint_start_method(self):
mock_auth = MagicMock()
endpoint = I18nPackEndpoint("/api/v1/i18n/packs/{lang}", mock_auth)
await endpoint.start()
def test_add_routes_registers_get_handler(self):
mock_auth = MagicMock()
mock_app = MagicMock()
endpoint = I18nPackEndpoint("/api/v1/i18n/packs/{lang}", mock_auth)
endpoint.add_routes(mock_app)
mock_app.add_routes.assert_called_once()
call_args = mock_app.add_routes.call_args[0][0]
assert len(call_args) == 1
@pytest.mark.asyncio
async def test_handle_unauthorized_on_invalid_auth_scheme(self):
mock_auth = MagicMock()
mock_auth.permitted.return_value = True
endpoint = I18nPackEndpoint("/api/v1/i18n/packs/{lang}", mock_auth)
request = MagicMock()
request.path = "/api/v1/i18n/packs/en"
request.headers = {"Authorization": "Token abc"}
request.match_info = {"lang": "en"}
resp = await endpoint.handle(request)
assert isinstance(resp, web.HTTPUnauthorized)
@pytest.mark.asyncio
async def test_handle_returns_pack_when_permitted(self):
mock_auth = MagicMock()
mock_auth.permitted.return_value = True
endpoint = I18nPackEndpoint("/api/v1/i18n/packs/{lang}", mock_auth)
request = MagicMock()
request.path = "/api/v1/i18n/packs/en"
request.headers = {}
request.match_info = {"lang": "en"}
resp = await endpoint.handle(request)
assert resp.status == 200
payload = json.loads(resp.body.decode("utf-8"))
assert isinstance(payload, dict)
assert "cli.verify_system_status.title" in payload

View file

@ -0,0 +1,241 @@
"""
Unit tests for entity contexts import dispatcher.
Tests the business logic of EntityContextsImport while mocking the
Publisher and websocket components.
Regression coverage: a previous version constructed Metadata(metadata=...)
which raised TypeError at runtime as soon as a message was received. These
tests exercise receive() end-to-end so any future schema/kwarg drift in
the Metadata or EntityContexts construction is caught immediately.
"""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from trustgraph.gateway.dispatch.entity_contexts_import import EntityContextsImport
from trustgraph.schema import EntityContexts, EntityContext, Metadata
@pytest.fixture
def mock_backend():
return Mock()
@pytest.fixture
def mock_running():
running = Mock()
running.get.return_value = True
running.stop = Mock()
return running
@pytest.fixture
def mock_websocket():
ws = Mock()
ws.close = AsyncMock()
return ws
@pytest.fixture
def sample_message():
"""Sample entity-contexts websocket message."""
return {
"metadata": {
"id": "doc-123",
"user": "testuser",
"collection": "testcollection",
},
"entities": [
{
"entity": {"v": "http://example.org/alice", "e": True},
"context": "Alice is a person.",
},
{
"entity": {"v": "http://example.org/bob", "e": True},
"context": "Bob is a person.",
},
],
}
@pytest.fixture
def empty_entities_message():
return {
"metadata": {
"id": "doc-empty",
"user": "u",
"collection": "c",
},
"entities": [],
}
class TestEntityContextsImportInitialization:
@patch('trustgraph.gateway.dispatch.entity_contexts_import.Publisher')
def test_init_creates_publisher_with_correct_params(
self, mock_publisher_class, mock_backend, mock_websocket, mock_running
):
instance = Mock()
mock_publisher_class.return_value = instance
dispatcher = EntityContextsImport(
ws=mock_websocket,
running=mock_running,
backend=mock_backend,
queue="ec-queue",
)
mock_publisher_class.assert_called_once_with(
mock_backend,
topic="ec-queue",
schema=EntityContexts,
)
assert dispatcher.ws is mock_websocket
assert dispatcher.running is mock_running
assert dispatcher.publisher is instance
class TestEntityContextsImportLifecycle:
@patch('trustgraph.gateway.dispatch.entity_contexts_import.Publisher')
@pytest.mark.asyncio
async def test_start_calls_publisher_start(
self, mock_publisher_class, mock_backend, mock_websocket, mock_running
):
instance = Mock()
instance.start = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = EntityContextsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
await dispatcher.start()
instance.start.assert_called_once()
@patch('trustgraph.gateway.dispatch.entity_contexts_import.Publisher')
@pytest.mark.asyncio
async def test_destroy_stops_and_closes_properly(
self, mock_publisher_class, mock_backend, mock_websocket, mock_running
):
instance = Mock()
instance.stop = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = EntityContextsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
await dispatcher.destroy()
mock_running.stop.assert_called_once()
instance.stop.assert_called_once()
mock_websocket.close.assert_called_once()
@patch('trustgraph.gateway.dispatch.entity_contexts_import.Publisher')
@pytest.mark.asyncio
async def test_destroy_handles_none_websocket(
self, mock_publisher_class, mock_backend, mock_running
):
instance = Mock()
instance.stop = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = EntityContextsImport(
ws=None, running=mock_running,
backend=mock_backend, queue="q",
)
await dispatcher.destroy()
mock_running.stop.assert_called_once()
instance.stop.assert_called_once()
class TestEntityContextsImportMessageProcessing:
"""Regression coverage for receive(): catches Metadata/schema drift."""
@patch('trustgraph.gateway.dispatch.entity_contexts_import.Publisher')
@pytest.mark.asyncio
async def test_receive_constructs_entity_contexts_correctly(
self, mock_publisher_class, mock_backend, mock_websocket,
mock_running, sample_message,
):
instance = Mock()
instance.send = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = EntityContextsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
mock_msg = Mock()
mock_msg.json.return_value = sample_message
# If Metadata or EntityContexts gain/lose kwargs, this raises
# TypeError — exactly the regression we want to catch.
await dispatcher.receive(mock_msg)
instance.send.assert_called_once()
call_args = instance.send.call_args
assert call_args[0][0] is None
sent = call_args[0][1]
assert isinstance(sent, EntityContexts)
assert isinstance(sent.metadata, Metadata)
assert sent.metadata.id == "doc-123"
assert sent.metadata.collection == "testcollection"
assert len(sent.entities) == 2
assert all(isinstance(e, EntityContext) for e in sent.entities)
assert sent.entities[0].context == "Alice is a person."
assert sent.entities[1].context == "Bob is a person."
@patch('trustgraph.gateway.dispatch.entity_contexts_import.Publisher')
@pytest.mark.asyncio
async def test_receive_handles_empty_entities(
self, mock_publisher_class, mock_backend, mock_websocket,
mock_running, empty_entities_message,
):
instance = Mock()
instance.send = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = EntityContextsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
mock_msg = Mock()
mock_msg.json.return_value = empty_entities_message
await dispatcher.receive(mock_msg)
instance.send.assert_called_once()
sent = instance.send.call_args[0][1]
assert isinstance(sent, EntityContexts)
assert sent.entities == []
assert sent.metadata.id == "doc-empty"
@patch('trustgraph.gateway.dispatch.entity_contexts_import.Publisher')
@pytest.mark.asyncio
async def test_receive_propagates_publisher_errors(
self, mock_publisher_class, mock_backend, mock_websocket,
mock_running, sample_message,
):
instance = Mock()
instance.send = AsyncMock(side_effect=RuntimeError("publish failed"))
mock_publisher_class.return_value = instance
dispatcher = EntityContextsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
mock_msg = Mock()
mock_msg.json.return_value = sample_message
with pytest.raises(RuntimeError, match="publish failed"):
await dispatcher.receive(mock_msg)

View file

@ -158,7 +158,7 @@ class TestAgentExplainTriples:
translator = AgentResponseTranslator()
response = AgentResponse(
chunk_type="explain",
message_type="explain",
content="",
explain_id="urn:trustgraph:agent:session:abc123",
explain_graph="urn:graph:retrieval",
@ -179,7 +179,7 @@ class TestAgentExplainTriples:
translator = AgentResponseTranslator()
response = AgentResponse(
chunk_type="thought",
message_type="thought",
content="I need to think...",
)
@ -190,7 +190,7 @@ class TestAgentExplainTriples:
translator = AgentResponseTranslator()
response = AgentResponse(
chunk_type="explain",
message_type="explain",
explain_id="urn:trustgraph:agent:session:abc123",
explain_triples=sample_triples(),
end_of_dialog=False,
@ -203,7 +203,7 @@ class TestAgentExplainTriples:
translator = AgentResponseTranslator()
response = AgentResponse(
chunk_type="answer",
message_type="answer",
content="The answer is...",
end_of_dialog=True,
)

View file

@ -0,0 +1,246 @@
"""
Unit tests for graph embeddings import dispatcher.
Tests the business logic of GraphEmbeddingsImport while mocking the
Publisher and websocket components.
Regression coverage: a previous version of EntityContextsImport
constructed Metadata(metadata=...) which raised TypeError at runtime as
soon as a message was received. The same shape of bug can occur here, so
these tests exercise receive() end-to-end to catch any future schema or
kwarg drift in Metadata / GraphEmbeddings / EntityEmbeddings construction.
"""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from trustgraph.gateway.dispatch.graph_embeddings_import import GraphEmbeddingsImport
from trustgraph.schema import GraphEmbeddings, EntityEmbeddings, Metadata
@pytest.fixture
def mock_backend():
return Mock()
@pytest.fixture
def mock_running():
running = Mock()
running.get.return_value = True
running.stop = Mock()
return running
@pytest.fixture
def mock_websocket():
ws = Mock()
ws.close = AsyncMock()
return ws
@pytest.fixture
def sample_message():
"""Sample graph-embeddings websocket message."""
return {
"metadata": {
"id": "doc-123",
"user": "testuser",
"collection": "testcollection",
},
"entities": [
{
"entity": {"v": "http://example.org/alice", "e": True},
"vector": [0.1, 0.2, 0.3],
},
{
"entity": {"v": "http://example.org/bob", "e": True},
"vector": [0.4, 0.5, 0.6],
},
],
}
@pytest.fixture
def empty_entities_message():
return {
"metadata": {
"id": "doc-empty",
"user": "u",
"collection": "c",
},
"entities": [],
}
class TestGraphEmbeddingsImportInitialization:
@patch('trustgraph.gateway.dispatch.graph_embeddings_import.Publisher')
def test_init_creates_publisher_with_correct_params(
self, mock_publisher_class, mock_backend, mock_websocket, mock_running
):
instance = Mock()
mock_publisher_class.return_value = instance
dispatcher = GraphEmbeddingsImport(
ws=mock_websocket,
running=mock_running,
backend=mock_backend,
queue="ge-queue",
)
mock_publisher_class.assert_called_once_with(
mock_backend,
topic="ge-queue",
schema=GraphEmbeddings,
)
assert dispatcher.ws is mock_websocket
assert dispatcher.running is mock_running
assert dispatcher.publisher is instance
class TestGraphEmbeddingsImportLifecycle:
@patch('trustgraph.gateway.dispatch.graph_embeddings_import.Publisher')
@pytest.mark.asyncio
async def test_start_calls_publisher_start(
self, mock_publisher_class, mock_backend, mock_websocket, mock_running
):
instance = Mock()
instance.start = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = GraphEmbeddingsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
await dispatcher.start()
instance.start.assert_called_once()
@patch('trustgraph.gateway.dispatch.graph_embeddings_import.Publisher')
@pytest.mark.asyncio
async def test_destroy_stops_and_closes_properly(
self, mock_publisher_class, mock_backend, mock_websocket, mock_running
):
instance = Mock()
instance.stop = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = GraphEmbeddingsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
await dispatcher.destroy()
mock_running.stop.assert_called_once()
instance.stop.assert_called_once()
mock_websocket.close.assert_called_once()
@patch('trustgraph.gateway.dispatch.graph_embeddings_import.Publisher')
@pytest.mark.asyncio
async def test_destroy_handles_none_websocket(
self, mock_publisher_class, mock_backend, mock_running
):
instance = Mock()
instance.stop = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = GraphEmbeddingsImport(
ws=None, running=mock_running,
backend=mock_backend, queue="q",
)
await dispatcher.destroy()
mock_running.stop.assert_called_once()
instance.stop.assert_called_once()
class TestGraphEmbeddingsImportMessageProcessing:
"""Regression coverage for receive(): catches Metadata/schema drift."""
@patch('trustgraph.gateway.dispatch.graph_embeddings_import.Publisher')
@pytest.mark.asyncio
async def test_receive_constructs_graph_embeddings_correctly(
self, mock_publisher_class, mock_backend, mock_websocket,
mock_running, sample_message,
):
instance = Mock()
instance.send = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = GraphEmbeddingsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
mock_msg = Mock()
mock_msg.json.return_value = sample_message
# If Metadata, GraphEmbeddings, or EntityEmbeddings gain/lose
# kwargs, this raises TypeError — exactly the regression we want
# to catch.
await dispatcher.receive(mock_msg)
instance.send.assert_called_once()
call_args = instance.send.call_args
assert call_args[0][0] is None
sent = call_args[0][1]
assert isinstance(sent, GraphEmbeddings)
assert isinstance(sent.metadata, Metadata)
assert sent.metadata.id == "doc-123"
assert sent.metadata.collection == "testcollection"
assert len(sent.entities) == 2
assert all(isinstance(e, EntityEmbeddings) for e in sent.entities)
# Lock in the wire format: incoming "vector" key (singular,
# list[float]) maps to EntityEmbeddings.vector. This mirrors
# serialize_graph_embeddings() on the export side.
assert sent.entities[0].vector == [0.1, 0.2, 0.3]
assert sent.entities[1].vector == [0.4, 0.5, 0.6]
@patch('trustgraph.gateway.dispatch.graph_embeddings_import.Publisher')
@pytest.mark.asyncio
async def test_receive_handles_empty_entities(
self, mock_publisher_class, mock_backend, mock_websocket,
mock_running, empty_entities_message,
):
instance = Mock()
instance.send = AsyncMock()
mock_publisher_class.return_value = instance
dispatcher = GraphEmbeddingsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
mock_msg = Mock()
mock_msg.json.return_value = empty_entities_message
await dispatcher.receive(mock_msg)
instance.send.assert_called_once()
sent = instance.send.call_args[0][1]
assert isinstance(sent, GraphEmbeddings)
assert sent.entities == []
assert sent.metadata.id == "doc-empty"
@patch('trustgraph.gateway.dispatch.graph_embeddings_import.Publisher')
@pytest.mark.asyncio
async def test_receive_propagates_publisher_errors(
self, mock_publisher_class, mock_backend, mock_websocket,
mock_running, sample_message,
):
instance = Mock()
instance.send = AsyncMock(side_effect=RuntimeError("publish failed"))
mock_publisher_class.return_value = instance
dispatcher = GraphEmbeddingsImport(
ws=mock_websocket, running=mock_running,
backend=mock_backend, queue="q",
)
mock_msg = Mock()
mock_msg.json.return_value = sample_message
with pytest.raises(RuntimeError, match="publish failed"):
await dispatcher.receive(mock_msg)

View file

@ -235,7 +235,6 @@ class TestRowsImportMessageProcessing:
# Check metadata
assert sent_object.metadata.id == "obj-123"
assert sent_object.metadata.user == "testuser"
assert sent_object.metadata.collection == "testcollection"
@patch('trustgraph.gateway.dispatch.rows_import.Publisher')

View file

@ -171,6 +171,14 @@ class TestApi:
patch('aiohttp.web.run_app') as mock_run_app:
mock_get_pubsub.return_value = Mock()
# Api.run() passes self.app_factory() — a coroutine — to
# web.run_app, which would normally consume it inside its own
# event loop. Since we mock run_app, close the coroutine here
# so it doesn't leak as an "unawaited coroutine" RuntimeWarning.
def _consume_coro(coro, **kwargs):
coro.close()
mock_run_app.side_effect = _consume_coro
api = Api(port=8080)
api.run()

View file

@ -23,7 +23,6 @@ class TestTextDocumentTranslator:
)
assert msg.metadata.id == "doc-1"
assert msg.metadata.user == "alice"
assert msg.metadata.collection == "research"
assert msg.text == payload.encode("utf-8")