diff --git a/tests/unit/test_gateway/test_dispatch_mux.py b/tests/unit/test_gateway/test_dispatch_mux.py index b623a1b6..a0bc9460 100644 --- a/tests/unit/test_gateway/test_dispatch_mux.py +++ b/tests/unit/test_gateway/test_dispatch_mux.py @@ -121,7 +121,11 @@ class TestMux: # Based on the code, it seems to catch exceptions await mux.receive(mock_msg) - mock_ws.send_json.assert_called_once_with({"error": "Bad message"}) + mock_ws.send_json.assert_called_once_with({ + "error": {"message": "Bad message", "type": "error"}, + "complete": True, + "id": "test-id-123", + }) @pytest.mark.asyncio async def test_mux_receive_message_without_id(self): @@ -129,23 +133,26 @@ class TestMux: mock_dispatcher_manager = MagicMock() mock_ws = AsyncMock() mock_running = MagicMock() - + mux = Mux( dispatcher_manager=mock_dispatcher_manager, ws=mock_ws, running=mock_running ) - + # Mock message without id field mock_msg = MagicMock() mock_msg.json.return_value = { "request": {"type": "test"} } - + # receive method should handle the RuntimeError internally await mux.receive(mock_msg) - - mock_ws.send_json.assert_called_once_with({"error": "Bad message"}) + + mock_ws.send_json.assert_called_once_with({ + "error": {"message": "Bad message", "type": "error"}, + "complete": True, + }) @pytest.mark.asyncio async def test_mux_receive_invalid_json(self): @@ -153,19 +160,22 @@ class TestMux: mock_dispatcher_manager = MagicMock() mock_ws = AsyncMock() mock_running = MagicMock() - + mux = Mux( dispatcher_manager=mock_dispatcher_manager, ws=mock_ws, running=mock_running ) - + # Mock message with invalid JSON mock_msg = MagicMock() mock_msg.json.side_effect = ValueError("Invalid JSON") - + # receive method should handle the ValueError internally await mux.receive(mock_msg) - + mock_msg.json.assert_called_once() - mock_ws.send_json.assert_called_once_with({"error": "Invalid JSON"}) \ No newline at end of file + mock_ws.send_json.assert_called_once_with({ + "error": {"message": "Invalid JSON", "type": "error"}, + "complete": True, + }) \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py index ddaa8ddf..fabd5c44 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py @@ -33,9 +33,12 @@ class Mux: async def receive(self, msg): + request_id = None + try: data = msg.json() + request_id = data.get("id") if "request" not in data: raise RuntimeError("Bad message") @@ -51,7 +54,13 @@ class Mux: except Exception as e: logger.error(f"Receive exception: {str(e)}", exc_info=True) - await self.ws.send_json({"error": str(e)}) + error_resp = { + "error": {"message": str(e), "type": "error"}, + "complete": True, + } + if request_id: + error_resp["id"] = request_id + await self.ws.send_json(error_resp) async def maybe_tidy_workers(self, workers): @@ -97,12 +106,12 @@ class Mux: }) worker = asyncio.create_task( - self.request_task(request, responder, flow, svc) + self.request_task(id, request, responder, flow, svc) ) workers.append(worker) - async def request_task(self, request, responder, flow, svc): + async def request_task(self, id, request, responder, flow, svc): try: @@ -119,7 +128,11 @@ class Mux: ) except Exception as e: - await self.ws.send_json({"error": str(e)}) + await self.ws.send_json({ + "id": id, + "error": {"message": str(e), "type": "error"}, + "complete": True, + }) async def run(self): @@ -143,7 +156,11 @@ class Mux: except Exception as e: # This is an internal working error, may not be recoverable logger.error(f"Run prepare exception: {e}", exc_info=True) - await self.ws.send_json({"id": id, "error": str(e)}) + await self.ws.send_json({ + "id": id, + "error": {"message": str(e), "type": "error"}, + "complete": True, + }) self.running.stop() if self.ws: @@ -160,7 +177,11 @@ class Mux: except Exception as e: logger.error(f"Exception in mux: {e}", exc_info=True) - await self.ws.send_json({"error": str(e)}) + await self.ws.send_json({ + "id": id, + "error": {"message": str(e), "type": "error"}, + "complete": True, + }) self.running.stop()