archgw cli (#117)

* initial commit of the insurange agent demo, with the CLI tool

* committing the cli

* fixed some field descriptions for generate-prompt-targets

* CLI works with buil, up and down commands. Function calling example works stand-alone

* fixed README to install archgw cli

* fixing based on feedback

* fixing based on feedback

---------

Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-261.local>
This commit is contained in:
Salman Paracha 2024-10-03 18:21:27 -07:00 committed by GitHub
parent af018e5fd8
commit dc57f119a0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 1087 additions and 203 deletions

View file

@ -19,7 +19,7 @@ COPY --from=envoy /usr/local/bin/envoy /usr/local/bin/envoy
WORKDIR /config
COPY arch/requirements.txt .
RUN pip install -r requirements.txt
COPY arch/config_generator.py .
COPY arch/tools/config_generator.py .
COPY arch/envoy.template.yaml .
COPY arch/arch_config_schema.yaml .

View file

@ -1,88 +0,0 @@
import os
from jinja2 import Environment, FileSystemLoader
import yaml
from jsonschema import validate
ENVOY_CONFIG_TEMPLATE_FILE = os.getenv('ENVOY_CONFIG_TEMPLATE_FILE', 'envoy.template.yaml')
ARCH_CONFIG_FILE = os.getenv('ARCH_CONFIG_FILE', '/config/arch_config.yaml')
ENVOY_CONFIG_FILE_RENDERED = os.getenv('ENVOY_CONFIG_FILE_RENDERED', '/etc/envoy/envoy.yaml')
ARCH_CONFIG_SCHEMA_FILE = os.getenv('ARCH_CONFIG_SCHEMA_FILE', 'arch_config_schema.yaml')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', False)
MISTRAL_API_KEY = os.getenv('MISTRAL_API_KEY', False)
def add_secret_key_to_llm_providers(config_yaml) :
llm_providers = []
for llm_provider in config_yaml.get("llm_providers", []):
if llm_provider['access_key'] == "$MISTRAL_ACCESS_KEY":
llm_provider['access_key'] = MISTRAL_API_KEY
elif llm_provider['access_key'] == "$OPENAI_ACCESS_KEY":
llm_provider['access_key'] = OPENAI_API_KEY
else:
llm_provider.pop('access_key')
llm_providers.append(llm_provider)
config_yaml["llm_providers"] = llm_providers
return config_yaml
env = Environment(loader=FileSystemLoader('./'))
template = env.get_template('envoy.template.yaml')
with open(ARCH_CONFIG_FILE, 'r') as file:
arch_config_string = file.read()
with open(ARCH_CONFIG_SCHEMA_FILE, 'r') as file:
arch_config_schema = file.read()
config_yaml = yaml.safe_load(arch_config_string)
config_schema_yaml = yaml.safe_load(arch_config_schema)
try:
validate(config_yaml, config_schema_yaml)
except Exception as e:
print(f"Error validating arch_config file: {ARCH_CONFIG_FILE}, error: {e.message}")
exit(1)
inferred_clusters = {}
for prompt_target in config_yaml["prompt_targets"]:
name = prompt_target.get("endpoint", {}).get("name", "")
if name not in inferred_clusters:
inferred_clusters[name] = {
"name": name,
"port": 80, # default port
}
print(inferred_clusters)
endpoints = config_yaml.get("endpoints", {})
# override the inferred clusters with the ones defined in the config
for name, endpoint_details in endpoints.items():
if name in inferred_clusters:
print("updating cluster", endpoint_details)
inferred_clusters[name].update(endpoint_details)
endpoint = inferred_clusters[name]['endpoint']
if len(endpoint.split(':')) > 1:
inferred_clusters[name]['endpoint'] = endpoint.split(':')[0]
inferred_clusters[name]['port'] = int(endpoint.split(':')[1])
else:
inferred_clusters[name] = endpoint_details
print("updated clusters", inferred_clusters)
config_yaml = add_secret_key_to_llm_providers(config_yaml)
arch_llm_providers = config_yaml["llm_providers"]
arch_config_string = yaml.dump(config_yaml)
print("llm_providers:", arch_llm_providers)
data = {
'arch_config': arch_config_string,
'arch_clusters': inferred_clusters,
'arch_llm_providers': arch_llm_providers
}
rendered = template.render(data)
print(rendered)
print(ENVOY_CONFIG_FILE_RENDERED)
with open(ENVOY_CONFIG_FILE_RENDERED, 'w') as file:
file.write(rendered)

View file

@ -1,13 +1,11 @@
services:
archgw:
build:
context: ../
dockerfile: arch/Dockerfile
image: archgw:latest
ports:
- "10000:10000"
- "18080:9901"
- "19901:9901"
volumes:
- ${ARCH_CONFIG_FILE}:/config/arch_config.yaml
- ${ARCH_CONFIG_FILE:-./demos/function_calling/arch_confg.yaml}:/config/arch_config.yaml
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
- ./arch_log:/var/log/
depends_on:
@ -15,9 +13,7 @@ services:
condition: service_healthy
model_server:
build:
context: ../model_server
dockerfile: Dockerfile
image: model_server:latest
ports:
- "18081:80"
healthcheck:
@ -26,3 +22,8 @@ services:
retries: 20
volumes:
- ~/.cache/huggingface:/root/.cache/huggingface
environment:
- OLLAMA_ENDPOINT=${OLLAMA_ENDPOINT:-host.docker.internal}
- OLLAMA_MODEL=Arch-Function-Calling-3B-Q4_K_M
- MODE=${MODE:-cloud}
- FC_URL=${FC_URL:-https://arch-fc-free-trial-4mzywewe.uc.gateway.dev/v1}

View file

@ -170,7 +170,8 @@ static_resources:
{% else -%}
connect_timeout: 5s
{% endif -%}
type: STRICT_DNS
type: LOGICAL_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: {{ cluster.name }}

View file

@ -328,7 +328,7 @@ impl StreamContext {
if messages.len() >= 2 {
let latest_assistant_message = &messages[messages.len() - 2];
if let Some(model) = latest_assistant_message.model.as_ref() {
if model.starts_with("Arch") {
if model.contains("Arch") {
arch_assistant = true;
}
}

2
arch/tools/MANIFEST.in Normal file
View file

@ -0,0 +1,2 @@
include config/docker-compose.yaml
include config/arch_config_schema.yaml

28
arch/tools/README.md Normal file
View file

@ -0,0 +1,28 @@
## Setup Instructions: archgw CLI
This guide will walk you through the steps to set up the archgw cli on your local machine
### Step 1: Create a Python virtual environment
In the tools directory, create a Python virtual environment by running:
```bash
python -m venv venv
```
### Step 2: Activate the virtual environment
* On Linux/MacOS:
```bash
source venv/bin/activate
```
### Step 3: Run the build script
```bash
sh build-cli.sh
```
## Uninstall Instructions: archgw CLI
```bash
pip uninstall archgw
```

0
arch/tools/__init__.py Normal file
View file

22
arch/tools/build-cli.sh Normal file
View file

@ -0,0 +1,22 @@
#!/bin/bash
# Define paths
source_schema="../arch_config_schema.yaml"
source_compose="../docker-compose.yaml"
destination_dir="config"
# Ensure the destination directory exists only if it doesn't already
if [ ! -d "$destination_dir" ]; then
mkdir -p "$destination_dir"
echo "Directory $destination_dir created."
fi
# Copy the files
cp "$source_schema" "$destination_dir/arch_config_schema.yaml"
cp "$source_compose" "$destination_dir/docker-compose.yaml"
# Print success message
echo "Files copied successfully!"
echo "Building the cli"
pip install -e .

117
arch/tools/cli.py Normal file
View file

@ -0,0 +1,117 @@
import click
from core import start_arch, stop_arch
import targets
import os
import config_generator
import pkg_resources
import sys
import subprocess
logo = r"""
_ _
/ \ _ __ ___ | |__
/ _ \ | '__|/ __|| '_ \
/ ___ \ | | | (__ | | | |
/_/ \_\|_| \___||_| |_|
"""
@click.group(invoke_without_command=True)
@click.pass_context
def main(ctx):
if ctx.invoked_subcommand is None:
click.echo( """Arch (The Intelligent Prompt Gateway) CLI""")
click.echo(logo)
click.echo(ctx.get_help())
# Command to build archgw and model_server Docker images
ARCHGW_DOCKERFILE = "./arch/Dockerfile"
MODEL_SERVER_DOCKERFILE = "./model_server/Dockerfile"
@click.command()
def build():
"""Build Arch from source. Must be in root of cloned repo."""
# Check if /arch/Dockerfile exists
if os.path.exists(ARCHGW_DOCKERFILE):
click.echo("Building archgw image...")
try:
subprocess.run(["docker", "build", "-f", ARCHGW_DOCKERFILE, "-t", "archgw:latest", "."], check=True)
click.echo("archgw image built successfully.")
except subprocess.CalledProcessError as e:
click.echo(f"Error building archgw image: {e}")
sys.exit(1)
else:
click.echo("Error: Dockerfile not found in /arch")
sys.exit(1)
# Check if /model_server/Dockerfile exists
if os.path.exists(MODEL_SERVER_DOCKERFILE):
click.echo("Building model_server image...")
try:
subprocess.run(["docker", "build", "-f", MODEL_SERVER_DOCKERFILE, "-t", "model_server:latest", "./model_server"], check=True)
click.echo("model_server image built successfully.")
except subprocess.CalledProcessError as e:
click.echo(f"Error building model_server image: {e}")
sys.exit(1)
else:
click.echo("Error: Dockerfile not found in /model_server")
sys.exit(1)
click.echo("All images built successfully.")
@click.command()
@click.argument('file', required=False) # Optional file argument
@click.option('-path', default='.', help='Path to the directory containing arch_config.yml')
def up(file, path):
"""Starts Arch."""
if file:
# If a file is provided, process that file
arch_config_file = os.path.abspath(file)
else:
# If no file is provided, use the path and look for arch_config.yml
arch_config_file = os.path.abspath(os.path.join(path, "arch_config.yml"))
# Check if the file exists
if not os.path.exists(arch_config_file):
print(f"Error: {arch_config_file} does not exist.")
return
print(f"Processing config file: {arch_config_file}")
arch_schema_config = pkg_resources.resource_filename(__name__, "config/arch_config_schema.yaml")
print(f"Validating {arch_config_file}")
try:
config_generator.validate_prompt_config(arch_config_file=arch_config_file, arch_config_schema_file=arch_schema_config)
except Exception as e:
print("Exiting archgw up")
sys.exit(1)
print("Starting Arch gateway and Arch model server services via docker ")
start_arch(arch_config_file)
@click.command()
def down():
"""Stops Arch."""
stop_arch()
@click.command()
@click.option('-f', '--file', type=click.Path(exists=True), required=True, help="Path to the Python file")
def generate_prompt_targets(file):
"""Generats prompt_targets from python methods.
Note: This works for simple data types like ['int', 'float', 'bool', 'str', 'list', 'tuple', 'set', 'dict']:
If you have a complex pydantic data type, you will have to flatten those manually until we add support for it."""
print(f"Processing file: {file}")
if not file.endswith(".py"):
print("Error: Input file must be a .py file")
sys.exit(1)
targets.generate_prompt_targets(file)
main.add_command(up)
main.add_command(down)
main.add_command(build)
main.add_command(generate_prompt_targets)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,108 @@
import os
from jinja2 import Environment, FileSystemLoader
import yaml
from jsonschema import validate
ENVOY_CONFIG_TEMPLATE_FILE = os.getenv('ENVOY_CONFIG_TEMPLATE_FILE', 'envoy.template.yaml')
ARCH_CONFIG_FILE = os.getenv('ARCH_CONFIG_FILE', '/config/arch_config.yaml')
ENVOY_CONFIG_FILE_RENDERED = os.getenv('ENVOY_CONFIG_FILE_RENDERED', '/etc/envoy/envoy.yaml')
ARCH_CONFIG_SCHEMA_FILE = os.getenv('ARCH_CONFIG_SCHEMA_FILE', 'arch_config_schema.yaml')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', False)
MISTRAL_API_KEY = os.getenv('MISTRAL_API_KEY', False)
def add_secret_key_to_llm_providers(config_yaml) :
llm_providers = []
for llm_provider in config_yaml.get("llm_providers", []):
if llm_provider['access_key'] == "$MISTRAL_API_KEY":
llm_provider['access_key'] = MISTRAL_API_KEY
elif llm_provider['access_key'] == "$OPENAI_API_KEY":
llm_provider['access_key'] = OPENAI_API_KEY
else:
llm_provider.pop('access_key')
llm_providers.append(llm_provider)
config_yaml["llm_providers"] = llm_providers
return config_yaml
def validate_and_render_schema():
env = Environment(loader=FileSystemLoader('./'))
template = env.get_template('envoy.template.yaml')
try:
validate_prompt_config(ARCH_CONFIG_FILE, ARCH_CONFIG_SCHEMA_FILE)
except Exception as e:
print(e)
exit(1) # validate_prompt_config failed. Exit
with open(ARCH_CONFIG_FILE, 'r') as file:
arch_config = file.read()
with open(ARCH_CONFIG_SCHEMA_FILE, 'r') as file:
arch_config_schema = file.read()
config_yaml = yaml.safe_load(arch_config)
config_schema_yaml = yaml.safe_load(arch_config_schema)
inferred_clusters = {}
for prompt_target in config_yaml["prompt_targets"]:
name = prompt_target.get("endpoint", {}).get("name", "")
if name not in inferred_clusters:
inferred_clusters[name] = {
"name": name,
"port": 80, # default port
}
print(inferred_clusters)
endpoints = config_yaml.get("endpoints", {})
# override the inferred clusters with the ones defined in the config
for name, endpoint_details in endpoints.items():
if name in inferred_clusters:
print("updating cluster", endpoint_details)
inferred_clusters[name].update(endpoint_details)
endpoint = inferred_clusters[name]['endpoint']
if len(endpoint.split(':')) > 1:
inferred_clusters[name]['endpoint'] = endpoint.split(':')[0]
inferred_clusters[name]['port'] = int(endpoint.split(':')[1])
else:
inferred_clusters[name] = endpoint_details
print("updated clusters", inferred_clusters)
config_yaml = add_secret_key_to_llm_providers(config_yaml)
arch_llm_providers = config_yaml["llm_providers"]
arch_config_string = yaml.dump(config_yaml)
print("llm_providers:", arch_llm_providers)
data = {
'arch_config': arch_config_string,
'arch_clusters': inferred_clusters,
'arch_llm_providers': arch_llm_providers
}
rendered = template.render(data)
print(rendered)
print(ENVOY_CONFIG_FILE_RENDERED)
with open(ENVOY_CONFIG_FILE_RENDERED, 'w') as file:
file.write(rendered)
def validate_prompt_config(arch_config_file, arch_config_schema_file):
with open(arch_config_file, 'r') as file:
arch_config = file.read()
with open(arch_config_schema_file, 'r') as file:
arch_config_schema = file.read()
config_yaml = yaml.safe_load(arch_config)
config_schema_yaml = yaml.safe_load(arch_config_schema)
try:
validate(config_yaml, config_schema_yaml)
except Exception as e:
print(f"Error validating arch_config file: {arch_config_file}, error: {e.message}")
raise e
if __name__ == '__main__':
validate_and_render_schema()

101
arch/tools/core.py Normal file
View file

@ -0,0 +1,101 @@
import subprocess
import os
import time
import pkg_resources
import select
from utils import run_docker_compose_ps, print_service_status, check_services_state
def start_arch(arch_config_file, log_timeout=120, check_interval=1):
"""
Start Docker Compose in detached mode and stream logs until services are healthy.
Args:
path (str): The path where the prompt_confi.yml file is located.
log_timeout (int): Time in seconds to show logs before checking for healthy state.
check_interval (int): Time in seconds between health status checks.
"""
# Set the ARCH_CONFIG_FILE environment variable
env = os.environ.copy()
env['ARCH_CONFIG_FILE'] = arch_config_file
compose_file = pkg_resources.resource_filename(__name__, 'docker-compose.yaml')
try:
# Run the Docker Compose command in detached mode (-d)
subprocess.run(
["docker-compose", "up", "-d"],
cwd=os.path.dirname(compose_file), # Ensure the Docker command runs in the correct path
env=env, # Pass the modified environment
check=True # Raise an exception if the command fails
)
print(f"Arch docker-compose started in detached.")
print("Monitoring `docker-compose ps` logs...")
start_time = time.time()
services_status = {}
services_running = False #assume that the services are not running at the moment
while True:
current_time = time.time()
elapsed_time = current_time - start_time
# Check if timeout is reached
if elapsed_time > log_timeout:
print(f"Stopping log monitoring after {log_timeout} seconds.")
break
current_services_status = run_docker_compose_ps(compose_file=compose_file, env=env)
if not current_services_status:
print("Status for the services could not be detected. Something went wrong. Please run docker logs")
break
if not services_status:
services_status = current_services_status #set the first time
print_service_status(services_status) #print the services status and proceed.
#check if anyone service is failed or exited state, if so print and break out
unhealthy_states = ["unhealthy", "exit", "exited", "dead", "bad"]
running_states = ["running", "up"]
if check_services_state(current_services_status, running_states):
print("Arch is up and running!")
break
if check_services_state(current_services_status, unhealthy_states):
print("One or more Arch services are unhealthy. Please run `docker logs` for more information")
print_service_status(current_services_status) #print the services status and proceed.
break
#check to see if the status of one of the services has changed from prior. Print and loop over until finish, or error
for service_name in services_status.item():
if services_status[service_name]['status'] != current_services_status[service_name]['status']:
print("One or more Arch services have changed state. Printing current state")
print_service_status(current_services_status)
break
services_status = current_services_status
except subprocess.CalledProcessError as e:
print(f"Failed to start Arch: {str(e)}")
def stop_arch():
"""
Shutdown all Docker Compose services by running `docker-compose down`.
Args:
path (str): The path where the docker-compose.yml file is located.
"""
compose_file = pkg_resources.resource_filename(__name__, 'docker-compose.yaml')
try:
# Run `docker-compose down` to shut down all services
subprocess.run(
["docker-compose", "down"],
cwd=os.path.dirname(compose_file),
check=True,
)
print("Successfully shut down all services.")
except subprocess.CalledProcessError as e:
print(f"Failed to shut down services: {str(e)}")

20
arch/tools/setup.py Normal file
View file

@ -0,0 +1,20 @@
from setuptools import setup, find_packages
setup(
name="archgw",
version="0.1.0",
description="Python-based CLI tool to manage Arch and generate targets.",
author="Katanemo Labs, Inc.",
packages=find_packages(),
py_modules = ['cli', 'core', 'targets', 'utils', 'config_generator'],
include_package_data=True,
package_data={
'': ['config/docker-compose.yaml', 'config/arch_config_schema.yaml'] #Specify to include the docker-compose.yml file
},
install_requires=['pyyaml', 'pydantic', 'click', 'jinja2','pyyaml','jsonschema'], # Add dependencies here, e.g., 'PyYAML' for YAML processing
entry_points={
'console_scripts': [
'archgw=cli:main',
],
},
)

297
arch/tools/targets.py Normal file
View file

@ -0,0 +1,297 @@
import ast
import sys
import yaml
from typing import Any
from pydantic import BaseModel
FLASK_ROUTE_DECORATORS = ["route", "get", "post", "put", "delete", "patch"]
FASTAPI_ROUTE_DECORATORS = ["get", "post", "put", "delete", "patch"]
def detect_framework(tree: Any) -> str:
"""Detect whether the file is using Flask or FastAPI based on imports."""
for node in ast.walk(tree):
if isinstance(node, ast.ImportFrom):
if node.module == "flask":
return "flask"
elif node.module == "fastapi":
return "fastapi"
return "unknown"
def get_route_decorators(node: Any, framework: str) -> list:
"""Extract route decorators based on the framework."""
decorators = []
for decorator in node.decorator_list:
if isinstance(decorator, ast.Call) and isinstance(decorator.func, ast.Attribute):
if framework == "flask" and decorator.func.attr in FLASK_ROUTE_DECORATORS:
decorators.append(decorator.func.attr)
elif framework == "fastapi" and decorator.func.attr in FASTAPI_ROUTE_DECORATORS:
decorators.append(decorator.func.attr)
return decorators
def get_route_path(node: Any, framework: str) -> str:
"""Extract route path based on the framework."""
for decorator in node.decorator_list:
if isinstance(decorator, ast.Call) and decorator.args:
return decorator.args[0].s # Assuming it's a string literal
def is_pydantic_model(annotation: ast.expr, tree: ast.AST) -> bool:
"""Check if a given type annotation is a Pydantic model."""
# We walk through the AST to find class definitions and check if they inherit from Pydantic's BaseModel
if isinstance(annotation, ast.Name):
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef) and node.name == annotation.id:
for base in node.bases:
if isinstance(base, ast.Name) and base.id == "BaseModel":
return True
return False
def get_pydantic_model_fields(model_name: str, tree: ast.AST) -> list:
"""Extract fields from a Pydantic model, handling list, tuple, set, dict types, and direct default values."""
fields = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef) and node.name == model_name:
for stmt in node.body:
if isinstance(stmt, ast.AnnAssign):
# Initialize the default field description
field_type = "Unknown: Please Fix This!"
description = "Field, description not present. Please fix."
default_value = None
required = True # Assume the field is required initially
# Check if the field uses Field() with required status and description
if stmt.value and isinstance(stmt.value, ast.Call) and isinstance(stmt.value.func, ast.Name) and stmt.value.func.id == 'Field':
# Extract the description argument inside the Field call
for keyword in stmt.value.keywords:
if keyword.arg == 'description' and isinstance(keyword.value, ast.Str):
description = keyword.value.s
if keyword.arg == 'default':
default_value = keyword.value
# If Ellipsis (...) is used, it means the field is required
if stmt.value.args and isinstance(stmt.value.args[0], ast.Constant) and stmt.value.args[0].value is Ellipsis:
required = True
else:
required = False
# Handle direct default values (e.g., name: str = "John Doe")
elif stmt.value is not None:
if isinstance(stmt.value, ast.Constant):
# Set the default value from the assignment (e.g., name: str = "John Doe")
default_value = stmt.value.value
required = False # Not required since it has a default value
# Always extract the field type, even if there's a default value
if isinstance(stmt.annotation, ast.Subscript):
# Get the base type (list, tuple, set, dict)
base_type = stmt.annotation.value.id if isinstance(stmt.annotation.value, ast.Name) else "Unknown"
# Handle only list, tuple, set, dict and ignore the inner types
if base_type.lower() in ['list', 'tuple', 'set', 'dict']:
field_type = base_type.lower()
# Handle the ellipsis '...' for required fields if no Field() call
elif isinstance(stmt.value, ast.Constant) and stmt.value.value is Ellipsis:
required = True
# Handle simple types like str, int, etc.
if isinstance(stmt.annotation, ast.Name):
field_type = stmt.annotation.id
field_info = {
"name": stmt.target.id,
"type": field_type, # Always set the field type
"description": description,
"default": default_value, # Handle direct default values
"required": required
}
fields.append(field_info)
return fields
def get_function_parameters(node: ast.FunctionDef, tree: ast.AST) -> list:
"""Extract the parameters and their types from the function definition."""
parameters = []
# Extract docstring to find descriptions
docstring = ast.get_docstring(node)
arg_descriptions = extract_arg_descriptions_from_docstring(docstring)
# Extract default values
defaults = [None] * (len(node.args.args) - len(node.args.defaults)) + node.args.defaults # Align defaults with args
for arg, default in zip(node.args.args, defaults):
if arg.arg != "self": # Skip 'self' or 'cls' in class methods
param_info = {"name": arg.arg, "description": arg_descriptions.get(arg.arg, "[ADD DESCRIPTION]")}
# Handle Pydantic model types
if hasattr(arg, 'annotation') and is_pydantic_model(arg.annotation, tree):
# Extract and flatten Pydantic model fields
pydantic_fields = get_pydantic_model_fields(arg.annotation.id, tree)
parameters.extend(pydantic_fields) # Flatten the model fields into the parameters list
continue # Skip adding the current param_info for the model since we expand the fields
# Handle standard Python types (int, float, str, etc.)
elif hasattr(arg, 'annotation') and isinstance(arg.annotation, ast.Name):
if arg.annotation.id in ['int', 'float', 'bool', 'str', 'list', 'tuple', 'set', 'dict']:
param_info["type"] = arg.annotation.id
else:
param_info["type"] = "[UNKNOWN - PLEASE FIX]"
# Handle generic subscript types (e.g., Optional, List[Type], etc.)
elif hasattr(arg, 'annotation') and isinstance(arg.annotation, ast.Subscript):
if isinstance(arg.annotation.value, ast.Name) and arg.annotation.value.id in ['list', 'tuple', 'set', 'dict']:
param_info["type"] = f"{arg.annotation.value.id}" # e.g., "List", "Tuple", etc.
else:
param_info["type"] = "[UNKNOWN - PLEASE FIX]"
# Default for unknown types
else:
param_info["type"] = "[UNKNOWN - PLEASE FIX]" # If unable to detect type
# Handle default values
if default is not None:
if isinstance(default, ast.Constant) or isinstance(default, ast.NameConstant):
param_info["default"] = default.value # Use the default value directly
else:
param_info["default"] = "[UNKNOWN DEFAULT]" # Unknown default type
param_info["required"] = False # Optional since it has a default value
else:
param_info["default"] = None
param_info["required"] = True # Required if no default value
parameters.append(param_info)
return parameters
def get_function_docstring(node: Any) -> str:
"""Extract the function's docstring description if present."""
# Check if the first node is a docstring
if isinstance(node.body[0], ast.Expr) and isinstance(node.body[0].value, ast.Str):
# Get the entire docstring
full_docstring = node.body[0].value.s.strip()
# Split the docstring by double newlines (to separate description from fields like Args)
description = full_docstring.split("\n\n")[0].strip()
return description
return "No description provided."
def extract_arg_descriptions_from_docstring(docstring: str) -> dict:
"""Extract descriptions for function parameters from the 'Args' section of the docstring."""
descriptions = {}
if not docstring:
return descriptions
in_args_section = False
current_param = None
for line in docstring.splitlines():
line = line.strip()
# Detect the start of the 'Args' section
if line.startswith("Args:"):
in_args_section = True
continue # Proceed to the next line after 'Args:'
# End of 'Args' section if no indentation and no colon
if in_args_section and not line.startswith(" ") and ':' not in line:
break # Stop processing if we reach a new section
# Process lines in the 'Args' section
if in_args_section:
if ':' in line:
# Extract parameter name and description
param_name, description = line.split(':', 1)
descriptions[param_name.strip()] = description.strip()
current_param = param_name.strip()
elif current_param and line.startswith(" "):
# Handle multiline descriptions (indented lines)
descriptions[current_param] += f" {line.strip()}"
return descriptions
def generate_prompt_targets(input_file_path: str) -> None:
"""Introspect routes and generate YAML for either Flask or FastAPI."""
with open(input_file_path, "r") as source:
tree = ast.parse(source.read())
# Detect the framework (Flask or FastAPI)
framework = detect_framework(tree)
if framework == "unknown":
print("Could not detect Flask or FastAPI in the file.")
return
# Extract routes
routes = []
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
route_decorators = get_route_decorators(node, framework)
if route_decorators:
route_path = get_route_path(node, framework)
function_params = get_function_parameters(node, tree) # Get parameters for the route
function_docstring = get_function_docstring(node) # Extract docstring
routes.append({
'name': node.name,
'path': route_path,
'methods': route_decorators,
'parameters': function_params, # Add parameters to the route
'description': function_docstring # Add the docstring as the description
})
# Generate YAML structure
output_structure = {
"prompt_targets": []
}
for route in routes:
target = {
"name": route['name'],
"endpoint": [
{
"name": "app_server",
"path": route['path'],
}
],
"description": route['description'], # Use extracted docstring
"parameters": [
{
"name": param['name'],
"type": param['type'],
"description": f"{param['description']}",
**({"default": param['default']} if "default" in param and param['default'] is not None else {}), # Only add default if it's set
"required": param['required']
} for param in route['parameters']
]
}
if route['name'] == "default":
# Special case for `information_extraction` based on your YAML format
target["type"] = "default"
target["auto-llm-dispatch-on-response"] = True
output_structure["prompt_targets"].append(target)
# Output as YAML
print(yaml.dump(output_structure, sort_keys=False,default_flow_style=False, indent=3))
if __name__ == '__main__':
if len(sys.argv) != 2:
print("Usage: python targets.py <input_file>")
sys.exit(1)
input_file = sys.argv[1]
# Automatically generate the output file name
if input_file.endswith(".py"):
output_file = input_file.replace(".py", "_prompt_targets.yml")
else:
print("Error: Input file must be a .py file")
sys.exit(1)
# Call the function with the input and generated output file names
generate_prompt_targets(input_file, output_file)
# Example usage:
# python targets.py api.yaml

View file

@ -0,0 +1,33 @@
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Dict, Set
app = FastAPI()
class User(BaseModel):
name: str = Field("John Doe", description="The name of the user.") # Default value and description for name
location: int = None
age: int = Field(30, description="The age of the user.") # Default value and description for age
tags: Set[str] = Field(default_factory=set, description="A set of tags associated with the user.") # Default empty set and description for tags
metadata: Dict[str, int] = Field(default_factory=dict, description="A dictionary storing metadata about the user, with string keys and integer values.") # Default empty dict and description for metadata
@app.get("/agent/default")
async def default(request: User):
"""
This endpoint handles information extraction queries.
It can summarize, extract details, and perform various other information-related tasks.
"""
return {"info": f"Query: {request.name}, Count: {request.age}"}
@app.post("/agent/action")
async def reboot_network_device(device_id: str, confirmation: str):
"""
This endpoint reboots a network device based on the device ID.
Confirmation is required to proceed with the reboot.
Args:
device_id: The device_id that you want to reboot.
confirmation: The confirmation that the user wants to reboot.
metadata: Ignore this parameter
"""
return {"status": "Device rebooted", "device_id": device_id}

View file

@ -0,0 +1,33 @@
prompt_targets:
- name: default
path: /agent/default
description: "This endpoint handles information extraction queries.\n It can\
\ summarize, extract details, and perform various other information-related tasks."
parameters:
- name: query
type: str
description: Field from Pydantic model DefaultRequest
default_value: null
required: false
- name: count
type: int
description: Field from Pydantic model DefaultRequest
default_value: null
required: false
type: default
auto-llm-dispatch-on-response: true
- name: reboot_network_device
path: /agent/action
description: "This endpoint reboots a network device based on the device ID.\n \
\ Confirmation is required to proceed with the reboot."
parameters:
- name: device_id
type: str
description: Description for device_id
default_value: ''
required: true
- name: confirmation
type: int
description: Description for confirmation
default_value: ''
required: true

79
arch/tools/utils.py Normal file
View file

@ -0,0 +1,79 @@
import subprocess
import os
import time
import select
import shlex
def run_docker_compose_ps(compose_file, env):
"""
Check if all Docker Compose services are in a healthy state.
Args:
path (str): The path where the docker-compose.yml file is located.
"""
try:
# Run `docker-compose ps` to get the health status of each service
ps_process = subprocess.Popen(
["docker-compose", "ps"],
cwd=os.path.dirname(compose_file),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env
)
# Capture the output of `docker-compose ps`
services_status, error_output = ps_process.communicate()
# Check if there is any error output
if error_output:
print(f"Error while checking service status:\n{error_output}", file=os.sys.stderr)
return {}
lines = services_status.strip().splitlines()
services = {}
# Skip the header row and parse each service
for line in lines[1:]:
parts = shlex.split(line)
if len(parts) >= 5:
service_name = parts[0] # Service name
status_index = 3 # Status is typically at index 3, but may have multiple words
# Check if the status has multiple words (e.g., "running (healthy)")
if '(' in parts[status_index+1] :
# Combine the status field if it's split over two parts
status = f"{parts[status_index]} {parts[status_index + 1]}"
ports = parts[status_index + 2]
else:
status = parts[status_index]
ports = parts[status_index + 1]
# Store both status and ports in a dictionary for each service
services[service_name] = {
'status': status,
'ports': ports
}
return services
except subprocess.CalledProcessError as e:
print(f"Failed to check service status. Error:\n{e.stderr}")
return e
#Helper method to print service status
def print_service_status(services):
print(f"{'Service Name':<25} {'Status':<20} {'Ports'}")
print("="*72)
for service_name, info in services.items():
status = info['status']
ports = info['ports']
print(f"{service_name:<25} {status:<20} {ports}")
#check for states based on the states passed in
def check_services_state(services, states):
for service_name, service_info in services.items():
status = service_info['status'].lower() # Convert status to lowercase for easier comparison
if any(state in status for state in states):
return True
return False