fix: prevent duplicate dispatcher creation race condition in invoke_global_service (#715)

* fix: prevent duplicate dispatcher creation race condition in invoke_global_service

Concurrent coroutines could all pass the `if key in self.dispatchers` check
before any of them wrote the result back, because `await dispatcher.start()`
yields to the event loop. This caused multiple Pulsar consumers to be created
on the same shared subscription, distributing responses round-robin and
dropping ~2/3 of them — manifesting as a permanent spinner in the Workbench UI.

Apply a double-checked asyncio.Lock in both `invoke_global_service` and
`invoke_flow_service` so only one dispatcher is ever created per service key.

* test: add concurrent-dispatch tests for race condition fix

Add asyncio.gather-based tests that verify invoke_global_service and
invoke_flow_service create exactly one dispatcher under concurrent calls,
preventing the duplicate Pulsar consumer bug.
This commit is contained in:
V.Sreeram 2026-04-06 15:43:59 +05:30 committed by GitHub
parent 7daa06e9e4
commit 8f18ba0257
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 133 additions and 53 deletions

View file

@ -49,7 +49,8 @@ class TestDispatcherManager:
assert manager.prefix == "api-gateway" # default prefix
assert manager.flows == {}
assert manager.dispatchers == {}
assert isinstance(manager.dispatcher_lock, asyncio.Lock)
# Verify manager was added as handler to config receiver
mock_config_receiver.add_handler.assert_called_once_with(manager)
@ -543,18 +544,99 @@ class TestDispatcherManager:
mock_backend = Mock()
mock_config_receiver = Mock()
manager = DispatcherManager(mock_backend, mock_config_receiver)
# Setup test flow with interface but unsupported kind
manager.flows["test_flow"] = {
"interfaces": {
"invalid-kind": {"request": "req", "response": "resp"}
}
}
with patch('trustgraph.gateway.dispatch.manager.request_response_dispatchers') as mock_rr_dispatchers, \
patch('trustgraph.gateway.dispatch.manager.sender_dispatchers') as mock_sender_dispatchers:
mock_rr_dispatchers.__contains__.return_value = False
mock_sender_dispatchers.__contains__.return_value = False
with pytest.raises(RuntimeError, match="Invalid kind"):
await manager.invoke_flow_service("data", "responder", "test_flow", "invalid-kind")
await manager.invoke_flow_service("data", "responder", "test_flow", "invalid-kind")
@pytest.mark.asyncio
async def test_invoke_global_service_concurrent_calls_create_single_dispatcher(self):
"""Concurrent calls for the same service must create exactly one dispatcher.
Before the fix, await dispatcher.start() yielded to the event loop and
multiple coroutines could all pass the 'key not in self.dispatchers' check
before any of them wrote the result back, creating duplicate Pulsar consumers.
"""
mock_backend = Mock()
mock_config_receiver = Mock()
manager = DispatcherManager(mock_backend, mock_config_receiver)
async def slow_start():
# Yield to the event loop so other coroutines get a chance to run,
# reproducing the window that caused the original race condition.
await asyncio.sleep(0)
with patch('trustgraph.gateway.dispatch.manager.global_dispatchers') as mock_dispatchers:
mock_dispatcher_class = Mock()
mock_dispatcher = Mock()
mock_dispatcher.start = AsyncMock(side_effect=slow_start)
mock_dispatcher.process = AsyncMock(return_value="result")
mock_dispatcher_class.return_value = mock_dispatcher
mock_dispatchers.__getitem__.return_value = mock_dispatcher_class
results = await asyncio.gather(*[
manager.invoke_global_service("data", "responder", "config")
for _ in range(5)
])
assert mock_dispatcher_class.call_count == 1, (
"Dispatcher class instantiated more than once — duplicate consumer bug"
)
assert mock_dispatcher.start.call_count == 1
assert manager.dispatchers[(None, "config")] is mock_dispatcher
assert all(r == "result" for r in results)
@pytest.mark.asyncio
async def test_invoke_flow_service_concurrent_calls_create_single_dispatcher(self):
"""Concurrent calls for the same flow+kind must create exactly one dispatcher.
invoke_flow_service has the same check-then-create pattern as
invoke_global_service and is protected by the same dispatcher_lock.
"""
mock_backend = Mock()
mock_config_receiver = Mock()
manager = DispatcherManager(mock_backend, mock_config_receiver)
manager.flows["test_flow"] = {
"interfaces": {
"agent": {
"request": "agent_request_queue",
"response": "agent_response_queue",
}
}
}
async def slow_start():
await asyncio.sleep(0)
with patch('trustgraph.gateway.dispatch.manager.request_response_dispatchers') as mock_rr_dispatchers:
mock_dispatcher_class = Mock()
mock_dispatcher = Mock()
mock_dispatcher.start = AsyncMock(side_effect=slow_start)
mock_dispatcher.process = AsyncMock(return_value="result")
mock_dispatcher_class.return_value = mock_dispatcher
mock_rr_dispatchers.__getitem__.return_value = mock_dispatcher_class
mock_rr_dispatchers.__contains__.return_value = True
results = await asyncio.gather(*[
manager.invoke_flow_service("data", "responder", "test_flow", "agent")
for _ in range(5)
])
assert mock_dispatcher_class.call_count == 1, (
"Dispatcher class instantiated more than once — duplicate consumer bug"
)
assert mock_dispatcher.start.call_count == 1
assert manager.dispatchers[("test_flow", "agent")] is mock_dispatcher
assert all(r == "result" for r in results)

View file

@ -116,6 +116,7 @@ class DispatcherManager:
self.flows = {}
self.dispatchers = {}
self.dispatcher_lock = asyncio.Lock()
async def start_flow(self, id, flow):
logger.info(f"Starting flow {id}")
@ -163,30 +164,28 @@ class DispatcherManager:
key = (None, kind)
if key in self.dispatchers:
return await self.dispatchers[key].process(data, responder)
if key not in self.dispatchers:
async with self.dispatcher_lock:
if key not in self.dispatchers:
request_queue = None
response_queue = None
if kind in self.queue_overrides:
request_queue = self.queue_overrides[kind].get("request")
response_queue = self.queue_overrides[kind].get("response")
# Get queue overrides if specified for this service
request_queue = None
response_queue = None
if kind in self.queue_overrides:
request_queue = self.queue_overrides[kind].get("request")
response_queue = self.queue_overrides[kind].get("response")
dispatcher = global_dispatchers[kind](
backend = self.backend,
timeout = 120,
consumer = f"{self.prefix}-{kind}-request",
subscriber = f"{self.prefix}-{kind}-request",
request_queue = request_queue,
response_queue = response_queue,
)
dispatcher = global_dispatchers[kind](
backend = self.backend,
timeout = 120,
consumer = f"{self.prefix}-{kind}-request",
subscriber = f"{self.prefix}-{kind}-request",
request_queue = request_queue,
response_queue = response_queue,
)
await dispatcher.start()
self.dispatchers[key] = dispatcher
await dispatcher.start()
self.dispatchers[key] = dispatcher
return await dispatcher.process(data, responder)
return await self.dispatchers[key].process(data, responder)
def dispatch_flow_import(self):
return self.process_flow_import
@ -297,36 +296,35 @@ class DispatcherManager:
key = (flow, kind)
if key in self.dispatchers:
return await self.dispatchers[key].process(data, responder)
if key not in self.dispatchers:
async with self.dispatcher_lock:
if key not in self.dispatchers:
intf_defs = self.flows[flow]["interfaces"]
intf_defs = self.flows[flow]["interfaces"]
if kind not in intf_defs:
raise RuntimeError("This kind not supported by flow")
if kind not in intf_defs:
raise RuntimeError("This kind not supported by flow")
qconfig = intf_defs[kind]
qconfig = intf_defs[kind]
if kind in request_response_dispatchers:
dispatcher = request_response_dispatchers[kind](
backend = self.backend,
request_queue = qconfig["request"],
response_queue = qconfig["response"],
timeout = 120,
consumer = f"{self.prefix}-{flow}-{kind}-request",
subscriber = f"{self.prefix}-{flow}-{kind}-request",
)
elif kind in sender_dispatchers:
dispatcher = sender_dispatchers[kind](
backend = self.backend,
queue = qconfig,
)
else:
raise RuntimeError("Invalid kind")
if kind in request_response_dispatchers:
dispatcher = request_response_dispatchers[kind](
backend = self.backend,
request_queue = qconfig["request"],
response_queue = qconfig["response"],
timeout = 120,
consumer = f"{self.prefix}-{flow}-{kind}-request",
subscriber = f"{self.prefix}-{flow}-{kind}-request",
)
elif kind in sender_dispatchers:
dispatcher = sender_dispatchers[kind](
backend = self.backend,
queue = qconfig,
)
else:
raise RuntimeError("Invalid kind")
await dispatcher.start()
await dispatcher.start()
self.dispatchers[key] = dispatcher
self.dispatchers[key] = dispatcher
return await dispatcher.process(data, responder)
return await self.dispatchers[key].process(data, responder)