diff --git a/arch/Dockerfile b/arch/Dockerfile index 29793882..5d7f1df6 100644 --- a/arch/Dockerfile +++ b/arch/Dockerfile @@ -32,7 +32,7 @@ COPY --from=envoy /usr/local/bin/envoy /usr/local/bin/envoy WORKDIR /app COPY arch/requirements.txt . RUN pip install -r requirements.txt -COPY arch/tools/cli/config_generator.py . +COPY arch/tools . COPY arch/envoy.template.yaml . COPY arch/arch_config_schema.yaml . COPY arch/supervisord.conf /etc/supervisor/conf.d/supervisord.conf diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index c18987fd..872639db 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -395,137 +395,6 @@ static_resources: {% endif %} {% endfor %} - # Listeners for LLMs - {% for listener in listeners %} - - {% if listener.llm_providers %} - - - name: {{ listener.name | replace(" ", "_") }} - address: - socket_address: - address: {{ listener.address }} - port_value: {{ listener.port }} - filter_chains: - - filters: - - name: envoy.filters.network.http_connection_manager - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} - generate_request_id: true - tracing: - provider: - name: envoy.tracers.opentelemetry - typed_config: - "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig - grpc_service: - envoy_grpc: - cluster_name: opentelemetry_collector - timeout: 0.250s - service_name: egress_traffic_llm - random_sampling: - value: {{ arch_tracing.random_sampling }} - {% endif %} - stat_prefix: egress_traffic - codec_type: AUTO - scheme_header_transformation: - scheme_to_overwrite: https - access_log: - - name: envoy.access_loggers.file - typed_config: - "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog - path: "/var/log/access_llm.log" - route_config: - name: local_routes - virtual_hosts: - - name: local_service - domains: - - "*" - routes: - {% for provider in listener.llm_providers %} - # if endpoint is set then use custom cluster for upstream llm - {% if provider.endpoint %} - {% set llm_cluster_name = provider.name %} - {% else %} - {% set llm_cluster_name = provider.provider_interface %} - {% endif %} - - match: - prefix: "/" - headers: - - name: "x-arch-llm-provider" - string_match: - exact: {{ llm_cluster_name }} - route: - auto_host_rewrite: true - cluster: {{ llm_cluster_name }} - timeout: 60s - {% endfor %} - - match: - prefix: "/" - direct_response: - status: 400 - body: - inline_string: "x-arch-llm-provider header not set, llm gateway cannot perform routing\n" - http_filters: - - name: envoy.filters.http.compressor - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.compressor.v3.Compressor - compressor_library: - name: envoy.compression.brotli.compressor - typed_config: - "@type": type.googleapis.com/envoy.extensions.compression.brotli.compressor.v3.Brotli - chunk_size: 8192 - - name: envoy.filters.http.compressor - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.compressor.v3.Compressor - compressor_library: - name: compress - typed_config: - "@type": type.googleapis.com/envoy.extensions.compression.gzip.compressor.v3.Gzip - memory_level: 3 - window_bits: 10 - - name: envoy.filters.http.wasm - typed_config: - "@type": type.googleapis.com/udpa.type.v1.TypedStruct - type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm - value: - config: - name: "http_config" - root_id: llm_gateway - configuration: - "@type": "type.googleapis.com/google.protobuf.StringValue" - value: | - {{ arch_llm_config | indent(32) }} - vm_config: - runtime: "envoy.wasm.runtime.v8" - code: - local: - filename: "/etc/envoy/proxy-wasm-plugins/llm_gateway.wasm" - - name: envoy.filters.http.decompressor - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.decompressor.v3.Decompressor - decompressor_library: - name: decompress - typed_config: - "@type": "type.googleapis.com/envoy.extensions.compression.gzip.decompressor.v3.Gzip" - chunk_size: 8192 - # If this ratio is set too low, then body data will not be decompressed completely. - max_inflate_ratio: 1000 - - name: envoy.filters.http.decompressor - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.decompressor.v3.Decompressor - decompressor_library: - name: envoy.compression.brotli.decompressor - typed_config: - "@type": type.googleapis.com/envoy.extensions.compression.brotli.decompressor.v3.Brotli - chunk_size: 8192 - - name: envoy.filters.http.router - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router - - - {% endif %} - {% endfor %} - # begin - legacy llm listeners - name: egress_traffic diff --git a/arch/supervisord.conf b/arch/supervisord.conf index f7e374b6..0361ee0b 100644 --- a/arch/supervisord.conf +++ b/arch/supervisord.conf @@ -9,7 +9,7 @@ stdout_logfile_maxbytes=0 stderr_logfile_maxbytes=0 [program:envoy] -command=/bin/sh -c "python /app/config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log//envoy.log" +command=/bin/sh -c "python -m cli.config_generator && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log//envoy.log" stdout_logfile=/dev/stdout redirect_stderr=true stdout_logfile_maxbytes=0 diff --git a/arch/tools/cli/config_generator.py b/arch/tools/cli/config_generator.py index c054a09d..661b9b51 100644 --- a/arch/tools/cli/config_generator.py +++ b/arch/tools/cli/config_generator.py @@ -1,5 +1,6 @@ import json import os +from cli.utils import convert_legacy_llm_providers from jinja2 import Environment, FileSystemLoader import yaml from jsonschema import validate @@ -66,6 +67,12 @@ def validate_and_render_schema(): _ = yaml.safe_load(arch_config_schema) inferred_clusters = {} + listeners, llm_gateway, prompt_gateway = convert_legacy_llm_providers( + config_yaml.get("listeners"), config_yaml.get("llm_providers") + ) + + config_yaml["listeners"] = listeners + endpoints = config_yaml.get("endpoints", {}) # Process agents section and convert to endpoints @@ -126,64 +133,62 @@ def validate_and_render_schema(): model_name_keys = set() model_usage_name_keys = set() - # legacy listeners - # check if type is array or object - # if its dict its legacy format let's convert it to array - prompt_gateway_listener = { - "name": "ingress_traffic", - "port": 10000, - "address": "0.0.0.0", - "timeout": "30s", - "protocol": "openai", - } - llm_gateway_listener = { - "name": "egress_traffic", - "port": 12000, - "address": "0.0.0.0", - "timeout": "30s", - "llm_providers": [], - "protocol": "openai", - } - if isinstance(config_yaml["listeners"], dict): - ingress_traffic = config_yaml["listeners"].get("ingress_traffic", None) - egress_traffic = config_yaml["listeners"].get("egress_traffic", {}) - config_yaml["listeners"] = [] + # # legacy listeners + # # check if type is array or object + # # if its dict its legacy format let's convert it to array + # prompt_gateway_listener = { + # "name": "ingress_traffic", + # "port": 10000, + # "address": "0.0.0.0", + # "timeout": "30s", + # "protocol": "openai", + # } + # llm_gateway_listener = { + # "name": "egress_traffic", + # "port": 12000, + # "address": "0.0.0.0", + # "timeout": "30s", + # "llm_providers": [], + # "protocol": "openai", + # } + # if isinstance(config_yaml["listeners"], dict): + # ingress_traffic = config_yaml["listeners"].get("ingress_traffic", None) + # egress_traffic = config_yaml["listeners"].get("egress_traffic", {}) + # config_yaml["listeners"] = [] - llm_providers = [] - if config_yaml.get("llm_providers"): - llm_providers = config_yaml["llm_providers"] - del config_yaml["llm_providers"] - llm_gateway_listener["port"] = egress_traffic.get( - "port", llm_gateway_listener["port"] - ) - llm_gateway_listener["address"] = egress_traffic.get( - "address", llm_gateway_listener["address"] - ) - llm_gateway_listener["timeout"] = egress_traffic.get( - "timeout", llm_gateway_listener["timeout"] - ) - llm_gateway_listener["llm_providers"] = llm_providers - config_yaml["listeners"].append(llm_gateway_listener) + # llm_providers = [] + # if config_yaml.get("llm_providers"): + # llm_providers = config_yaml["llm_providers"] + # del config_yaml["llm_providers"] + # llm_gateway_listener["port"] = egress_traffic.get( + # "port", llm_gateway_listener["port"] + # ) + # llm_gateway_listener["address"] = egress_traffic.get( + # "address", llm_gateway_listener["address"] + # ) + # llm_gateway_listener["timeout"] = egress_traffic.get( + # "timeout", llm_gateway_listener["timeout"] + # ) + # llm_gateway_listener["llm_providers"] = llm_providers + # config_yaml["listeners"].append(llm_gateway_listener) - if ingress_traffic: - prompt_gateway_listener["port"] = ingress_traffic.get( - "port", prompt_gateway_listener["port"] - ) - prompt_gateway_listener["address"] = ingress_traffic.get( - "address", prompt_gateway_listener["address"] - ) - prompt_gateway_listener["timeout"] = ingress_traffic.get( - "timeout", prompt_gateway_listener["timeout"] - ) - config_yaml["listeners"].append(prompt_gateway_listener) + # if ingress_traffic: + # prompt_gateway_listener["port"] = ingress_traffic.get( + # "port", prompt_gateway_listener["port"] + # ) + # prompt_gateway_listener["address"] = ingress_traffic.get( + # "address", prompt_gateway_listener["address"] + # ) + # prompt_gateway_listener["timeout"] = ingress_traffic.get( + # "timeout", prompt_gateway_listener["timeout"] + # ) + # config_yaml["listeners"].append(prompt_gateway_listener) - for listener in config_yaml["listeners"]: - print("Processing listener: ", listener) - name = listener.get("name", None) - - # TODO: for now we only support llm_providers under egress_traffic listener - if name != "egress_traffic": + for listener in listeners: + if listener.get("llm_providers") is None or listener.get("llm_providers") == []: continue + print("Processing listener with llm_providers: ", listener) + name = listener.get("name", None) for llm_provider in listener.get("llm_providers", []): if llm_provider.get("usage", None): @@ -293,12 +298,18 @@ def validate_and_render_schema(): } ) - for listener in config_yaml["listeners"]: + updated_llm_providers = [] + for listener in listeners: print("Processing listener: ", listener) - if listener.get("name") == "egress_traffic": + llm_providers = listener.get("llm_providers", None) + if llm_providers is not None and llm_providers != []: print("processing egress traffic listener") print("updated_llm_providers: ", updated_llm_providers) - listener["llm_providers"] = deepcopy(updated_llm_providers) + if updated_llm_providers is not None and updated_llm_providers != []: + raise Exception( + "Please provide llm_providers either under listeners or at root level, not both. Currently we don't support multiple listeners with llm_providers" + ) + updated_llm_providers = deepcopy(llm_providers) config_yaml["llm_providers"] = updated_llm_providers # Validate model aliases if present @@ -314,16 +325,6 @@ def validate_and_render_schema(): arch_config_string = yaml.dump(config_yaml) arch_llm_config_string = yaml.dump(config_yaml) - # prompt_gateway_listener = config_yaml.get("listeners", {}).get( - # "ingress_traffic", {} - # ) - # if prompt_gateway_listener.get("port") == None: - # prompt_gateway_listener["port"] = 10000 # default port for prompt gateway - # if prompt_gateway_listener.get("address") == None: - # prompt_gateway_listener["address"] = "127.0.0.1" - # if prompt_gateway_listener.get("timeout") == None: - # prompt_gateway_listener["timeout"] = "10s" - use_agent_orchestrator = config_yaml.get("overrides", {}).get( "use_agent_orchestrator", False ) @@ -346,8 +347,8 @@ def validate_and_render_schema(): print("agent_orchestrator: ", agent_orchestrator) data = { - "prompt_gateway_listener": prompt_gateway_listener, - "llm_gateway_listener": llm_gateway_listener, + "prompt_gateway_listener": prompt_gateway, + "llm_gateway_listener": llm_gateway, "arch_config": arch_config_string, "arch_llm_config": arch_llm_config_string, "arch_clusters": inferred_clusters, @@ -355,7 +356,7 @@ def validate_and_render_schema(): "arch_tracing": arch_tracing, "local_llms": llms_with_endpoint, "agent_orchestrator": agent_orchestrator, - "listeners": config_yaml["listeners"].copy(), + "listeners": listeners, } rendered = template.render(data) diff --git a/arch/tools/cli/core.py b/arch/tools/cli/core.py index eb1ed377..2761ff44 100644 --- a/arch/tools/cli/core.py +++ b/arch/tools/cli/core.py @@ -5,7 +5,7 @@ import time import sys import yaml -from cli.utils import getLogger +from cli.utils import convert_legacy_llm_providers, getLogger from cli.consts import ( ARCHGW_DOCKER_IMAGE, ARCHGW_DOCKER_NAME, @@ -37,9 +37,11 @@ def _get_gateway_ports(arch_config_file: str) -> list[int]: print("arch config dict json string: ", json.dumps(arch_config_dict)) - all_ports = [ - listener.get("port") for listener in arch_config_dict.get("listeners", []) - ] + listeners, _, _ = convert_legacy_llm_providers( + arch_config_dict.get("listeners"), arch_config_dict.get("llm_providers") + ) + + all_ports = [listener.get("port") for listener in listeners] return all_ports diff --git a/arch/tools/cli/docker_cli.py b/arch/tools/cli/docker_cli.py index 873a2641..2d7bac28 100644 --- a/arch/tools/cli/docker_cli.py +++ b/arch/tools/cli/docker_cli.py @@ -127,7 +127,8 @@ def docker_validate_archgw_schema(arch_config_file): "--entrypoint", "python", ARCHGW_DOCKER_IMAGE, - "config_generator.py", + "-m", + "cli.config_generator", ], capture_output=True, text=True, diff --git a/arch/tools/cli/utils.py b/arch/tools/cli/utils.py index e8bdb3a7..57c34719 100644 --- a/arch/tools/cli/utils.py +++ b/arch/tools/cli/utils.py @@ -21,16 +21,85 @@ def getLogger(name="cli"): log = getLogger(__name__) +def convert_legacy_llm_providers( + listeners: dict | list, llm_providers: list | None +) -> tuple[list, dict | None, dict | None]: + llm_gateway_listener = { + "name": "egress_traffic", + "port": 12000, + "address": "0.0.0.0", + "timeout": "30s", + "llm_providers": [], + "protocol": "openai", + } + + prompt_gateway_listener = { + "name": "ingress_traffic", + "port": 10000, + "address": "0.0.0.0", + "timeout": "30s", + "protocol": "openai", + } + + if isinstance(listeners, dict): + # legacy listeners + # check if type is array or object + # if its dict its legacy format let's convert it to array + updated_listeners = [] + ingress_traffic = listeners.get("ingress_traffic", {}) + egress_traffic = listeners.get("egress_traffic", {}) + + llm_gateway_listener["port"] = egress_traffic.get( + "port", llm_gateway_listener["port"] + ) + llm_gateway_listener["address"] = egress_traffic.get( + "address", llm_gateway_listener["address"] + ) + llm_gateway_listener["timeout"] = egress_traffic.get( + "timeout", llm_gateway_listener["timeout"] + ) + if llm_providers is None or llm_providers == []: + raise ValueError("llm_providers cannot be empty when using legacy format") + + llm_gateway_listener["llm_providers"] = llm_providers + updated_listeners.append(llm_gateway_listener) + + if ingress_traffic and ingress_traffic != {}: + prompt_gateway_listener["port"] = ingress_traffic.get( + "port", prompt_gateway_listener["port"] + ) + prompt_gateway_listener["address"] = ingress_traffic.get( + "address", prompt_gateway_listener["address"] + ) + prompt_gateway_listener["timeout"] = ingress_traffic.get( + "timeout", prompt_gateway_listener["timeout"] + ) + updated_listeners.append(prompt_gateway_listener) + + return updated_listeners, llm_gateway_listener, prompt_gateway_listener + + llm_provider_set = False + for listener in listeners: + if listener.get("llm_providers") is not None: + if llm_provider_set: + raise ValueError( + "Currently only one listener can have llm_providers set" + ) + llm_gateway_listener = listener + llm_provider_set = True + + return listeners, llm_gateway_listener, prompt_gateway_listener + + def get_llm_provider_access_keys(arch_config_file): with open(arch_config_file, "r") as file: arch_config = file.read() arch_config_yaml = yaml.safe_load(arch_config) access_key_list = [] - for llm_provider in arch_config_yaml.get("llm_providers", []): - access_key = llm_provider.get("access_key") - if access_key is not None: - access_key_list.append(access_key) + listeners, _, _ = convert_legacy_llm_providers( + arch_config_yaml.get("listeners"), arch_config_yaml.get("llm_providers") + ) for prompt_target in arch_config_yaml.get("prompt_targets", []): for k, v in prompt_target.get("endpoint", {}).get("http_headers", {}).items(): @@ -44,7 +113,7 @@ def get_llm_provider_access_keys(arch_config_file): else: access_key_list.append(v) - for listener in arch_config_yaml.get("listeners", []): + for listener in listeners: for llm_provider in listener.get("llm_providers", []): access_key = llm_provider.get("access_key") if access_key is not None: diff --git a/arch/tools/test/test_config_generator.py b/arch/tools/test/test_config_generator.py index 94e1ba39..43b7bd45 100644 --- a/arch/tools/test/test_config_generator.py +++ b/arch/tools/test/test_config_generator.py @@ -90,7 +90,7 @@ def test_validate_and_render_happy_path_agent_config(monkeypatch): monkeypatch.setenv("TEMPLATE_ROOT", "../") arch_config = """ -version: v0.2.0 +version: v0.3.0 agents: - name: query_rewriter @@ -347,3 +347,132 @@ def test_validate_and_render_schema_tests(monkeypatch, arch_config_test_case): with pytest.raises(Exception) as excinfo: validate_and_render_schema() assert expected_error in str(excinfo.value) + + +def test_convert_legacy_llm_providers(): + from cli.utils import convert_legacy_llm_providers + + listeners = { + "ingress_traffic": { + "address": "0.0.0.0", + "port": 10000, + "timeout": "30s", + "protocol": "openai", + }, + "egress_traffic": { + "address": "0.0.0.0", + "port": 12000, + "timeout": "30s", + "protocol": "openai", + }, + } + llm_providers = [ + { + "model": "openai/gpt-4o", + "access_key": "test_key", + } + ] + + updated_providers, llm_gateway, prompt_gateway = convert_legacy_llm_providers( + listeners, llm_providers + ) + assert isinstance(updated_providers, list) + assert llm_gateway is not None + assert prompt_gateway is not None + assert updated_providers == [ + { + "address": "0.0.0.0", + "llm_providers": [ + { + "access_key": "test_key", + "model": "openai/gpt-4o", + }, + ], + "name": "egress_traffic", + "port": 12000, + "protocol": "openai", + "timeout": "30s", + }, + { + "address": "0.0.0.0", + "name": "ingress_traffic", + "port": 10000, + "protocol": "openai", + "timeout": "30s", + }, + ] + assert llm_gateway == { + "address": "0.0.0.0", + "llm_providers": [ + { + "access_key": "test_key", + "model": "openai/gpt-4o", + }, + ], + "name": "egress_traffic", + "port": 12000, + "protocol": "openai", + "timeout": "30s", + } + + assert prompt_gateway == { + "address": "0.0.0.0", + "name": "ingress_traffic", + "port": 10000, + "protocol": "openai", + "timeout": "30s", + } + + +def test_convert_legacy_llm_providers_no_prompt_gateway(): + from cli.utils import convert_legacy_llm_providers + + listeners = { + "egress_traffic": { + "address": "0.0.0.0", + "port": 12000, + "timeout": "30s", + "protocol": "openai", + } + } + llm_providers = [ + { + "model": "openai/gpt-4o", + "access_key": "test_key", + } + ] + + updated_providers, llm_gateway, prompt_gateway = convert_legacy_llm_providers( + listeners, llm_providers + ) + assert isinstance(updated_providers, list) + assert llm_gateway is not None + assert prompt_gateway is not None + assert updated_providers == [ + { + "address": "0.0.0.0", + "llm_providers": [ + { + "access_key": "test_key", + "model": "openai/gpt-4o", + }, + ], + "name": "egress_traffic", + "port": 12000, + "protocol": "openai", + "timeout": "30s", + } + ] + assert llm_gateway == { + "address": "0.0.0.0", + "llm_providers": [ + { + "access_key": "test_key", + "model": "openai/gpt-4o", + }, + ], + "name": "egress_traffic", + "port": 12000, + "protocol": "openai", + "timeout": "30s", + } diff --git a/arch/validate_arch_config.sh b/arch/validate_arch_config.sh index 493d1b2f..f253bf3a 100644 --- a/arch/validate_arch_config.sh +++ b/arch/validate_arch_config.sh @@ -5,7 +5,7 @@ failed_files=() for file in $(find . -name arch_config.yaml -o -name arch_config_full_reference.yaml); do echo "Validating ${file}..." touch $(pwd)/${file}_rendered - if ! docker run --rm -v "$(pwd)/${file}:/app/arch_config.yaml:ro" -v "$(pwd)/${file}_rendered:/app/arch_config_rendered.yaml:rw" --entrypoint /bin/sh katanemo/archgw:latest -c "python config_generator.py" 2>&1 > /dev/null ; then + if ! docker run --rm -v "$(pwd)/${file}:/app/arch_config.yaml:ro" -v "$(pwd)/${file}_rendered:/app/arch_config_rendered.yaml:rw" --entrypoint /bin/sh katanemo/archgw:latest -c "python -m cli.config_generator" 2>&1 > /dev/null ; then echo "Validation failed for $file" failed_files+=("$file") fi