diff --git a/tests/unit/test_gateway/test_dispatch_manager.py b/tests/unit/test_gateway/test_dispatch_manager.py index 6bb2e4d1..a9c17ec6 100644 --- a/tests/unit/test_gateway/test_dispatch_manager.py +++ b/tests/unit/test_gateway/test_dispatch_manager.py @@ -214,7 +214,9 @@ class TestDispatcherManager: pulsar_client=mock_pulsar_client, timeout=120, consumer="api-gateway-config-request", - subscriber="api-gateway-config-request" + subscriber="api-gateway-config-request", + request_queue=None, + response_queue=None ) mock_dispatcher.start.assert_called_once() mock_dispatcher.process.assert_called_once_with("data", "responder") diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/config.py b/trustgraph-flow/trustgraph/gateway/dispatch/config.py index c4fac5fa..10a0aab9 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/config.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/config.py @@ -7,14 +7,20 @@ from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor class ConfigRequestor(ServiceRequestor): - def __init__(self, pulsar_client, consumer, subscriber, timeout=120): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120, + request_queue=None, response_queue=None): + + if request_queue is None: + request_queue = config_request_queue + if response_queue is None: + response_queue = config_response_queue super(ConfigRequestor, self).__init__( pulsar_client=pulsar_client, consumer_name = consumer, subscription = subscriber, - request_queue=config_request_queue, - response_queue=config_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=ConfigRequest, response_schema=ConfigResponse, timeout=timeout, diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/flow.py b/trustgraph-flow/trustgraph/gateway/dispatch/flow.py index 30f8d45e..cb641656 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/flow.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/flow.py @@ -7,14 +7,20 @@ from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor class FlowRequestor(ServiceRequestor): - def __init__(self, pulsar_client, consumer, subscriber, timeout=120): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120, + request_queue=None, response_queue=None): + + if request_queue is None: + request_queue = flow_request_queue + if response_queue is None: + response_queue = flow_response_queue super(FlowRequestor, self).__init__( pulsar_client=pulsar_client, consumer_name = consumer, subscription = subscriber, - request_queue=flow_request_queue, - response_queue=flow_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=FlowRequest, response_schema=FlowResponse, timeout=timeout, diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py index 950b3430..b42db648 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py @@ -10,14 +10,20 @@ from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor class KnowledgeRequestor(ServiceRequestor): - def __init__(self, pulsar_client, consumer, subscriber, timeout=120): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120, + request_queue=None, response_queue=None): + + if request_queue is None: + request_queue = knowledge_request_queue + if response_queue is None: + response_queue = knowledge_response_queue super(KnowledgeRequestor, self).__init__( pulsar_client=pulsar_client, consumer_name = consumer, subscription = subscriber, - request_queue=knowledge_request_queue, - response_queue=knowledge_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=KnowledgeRequest, response_schema=KnowledgeResponse, timeout=timeout, diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py b/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py index 2155aa5d..8fc62d54 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py @@ -9,14 +9,20 @@ from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor class LibrarianRequestor(ServiceRequestor): - def __init__(self, pulsar_client, consumer, subscriber, timeout=120): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120, + request_queue=None, response_queue=None): + + if request_queue is None: + request_queue = librarian_request_queue + if response_queue is None: + response_queue = librarian_response_queue super(LibrarianRequestor, self).__init__( pulsar_client=pulsar_client, consumer_name = consumer, subscription = subscriber, - request_queue=librarian_request_queue, - response_queue=librarian_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=LibrarianRequest, response_schema=LibrarianResponse, timeout=timeout, diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index a1821e84..d35e5525 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -98,12 +98,17 @@ class DispatcherWrapper: class DispatcherManager: - def __init__(self, pulsar_client, config_receiver, prefix="api-gateway"): + def __init__(self, pulsar_client, config_receiver, prefix="api-gateway", + queue_overrides=None): self.pulsar_client = pulsar_client self.config_receiver = config_receiver self.config_receiver.add_handler(self) self.prefix = prefix + # Store queue overrides for global services + # Format: {"config": {"request": "...", "response": "..."}, ...} + self.queue_overrides = queue_overrides or {} + self.flows = {} self.dispatchers = {} @@ -148,11 +153,20 @@ class DispatcherManager: if key in self.dispatchers: return await self.dispatchers[key].process(data, responder) + # Get queue overrides if specified for this service + request_queue = None + response_queue = None + if kind in self.queue_overrides: + request_queue = self.queue_overrides[kind].get("request") + response_queue = self.queue_overrides[kind].get("response") + dispatcher = global_dispatchers[kind]( pulsar_client = self.pulsar_client, timeout = 120, consumer = f"{self.prefix}-{kind}-request", subscriber = f"{self.prefix}-{kind}-request", + request_queue = request_queue, + response_queue = response_queue, ) await dispatcher.start() diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index 1e2fdb23..b58348eb 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -20,6 +20,14 @@ from . endpoint.manager import EndpointManager import pulsar from prometheus_client import start_http_server +# Import default queue names +from .. schema import ( + config_request_queue, config_response_queue, + flow_request_queue, flow_response_queue, + knowledge_request_queue, knowledge_response_queue, + librarian_request_queue, librarian_response_queue, +) + logger = logging.getLogger("api") logger.setLevel(logging.INFO) @@ -70,10 +78,54 @@ class Api: self.config_receiver = ConfigReceiver(self.pulsar_client) + # Build queue overrides dictionary from CLI arguments + queue_overrides = {} + + # Config service + config_req = config.get("config_request_queue") + config_resp = config.get("config_response_queue") + if config_req or config_resp: + queue_overrides["config"] = {} + if config_req: + queue_overrides["config"]["request"] = config_req + if config_resp: + queue_overrides["config"]["response"] = config_resp + + # Flow service + flow_req = config.get("flow_request_queue") + flow_resp = config.get("flow_response_queue") + if flow_req or flow_resp: + queue_overrides["flow"] = {} + if flow_req: + queue_overrides["flow"]["request"] = flow_req + if flow_resp: + queue_overrides["flow"]["response"] = flow_resp + + # Knowledge service + knowledge_req = config.get("knowledge_request_queue") + knowledge_resp = config.get("knowledge_response_queue") + if knowledge_req or knowledge_resp: + queue_overrides["knowledge"] = {} + if knowledge_req: + queue_overrides["knowledge"]["request"] = knowledge_req + if knowledge_resp: + queue_overrides["knowledge"]["response"] = knowledge_resp + + # Librarian service + librarian_req = config.get("librarian_request_queue") + librarian_resp = config.get("librarian_response_queue") + if librarian_req or librarian_resp: + queue_overrides["librarian"] = {} + if librarian_req: + queue_overrides["librarian"]["request"] = librarian_req + if librarian_resp: + queue_overrides["librarian"]["response"] = librarian_resp + self.dispatcher_manager = DispatcherManager( pulsar_client = self.pulsar_client, config_receiver = self.config_receiver, prefix = "gateway", + queue_overrides = queue_overrides, ) self.endpoint_manager = EndpointManager( @@ -181,6 +233,55 @@ def run(): help=f'Prometheus metrics port (default: 8000)', ) + # Queue override arguments for multi-tenant deployments + parser.add_argument( + '--config-request-queue', + default=None, + help=f'Config service request queue (default: {config_request_queue})', + ) + + parser.add_argument( + '--config-response-queue', + default=None, + help=f'Config service response queue (default: {config_response_queue})', + ) + + parser.add_argument( + '--flow-request-queue', + default=None, + help=f'Flow service request queue (default: {flow_request_queue})', + ) + + parser.add_argument( + '--flow-response-queue', + default=None, + help=f'Flow service response queue (default: {flow_response_queue})', + ) + + parser.add_argument( + '--knowledge-request-queue', + default=None, + help=f'Knowledge service request queue (default: {knowledge_request_queue})', + ) + + parser.add_argument( + '--knowledge-response-queue', + default=None, + help=f'Knowledge service response queue (default: {knowledge_response_queue})', + ) + + parser.add_argument( + '--librarian-request-queue', + default=None, + help=f'Librarian service request queue (default: {librarian_request_queue})', + ) + + parser.add_argument( + '--librarian-response-queue', + default=None, + help=f'Librarian service response queue (default: {librarian_response_queue})', + ) + args = parser.parse_args() args = vars(args)