Gateway queue overrides (#584)

This commit is contained in:
cybermaggedon 2025-12-06 11:01:20 +00:00 committed by GitHub
parent 7d07f802a8
commit ba95fa226b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 155 additions and 14 deletions

View file

@ -214,7 +214,9 @@ class TestDispatcherManager:
pulsar_client=mock_pulsar_client,
timeout=120,
consumer="api-gateway-config-request",
subscriber="api-gateway-config-request"
subscriber="api-gateway-config-request",
request_queue=None,
response_queue=None
)
mock_dispatcher.start.assert_called_once()
mock_dispatcher.process.assert_called_once_with("data", "responder")

View file

@ -7,14 +7,20 @@ from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class ConfigRequestor(ServiceRequestor):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120,
request_queue=None, response_queue=None):
if request_queue is None:
request_queue = config_request_queue
if response_queue is None:
response_queue = config_response_queue
super(ConfigRequestor, self).__init__(
pulsar_client=pulsar_client,
consumer_name = consumer,
subscription = subscriber,
request_queue=config_request_queue,
response_queue=config_response_queue,
request_queue=request_queue,
response_queue=response_queue,
request_schema=ConfigRequest,
response_schema=ConfigResponse,
timeout=timeout,

View file

@ -7,14 +7,20 @@ from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class FlowRequestor(ServiceRequestor):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120,
request_queue=None, response_queue=None):
if request_queue is None:
request_queue = flow_request_queue
if response_queue is None:
response_queue = flow_response_queue
super(FlowRequestor, self).__init__(
pulsar_client=pulsar_client,
consumer_name = consumer,
subscription = subscriber,
request_queue=flow_request_queue,
response_queue=flow_response_queue,
request_queue=request_queue,
response_queue=response_queue,
request_schema=FlowRequest,
response_schema=FlowResponse,
timeout=timeout,

View file

@ -10,14 +10,20 @@ from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class KnowledgeRequestor(ServiceRequestor):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120,
request_queue=None, response_queue=None):
if request_queue is None:
request_queue = knowledge_request_queue
if response_queue is None:
response_queue = knowledge_response_queue
super(KnowledgeRequestor, self).__init__(
pulsar_client=pulsar_client,
consumer_name = consumer,
subscription = subscriber,
request_queue=knowledge_request_queue,
response_queue=knowledge_response_queue,
request_queue=request_queue,
response_queue=response_queue,
request_schema=KnowledgeRequest,
response_schema=KnowledgeResponse,
timeout=timeout,

View file

@ -9,14 +9,20 @@ from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class LibrarianRequestor(ServiceRequestor):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120,
request_queue=None, response_queue=None):
if request_queue is None:
request_queue = librarian_request_queue
if response_queue is None:
response_queue = librarian_response_queue
super(LibrarianRequestor, self).__init__(
pulsar_client=pulsar_client,
consumer_name = consumer,
subscription = subscriber,
request_queue=librarian_request_queue,
response_queue=librarian_response_queue,
request_queue=request_queue,
response_queue=response_queue,
request_schema=LibrarianRequest,
response_schema=LibrarianResponse,
timeout=timeout,

View file

@ -98,12 +98,17 @@ class DispatcherWrapper:
class DispatcherManager:
def __init__(self, pulsar_client, config_receiver, prefix="api-gateway"):
def __init__(self, pulsar_client, config_receiver, prefix="api-gateway",
queue_overrides=None):
self.pulsar_client = pulsar_client
self.config_receiver = config_receiver
self.config_receiver.add_handler(self)
self.prefix = prefix
# Store queue overrides for global services
# Format: {"config": {"request": "...", "response": "..."}, ...}
self.queue_overrides = queue_overrides or {}
self.flows = {}
self.dispatchers = {}
@ -148,11 +153,20 @@ class DispatcherManager:
if key in self.dispatchers:
return await self.dispatchers[key].process(data, responder)
# Get queue overrides if specified for this service
request_queue = None
response_queue = None
if kind in self.queue_overrides:
request_queue = self.queue_overrides[kind].get("request")
response_queue = self.queue_overrides[kind].get("response")
dispatcher = global_dispatchers[kind](
pulsar_client = self.pulsar_client,
timeout = 120,
consumer = f"{self.prefix}-{kind}-request",
subscriber = f"{self.prefix}-{kind}-request",
request_queue = request_queue,
response_queue = response_queue,
)
await dispatcher.start()

View file

@ -20,6 +20,14 @@ from . endpoint.manager import EndpointManager
import pulsar
from prometheus_client import start_http_server
# Import default queue names
from .. schema import (
config_request_queue, config_response_queue,
flow_request_queue, flow_response_queue,
knowledge_request_queue, knowledge_response_queue,
librarian_request_queue, librarian_response_queue,
)
logger = logging.getLogger("api")
logger.setLevel(logging.INFO)
@ -70,10 +78,54 @@ class Api:
self.config_receiver = ConfigReceiver(self.pulsar_client)
# Build queue overrides dictionary from CLI arguments
queue_overrides = {}
# Config service
config_req = config.get("config_request_queue")
config_resp = config.get("config_response_queue")
if config_req or config_resp:
queue_overrides["config"] = {}
if config_req:
queue_overrides["config"]["request"] = config_req
if config_resp:
queue_overrides["config"]["response"] = config_resp
# Flow service
flow_req = config.get("flow_request_queue")
flow_resp = config.get("flow_response_queue")
if flow_req or flow_resp:
queue_overrides["flow"] = {}
if flow_req:
queue_overrides["flow"]["request"] = flow_req
if flow_resp:
queue_overrides["flow"]["response"] = flow_resp
# Knowledge service
knowledge_req = config.get("knowledge_request_queue")
knowledge_resp = config.get("knowledge_response_queue")
if knowledge_req or knowledge_resp:
queue_overrides["knowledge"] = {}
if knowledge_req:
queue_overrides["knowledge"]["request"] = knowledge_req
if knowledge_resp:
queue_overrides["knowledge"]["response"] = knowledge_resp
# Librarian service
librarian_req = config.get("librarian_request_queue")
librarian_resp = config.get("librarian_response_queue")
if librarian_req or librarian_resp:
queue_overrides["librarian"] = {}
if librarian_req:
queue_overrides["librarian"]["request"] = librarian_req
if librarian_resp:
queue_overrides["librarian"]["response"] = librarian_resp
self.dispatcher_manager = DispatcherManager(
pulsar_client = self.pulsar_client,
config_receiver = self.config_receiver,
prefix = "gateway",
queue_overrides = queue_overrides,
)
self.endpoint_manager = EndpointManager(
@ -181,6 +233,55 @@ def run():
help=f'Prometheus metrics port (default: 8000)',
)
# Queue override arguments for multi-tenant deployments
parser.add_argument(
'--config-request-queue',
default=None,
help=f'Config service request queue (default: {config_request_queue})',
)
parser.add_argument(
'--config-response-queue',
default=None,
help=f'Config service response queue (default: {config_response_queue})',
)
parser.add_argument(
'--flow-request-queue',
default=None,
help=f'Flow service request queue (default: {flow_request_queue})',
)
parser.add_argument(
'--flow-response-queue',
default=None,
help=f'Flow service response queue (default: {flow_response_queue})',
)
parser.add_argument(
'--knowledge-request-queue',
default=None,
help=f'Knowledge service request queue (default: {knowledge_request_queue})',
)
parser.add_argument(
'--knowledge-response-queue',
default=None,
help=f'Knowledge service response queue (default: {knowledge_response_queue})',
)
parser.add_argument(
'--librarian-request-queue',
default=None,
help=f'Librarian service request queue (default: {librarian_request_queue})',
)
parser.add_argument(
'--librarian-response-queue',
default=None,
help=f'Librarian service response queue (default: {librarian_response_queue})',
)
args = parser.parse_args()
args = vars(args)