more changes

This commit is contained in:
Adil Hafeez 2025-09-17 00:57:26 -07:00
parent 6cfce60501
commit c173074864
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
9 changed files with 286 additions and 215 deletions

View file

@ -32,7 +32,7 @@ COPY --from=envoy /usr/local/bin/envoy /usr/local/bin/envoy
WORKDIR /app
COPY arch/requirements.txt .
RUN pip install -r requirements.txt
COPY arch/tools/cli/config_generator.py .
COPY arch/tools .
COPY arch/envoy.template.yaml .
COPY arch/arch_config_schema.yaml .
COPY arch/supervisord.conf /etc/supervisor/conf.d/supervisord.conf

View file

@ -395,137 +395,6 @@ static_resources:
{% endif %}
{% endfor %}
# Listeners for LLMs
{% for listener in listeners %}
{% if listener.llm_providers %}
- name: {{ listener.name | replace(" ", "_") }}
address:
socket_address:
address: {{ listener.address }}
port_value: {{ listener.port }}
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
{% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
generate_request_id: true
tracing:
provider:
name: envoy.tracers.opentelemetry
typed_config:
"@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
grpc_service:
envoy_grpc:
cluster_name: opentelemetry_collector
timeout: 0.250s
service_name: egress_traffic_llm
random_sampling:
value: {{ arch_tracing.random_sampling }}
{% endif %}
stat_prefix: egress_traffic
codec_type: AUTO
scheme_header_transformation:
scheme_to_overwrite: https
access_log:
- name: envoy.access_loggers.file
typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: "/var/log/access_llm.log"
route_config:
name: local_routes
virtual_hosts:
- name: local_service
domains:
- "*"
routes:
{% for provider in listener.llm_providers %}
# if endpoint is set then use custom cluster for upstream llm
{% if provider.endpoint %}
{% set llm_cluster_name = provider.name %}
{% else %}
{% set llm_cluster_name = provider.provider_interface %}
{% endif %}
- match:
prefix: "/"
headers:
- name: "x-arch-llm-provider"
string_match:
exact: {{ llm_cluster_name }}
route:
auto_host_rewrite: true
cluster: {{ llm_cluster_name }}
timeout: 60s
{% endfor %}
- match:
prefix: "/"
direct_response:
status: 400
body:
inline_string: "x-arch-llm-provider header not set, llm gateway cannot perform routing\n"
http_filters:
- name: envoy.filters.http.compressor
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.compressor.v3.Compressor
compressor_library:
name: envoy.compression.brotli.compressor
typed_config:
"@type": type.googleapis.com/envoy.extensions.compression.brotli.compressor.v3.Brotli
chunk_size: 8192
- name: envoy.filters.http.compressor
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.compressor.v3.Compressor
compressor_library:
name: compress
typed_config:
"@type": type.googleapis.com/envoy.extensions.compression.gzip.compressor.v3.Gzip
memory_level: 3
window_bits: 10
- name: envoy.filters.http.wasm
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
value:
config:
name: "http_config"
root_id: llm_gateway
configuration:
"@type": "type.googleapis.com/google.protobuf.StringValue"
value: |
{{ arch_llm_config | indent(32) }}
vm_config:
runtime: "envoy.wasm.runtime.v8"
code:
local:
filename: "/etc/envoy/proxy-wasm-plugins/llm_gateway.wasm"
- name: envoy.filters.http.decompressor
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.decompressor.v3.Decompressor
decompressor_library:
name: decompress
typed_config:
"@type": "type.googleapis.com/envoy.extensions.compression.gzip.decompressor.v3.Gzip"
chunk_size: 8192
# If this ratio is set too low, then body data will not be decompressed completely.
max_inflate_ratio: 1000
- name: envoy.filters.http.decompressor
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.decompressor.v3.Decompressor
decompressor_library:
name: envoy.compression.brotli.decompressor
typed_config:
"@type": type.googleapis.com/envoy.extensions.compression.brotli.decompressor.v3.Brotli
chunk_size: 8192
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
{% endif %}
{% endfor %}
# begin - legacy llm listeners
- name: egress_traffic

View file

@ -9,7 +9,7 @@ stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
[program:envoy]
command=/bin/sh -c "python /app/config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log//envoy.log"
command=/bin/sh -c "python -m cli.config_generator && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log//envoy.log"
stdout_logfile=/dev/stdout
redirect_stderr=true
stdout_logfile_maxbytes=0

View file

@ -1,5 +1,6 @@
import json
import os
from cli.utils import convert_legacy_llm_providers
from jinja2 import Environment, FileSystemLoader
import yaml
from jsonschema import validate
@ -66,6 +67,12 @@ def validate_and_render_schema():
_ = yaml.safe_load(arch_config_schema)
inferred_clusters = {}
listeners, llm_gateway, prompt_gateway = convert_legacy_llm_providers(
config_yaml.get("listeners"), config_yaml.get("llm_providers")
)
config_yaml["listeners"] = listeners
endpoints = config_yaml.get("endpoints", {})
# Process agents section and convert to endpoints
@ -126,64 +133,62 @@ def validate_and_render_schema():
model_name_keys = set()
model_usage_name_keys = set()
# legacy listeners
# check if type is array or object
# if its dict its legacy format let's convert it to array
prompt_gateway_listener = {
"name": "ingress_traffic",
"port": 10000,
"address": "0.0.0.0",
"timeout": "30s",
"protocol": "openai",
}
llm_gateway_listener = {
"name": "egress_traffic",
"port": 12000,
"address": "0.0.0.0",
"timeout": "30s",
"llm_providers": [],
"protocol": "openai",
}
if isinstance(config_yaml["listeners"], dict):
ingress_traffic = config_yaml["listeners"].get("ingress_traffic", None)
egress_traffic = config_yaml["listeners"].get("egress_traffic", {})
config_yaml["listeners"] = []
# # legacy listeners
# # check if type is array or object
# # if its dict its legacy format let's convert it to array
# prompt_gateway_listener = {
# "name": "ingress_traffic",
# "port": 10000,
# "address": "0.0.0.0",
# "timeout": "30s",
# "protocol": "openai",
# }
# llm_gateway_listener = {
# "name": "egress_traffic",
# "port": 12000,
# "address": "0.0.0.0",
# "timeout": "30s",
# "llm_providers": [],
# "protocol": "openai",
# }
# if isinstance(config_yaml["listeners"], dict):
# ingress_traffic = config_yaml["listeners"].get("ingress_traffic", None)
# egress_traffic = config_yaml["listeners"].get("egress_traffic", {})
# config_yaml["listeners"] = []
llm_providers = []
if config_yaml.get("llm_providers"):
llm_providers = config_yaml["llm_providers"]
del config_yaml["llm_providers"]
llm_gateway_listener["port"] = egress_traffic.get(
"port", llm_gateway_listener["port"]
)
llm_gateway_listener["address"] = egress_traffic.get(
"address", llm_gateway_listener["address"]
)
llm_gateway_listener["timeout"] = egress_traffic.get(
"timeout", llm_gateway_listener["timeout"]
)
llm_gateway_listener["llm_providers"] = llm_providers
config_yaml["listeners"].append(llm_gateway_listener)
# llm_providers = []
# if config_yaml.get("llm_providers"):
# llm_providers = config_yaml["llm_providers"]
# del config_yaml["llm_providers"]
# llm_gateway_listener["port"] = egress_traffic.get(
# "port", llm_gateway_listener["port"]
# )
# llm_gateway_listener["address"] = egress_traffic.get(
# "address", llm_gateway_listener["address"]
# )
# llm_gateway_listener["timeout"] = egress_traffic.get(
# "timeout", llm_gateway_listener["timeout"]
# )
# llm_gateway_listener["llm_providers"] = llm_providers
# config_yaml["listeners"].append(llm_gateway_listener)
if ingress_traffic:
prompt_gateway_listener["port"] = ingress_traffic.get(
"port", prompt_gateway_listener["port"]
)
prompt_gateway_listener["address"] = ingress_traffic.get(
"address", prompt_gateway_listener["address"]
)
prompt_gateway_listener["timeout"] = ingress_traffic.get(
"timeout", prompt_gateway_listener["timeout"]
)
config_yaml["listeners"].append(prompt_gateway_listener)
# if ingress_traffic:
# prompt_gateway_listener["port"] = ingress_traffic.get(
# "port", prompt_gateway_listener["port"]
# )
# prompt_gateway_listener["address"] = ingress_traffic.get(
# "address", prompt_gateway_listener["address"]
# )
# prompt_gateway_listener["timeout"] = ingress_traffic.get(
# "timeout", prompt_gateway_listener["timeout"]
# )
# config_yaml["listeners"].append(prompt_gateway_listener)
for listener in config_yaml["listeners"]:
print("Processing listener: ", listener)
name = listener.get("name", None)
# TODO: for now we only support llm_providers under egress_traffic listener
if name != "egress_traffic":
for listener in listeners:
if listener.get("llm_providers") is None or listener.get("llm_providers") == []:
continue
print("Processing listener with llm_providers: ", listener)
name = listener.get("name", None)
for llm_provider in listener.get("llm_providers", []):
if llm_provider.get("usage", None):
@ -293,12 +298,18 @@ def validate_and_render_schema():
}
)
for listener in config_yaml["listeners"]:
updated_llm_providers = []
for listener in listeners:
print("Processing listener: ", listener)
if listener.get("name") == "egress_traffic":
llm_providers = listener.get("llm_providers", None)
if llm_providers is not None and llm_providers != []:
print("processing egress traffic listener")
print("updated_llm_providers: ", updated_llm_providers)
listener["llm_providers"] = deepcopy(updated_llm_providers)
if updated_llm_providers is not None and updated_llm_providers != []:
raise Exception(
"Please provide llm_providers either under listeners or at root level, not both. Currently we don't support multiple listeners with llm_providers"
)
updated_llm_providers = deepcopy(llm_providers)
config_yaml["llm_providers"] = updated_llm_providers
# Validate model aliases if present
@ -314,16 +325,6 @@ def validate_and_render_schema():
arch_config_string = yaml.dump(config_yaml)
arch_llm_config_string = yaml.dump(config_yaml)
# prompt_gateway_listener = config_yaml.get("listeners", {}).get(
# "ingress_traffic", {}
# )
# if prompt_gateway_listener.get("port") == None:
# prompt_gateway_listener["port"] = 10000 # default port for prompt gateway
# if prompt_gateway_listener.get("address") == None:
# prompt_gateway_listener["address"] = "127.0.0.1"
# if prompt_gateway_listener.get("timeout") == None:
# prompt_gateway_listener["timeout"] = "10s"
use_agent_orchestrator = config_yaml.get("overrides", {}).get(
"use_agent_orchestrator", False
)
@ -346,8 +347,8 @@ def validate_and_render_schema():
print("agent_orchestrator: ", agent_orchestrator)
data = {
"prompt_gateway_listener": prompt_gateway_listener,
"llm_gateway_listener": llm_gateway_listener,
"prompt_gateway_listener": prompt_gateway,
"llm_gateway_listener": llm_gateway,
"arch_config": arch_config_string,
"arch_llm_config": arch_llm_config_string,
"arch_clusters": inferred_clusters,
@ -355,7 +356,7 @@ def validate_and_render_schema():
"arch_tracing": arch_tracing,
"local_llms": llms_with_endpoint,
"agent_orchestrator": agent_orchestrator,
"listeners": config_yaml["listeners"].copy(),
"listeners": listeners,
}
rendered = template.render(data)

View file

@ -5,7 +5,7 @@ import time
import sys
import yaml
from cli.utils import getLogger
from cli.utils import convert_legacy_llm_providers, getLogger
from cli.consts import (
ARCHGW_DOCKER_IMAGE,
ARCHGW_DOCKER_NAME,
@ -37,9 +37,11 @@ def _get_gateway_ports(arch_config_file: str) -> list[int]:
print("arch config dict json string: ", json.dumps(arch_config_dict))
all_ports = [
listener.get("port") for listener in arch_config_dict.get("listeners", [])
]
listeners, _, _ = convert_legacy_llm_providers(
arch_config_dict.get("listeners"), arch_config_dict.get("llm_providers")
)
all_ports = [listener.get("port") for listener in listeners]
return all_ports

View file

@ -127,7 +127,8 @@ def docker_validate_archgw_schema(arch_config_file):
"--entrypoint",
"python",
ARCHGW_DOCKER_IMAGE,
"config_generator.py",
"-m",
"cli.config_generator",
],
capture_output=True,
text=True,

View file

@ -21,16 +21,85 @@ def getLogger(name="cli"):
log = getLogger(__name__)
def convert_legacy_llm_providers(
listeners: dict | list, llm_providers: list | None
) -> tuple[list, dict | None, dict | None]:
llm_gateway_listener = {
"name": "egress_traffic",
"port": 12000,
"address": "0.0.0.0",
"timeout": "30s",
"llm_providers": [],
"protocol": "openai",
}
prompt_gateway_listener = {
"name": "ingress_traffic",
"port": 10000,
"address": "0.0.0.0",
"timeout": "30s",
"protocol": "openai",
}
if isinstance(listeners, dict):
# legacy listeners
# check if type is array or object
# if its dict its legacy format let's convert it to array
updated_listeners = []
ingress_traffic = listeners.get("ingress_traffic", {})
egress_traffic = listeners.get("egress_traffic", {})
llm_gateway_listener["port"] = egress_traffic.get(
"port", llm_gateway_listener["port"]
)
llm_gateway_listener["address"] = egress_traffic.get(
"address", llm_gateway_listener["address"]
)
llm_gateway_listener["timeout"] = egress_traffic.get(
"timeout", llm_gateway_listener["timeout"]
)
if llm_providers is None or llm_providers == []:
raise ValueError("llm_providers cannot be empty when using legacy format")
llm_gateway_listener["llm_providers"] = llm_providers
updated_listeners.append(llm_gateway_listener)
if ingress_traffic and ingress_traffic != {}:
prompt_gateway_listener["port"] = ingress_traffic.get(
"port", prompt_gateway_listener["port"]
)
prompt_gateway_listener["address"] = ingress_traffic.get(
"address", prompt_gateway_listener["address"]
)
prompt_gateway_listener["timeout"] = ingress_traffic.get(
"timeout", prompt_gateway_listener["timeout"]
)
updated_listeners.append(prompt_gateway_listener)
return updated_listeners, llm_gateway_listener, prompt_gateway_listener
llm_provider_set = False
for listener in listeners:
if listener.get("llm_providers") is not None:
if llm_provider_set:
raise ValueError(
"Currently only one listener can have llm_providers set"
)
llm_gateway_listener = listener
llm_provider_set = True
return listeners, llm_gateway_listener, prompt_gateway_listener
def get_llm_provider_access_keys(arch_config_file):
with open(arch_config_file, "r") as file:
arch_config = file.read()
arch_config_yaml = yaml.safe_load(arch_config)
access_key_list = []
for llm_provider in arch_config_yaml.get("llm_providers", []):
access_key = llm_provider.get("access_key")
if access_key is not None:
access_key_list.append(access_key)
listeners, _, _ = convert_legacy_llm_providers(
arch_config_yaml.get("listeners"), arch_config_yaml.get("llm_providers")
)
for prompt_target in arch_config_yaml.get("prompt_targets", []):
for k, v in prompt_target.get("endpoint", {}).get("http_headers", {}).items():
@ -44,7 +113,7 @@ def get_llm_provider_access_keys(arch_config_file):
else:
access_key_list.append(v)
for listener in arch_config_yaml.get("listeners", []):
for listener in listeners:
for llm_provider in listener.get("llm_providers", []):
access_key = llm_provider.get("access_key")
if access_key is not None:

View file

@ -90,7 +90,7 @@ def test_validate_and_render_happy_path_agent_config(monkeypatch):
monkeypatch.setenv("TEMPLATE_ROOT", "../")
arch_config = """
version: v0.2.0
version: v0.3.0
agents:
- name: query_rewriter
@ -347,3 +347,132 @@ def test_validate_and_render_schema_tests(monkeypatch, arch_config_test_case):
with pytest.raises(Exception) as excinfo:
validate_and_render_schema()
assert expected_error in str(excinfo.value)
def test_convert_legacy_llm_providers():
from cli.utils import convert_legacy_llm_providers
listeners = {
"ingress_traffic": {
"address": "0.0.0.0",
"port": 10000,
"timeout": "30s",
"protocol": "openai",
},
"egress_traffic": {
"address": "0.0.0.0",
"port": 12000,
"timeout": "30s",
"protocol": "openai",
},
}
llm_providers = [
{
"model": "openai/gpt-4o",
"access_key": "test_key",
}
]
updated_providers, llm_gateway, prompt_gateway = convert_legacy_llm_providers(
listeners, llm_providers
)
assert isinstance(updated_providers, list)
assert llm_gateway is not None
assert prompt_gateway is not None
assert updated_providers == [
{
"address": "0.0.0.0",
"llm_providers": [
{
"access_key": "test_key",
"model": "openai/gpt-4o",
},
],
"name": "egress_traffic",
"port": 12000,
"protocol": "openai",
"timeout": "30s",
},
{
"address": "0.0.0.0",
"name": "ingress_traffic",
"port": 10000,
"protocol": "openai",
"timeout": "30s",
},
]
assert llm_gateway == {
"address": "0.0.0.0",
"llm_providers": [
{
"access_key": "test_key",
"model": "openai/gpt-4o",
},
],
"name": "egress_traffic",
"port": 12000,
"protocol": "openai",
"timeout": "30s",
}
assert prompt_gateway == {
"address": "0.0.0.0",
"name": "ingress_traffic",
"port": 10000,
"protocol": "openai",
"timeout": "30s",
}
def test_convert_legacy_llm_providers_no_prompt_gateway():
from cli.utils import convert_legacy_llm_providers
listeners = {
"egress_traffic": {
"address": "0.0.0.0",
"port": 12000,
"timeout": "30s",
"protocol": "openai",
}
}
llm_providers = [
{
"model": "openai/gpt-4o",
"access_key": "test_key",
}
]
updated_providers, llm_gateway, prompt_gateway = convert_legacy_llm_providers(
listeners, llm_providers
)
assert isinstance(updated_providers, list)
assert llm_gateway is not None
assert prompt_gateway is not None
assert updated_providers == [
{
"address": "0.0.0.0",
"llm_providers": [
{
"access_key": "test_key",
"model": "openai/gpt-4o",
},
],
"name": "egress_traffic",
"port": 12000,
"protocol": "openai",
"timeout": "30s",
}
]
assert llm_gateway == {
"address": "0.0.0.0",
"llm_providers": [
{
"access_key": "test_key",
"model": "openai/gpt-4o",
},
],
"name": "egress_traffic",
"port": 12000,
"protocol": "openai",
"timeout": "30s",
}

View file

@ -5,7 +5,7 @@ failed_files=()
for file in $(find . -name arch_config.yaml -o -name arch_config_full_reference.yaml); do
echo "Validating ${file}..."
touch $(pwd)/${file}_rendered
if ! docker run --rm -v "$(pwd)/${file}:/app/arch_config.yaml:ro" -v "$(pwd)/${file}_rendered:/app/arch_config_rendered.yaml:rw" --entrypoint /bin/sh katanemo/archgw:latest -c "python config_generator.py" 2>&1 > /dev/null ; then
if ! docker run --rm -v "$(pwd)/${file}:/app/arch_config.yaml:ro" -v "$(pwd)/${file}_rendered:/app/arch_config_rendered.yaml:rw" --entrypoint /bin/sh katanemo/archgw:latest -c "python -m cli.config_generator" 2>&1 > /dev/null ; then
echo "Validation failed for $file"
failed_files+=("$file")
fi