mirror of
https://github.com/katanemo/plano.git
synced 2026-05-04 05:12:55 +02:00
use docker cli to communicate to docker sub system (#412)
This commit is contained in:
parent
2f6c4348fd
commit
d0a783cca8
8 changed files with 2159 additions and 248 deletions
|
|
@ -2,104 +2,26 @@ import subprocess
|
|||
import os
|
||||
import time
|
||||
import sys
|
||||
import glob
|
||||
import docker
|
||||
from docker.errors import DockerException
|
||||
from cli.utils import getLogger, update_docker_host_env
|
||||
from cli.utils import getLogger
|
||||
from cli.consts import (
|
||||
ARCHGW_DOCKER_IMAGE,
|
||||
ARCHGW_DOCKER_NAME,
|
||||
KATANEMO_LOCAL_MODEL_LIST,
|
||||
MODEL_SERVER_LOG_FILE,
|
||||
ACCESS_LOG_FILES,
|
||||
)
|
||||
from huggingface_hub import snapshot_download
|
||||
from dotenv import dotenv_values
|
||||
import subprocess
|
||||
from cli.docker_cli import (
|
||||
docker_container_status,
|
||||
docker_remove_container,
|
||||
docker_start_archgw_detached,
|
||||
docker_stop_container,
|
||||
health_check_endpoint,
|
||||
stream_gateway_logs,
|
||||
)
|
||||
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
||||
|
||||
def start_archgw_docker(client, arch_config_file, env):
|
||||
logs_path = "~/archgw_logs"
|
||||
logs_path_abs = os.path.expanduser(logs_path)
|
||||
|
||||
return client.containers.run(
|
||||
name=ARCHGW_DOCKER_NAME,
|
||||
image=ARCHGW_DOCKER_IMAGE,
|
||||
detach=True, # Run in detached mode
|
||||
ports={
|
||||
"10000/tcp": 10000,
|
||||
"10001/tcp": 10001,
|
||||
"11000/tcp": 11000,
|
||||
"12000/tcp": 12000,
|
||||
"9901/tcp": 19901,
|
||||
},
|
||||
volumes={
|
||||
f"{arch_config_file}": {
|
||||
"bind": "/app/arch_config.yaml",
|
||||
"mode": "ro",
|
||||
},
|
||||
"/etc/ssl/cert.pem": {"bind": "/etc/ssl/cert.pem", "mode": "ro"},
|
||||
logs_path_abs: {"bind": "/var/log"},
|
||||
},
|
||||
environment={
|
||||
"OTEL_TRACING_HTTP_ENDPOINT": "http://host.docker.internal:4318/v1/traces",
|
||||
"MODEL_SERVER_PORT": os.getenv("MODEL_SERVER_PORT", "51000"),
|
||||
**env,
|
||||
},
|
||||
extra_hosts={"host.docker.internal": "host-gateway"},
|
||||
healthcheck={
|
||||
"test": ["CMD", "curl", "-f", "http://localhost:10000/healthz"],
|
||||
"interval": 5000000000, # 5 seconds
|
||||
"timeout": 1000000000, # 1 seconds
|
||||
"retries": 3,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def stream_gateway_logs(follow):
|
||||
"""
|
||||
Stream logs from the arch gateway service.
|
||||
"""
|
||||
log.info("Logs from arch gateway service.")
|
||||
|
||||
options = ["docker", "logs", "archgw"]
|
||||
if follow:
|
||||
options.append("-f")
|
||||
try:
|
||||
# Run `docker-compose logs` to stream logs from the gateway service
|
||||
subprocess.run(
|
||||
options,
|
||||
check=True,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
)
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
log.info(f"Failed to stream logs: {str(e)}")
|
||||
|
||||
|
||||
def stream_access_logs(follow):
|
||||
"""
|
||||
Get the archgw access logs
|
||||
"""
|
||||
log_file_pattern_expanded = os.path.expanduser(ACCESS_LOG_FILES)
|
||||
log_files = glob.glob(log_file_pattern_expanded)
|
||||
|
||||
stream_command = ["tail"]
|
||||
if follow:
|
||||
stream_command.append("-f")
|
||||
|
||||
stream_command.extend(log_files)
|
||||
subprocess.run(
|
||||
stream_command,
|
||||
check=True,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
)
|
||||
|
||||
|
||||
def start_arch(arch_config_file, env, log_timeout=120, foreground=False):
|
||||
"""
|
||||
Start Docker Compose in detached mode and stream logs until services are healthy.
|
||||
|
|
@ -111,55 +33,47 @@ def start_arch(arch_config_file, env, log_timeout=120, foreground=False):
|
|||
log.info("Starting arch gateway")
|
||||
|
||||
try:
|
||||
try:
|
||||
client = docker.from_env()
|
||||
except DockerException as e:
|
||||
# try setting up the docker host environment variable and retry
|
||||
update_docker_host_env()
|
||||
client = docker.from_env()
|
||||
archgw_container_status = docker_container_status(ARCHGW_DOCKER_NAME)
|
||||
if archgw_container_status != "not found":
|
||||
log.info("archgw found in docker, stopping and removing it")
|
||||
docker_stop_container(ARCHGW_DOCKER_NAME)
|
||||
docker_remove_container(ARCHGW_DOCKER_NAME)
|
||||
|
||||
try:
|
||||
container = client.containers.get("archgw")
|
||||
log.info("archgw container found in docker, stopping and removing it")
|
||||
# ensure that previous docker container is stopped and removed
|
||||
container.stop()
|
||||
container.remove()
|
||||
log.info("Stopped and removed archgw container")
|
||||
except docker.errors.NotFound as e:
|
||||
pass
|
||||
|
||||
container = start_archgw_docker(client, arch_config_file, env)
|
||||
return_code, _, archgw_stderr = docker_start_archgw_detached(
|
||||
arch_config_file, os.path.expanduser("~/archgw_logs"), env
|
||||
)
|
||||
if return_code != 0:
|
||||
log.info("Failed to start arch gateway: " + str(return_code))
|
||||
log.info("stderr: " + archgw_stderr)
|
||||
sys.exit(1)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
while True:
|
||||
container = client.containers.get(container.id)
|
||||
health_check_status = health_check_endpoint(
|
||||
"http://localhost:10000/healthz"
|
||||
)
|
||||
archgw_status = docker_container_status(ARCHGW_DOCKER_NAME)
|
||||
current_time = time.time()
|
||||
elapsed_time = current_time - start_time
|
||||
|
||||
# Check if timeout is reached
|
||||
if elapsed_time > log_timeout:
|
||||
log.info(f"Stopping log monitoring after {log_timeout} seconds.")
|
||||
log.info(f"stopping log monitoring after {log_timeout} seconds.")
|
||||
break
|
||||
|
||||
container_status = container.attrs["State"]["Health"]["Status"]
|
||||
|
||||
if container_status == "healthy":
|
||||
log.info("Container is healthy!")
|
||||
if health_check_status:
|
||||
log.info("archgw is running and is healthy!")
|
||||
break
|
||||
else:
|
||||
log.info(f"Container health status: {container_status}")
|
||||
log.info(f"archgw status: {archgw_status}, health status: starting")
|
||||
time.sleep(1)
|
||||
|
||||
if foreground:
|
||||
for line in container.logs(stream=True):
|
||||
print(line.decode("utf-8").strip("\n"))
|
||||
stream_gateway_logs(follow=True)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
log.info("Keyboard interrupt received, stopping arch gateway service.")
|
||||
stop_arch()
|
||||
except docker.errors.APIError as e:
|
||||
log.info(f"Failed to start Arch: {str(e)}")
|
||||
|
||||
|
||||
def stop_arch():
|
||||
|
|
@ -173,10 +87,10 @@ def stop_arch():
|
|||
|
||||
try:
|
||||
subprocess.run(
|
||||
["docker", "stop", "archgw"],
|
||||
["docker", "stop", ARCHGW_DOCKER_NAME],
|
||||
)
|
||||
subprocess.run(
|
||||
["docker", "remove", "archgw"],
|
||||
["docker", "remove", ARCHGW_DOCKER_NAME],
|
||||
)
|
||||
|
||||
log.info("Successfully shut down arch gateway service.")
|
||||
|
|
|
|||
118
arch/tools/cli/docker_cli.py
Normal file
118
arch/tools/cli/docker_cli.py
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
import subprocess
|
||||
import json
|
||||
import sys
|
||||
import requests # Add this import
|
||||
|
||||
from cli.consts import ARCHGW_DOCKER_IMAGE, ARCHGW_DOCKER_NAME
|
||||
from cli.utils import getLogger
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
||||
|
||||
def docker_container_status(container: str) -> str:
|
||||
result = subprocess.run(
|
||||
["docker", "inspect", container], capture_output=True, text=True, check=False
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return "not found"
|
||||
return json.loads(result.stdout)[0]["State"]["Status"]
|
||||
|
||||
|
||||
def docker_stop_container(container: str) -> str:
|
||||
result = subprocess.run(
|
||||
["docker", "stop", container], capture_output=True, text=True, check=False
|
||||
)
|
||||
return result.returncode
|
||||
|
||||
|
||||
def docker_remove_container(container: str) -> str:
|
||||
result = subprocess.run(
|
||||
["docker", "remove", container], capture_output=True, text=True, check=False
|
||||
)
|
||||
return result.returncode
|
||||
|
||||
|
||||
def docker_start_archgw_detached(
|
||||
arch_config_file: str, logs_path_abs: str, env: dict
|
||||
) -> str:
|
||||
env_args = [item for key, value in env.items() for item in ["-e", f"{key}={value}"]]
|
||||
|
||||
port_mappings = ["10000:10000", "12000:12000", "9901:19901"]
|
||||
port_mappings_args = [item for port in port_mappings for item in ("-p", port)]
|
||||
|
||||
volume_mappings = [
|
||||
f"{logs_path_abs}:/var/log:rw",
|
||||
f"{arch_config_file}:/app/arch_config.yaml:ro",
|
||||
]
|
||||
volume_mappings_args = [
|
||||
item for volume in volume_mappings for item in ("-v", volume)
|
||||
]
|
||||
|
||||
options = [
|
||||
"docker",
|
||||
"run",
|
||||
"-d",
|
||||
"--name",
|
||||
ARCHGW_DOCKER_NAME,
|
||||
*port_mappings_args,
|
||||
*volume_mappings_args,
|
||||
*env_args,
|
||||
"--add-host",
|
||||
"host.docker.internal:host-gateway",
|
||||
ARCHGW_DOCKER_IMAGE,
|
||||
]
|
||||
|
||||
result = subprocess.run(options, capture_output=True, text=True, check=False)
|
||||
return result.returncode, result.stdout, result.stderr
|
||||
|
||||
|
||||
def health_check_endpoint(endpoint: str) -> bool:
|
||||
try:
|
||||
response = requests.get(endpoint)
|
||||
if response.status_code == 200:
|
||||
return True
|
||||
except requests.RequestException as e:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def stream_gateway_logs(follow):
|
||||
"""
|
||||
Stream logs from the arch gateway service.
|
||||
"""
|
||||
log.info("Logs from arch gateway service.")
|
||||
|
||||
options = ["docker", "logs", ARCHGW_DOCKER_NAME]
|
||||
if follow:
|
||||
options.append("-f")
|
||||
try:
|
||||
# Run `docker-compose logs` to stream logs from the gateway service
|
||||
subprocess.run(
|
||||
options,
|
||||
check=True,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
)
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
log.info(f"Failed to stream logs: {str(e)}")
|
||||
|
||||
|
||||
def docker_validate_archgw_schema(arch_config_file):
|
||||
result = subprocess.run(
|
||||
[
|
||||
"docker",
|
||||
"run",
|
||||
"--rm",
|
||||
"-v",
|
||||
f"{arch_config_file}:/app/arch_config.yaml:ro",
|
||||
"--entrypoint",
|
||||
"python",
|
||||
ARCHGW_DOCKER_IMAGE,
|
||||
"config_generator.py",
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
return result.returncode, result.stdout, result.stderr
|
||||
|
|
@ -5,11 +5,12 @@ import subprocess
|
|||
import multiprocessing
|
||||
import importlib.metadata
|
||||
from cli import targets
|
||||
from cli.docker_cli import docker_validate_archgw_schema, stream_gateway_logs
|
||||
from cli.utils import (
|
||||
getLogger,
|
||||
get_llm_provider_access_keys,
|
||||
load_env_file_to_dict,
|
||||
validate_schema,
|
||||
stream_access_logs,
|
||||
)
|
||||
from cli.core import (
|
||||
start_arch_modelserver,
|
||||
|
|
@ -17,12 +18,9 @@ from cli.core import (
|
|||
start_arch,
|
||||
stop_arch,
|
||||
download_models_from_hf,
|
||||
stream_access_logs,
|
||||
stream_gateway_logs,
|
||||
)
|
||||
from cli.consts import (
|
||||
KATANEMO_DOCKERHUB_REPO,
|
||||
KATANEMO_LOCAL_MODEL_LIST,
|
||||
SERVICE_NAME_ARCHGW,
|
||||
SERVICE_NAME_MODEL_SERVER,
|
||||
SERVICE_ALL,
|
||||
|
|
@ -174,17 +172,24 @@ def up(file, path, service, foreground):
|
|||
|
||||
log.info(f"Validating {arch_config_file}")
|
||||
|
||||
try:
|
||||
validate_schema(arch_config_file)
|
||||
except Exception as e:
|
||||
log.info(f"Exiting archgw up: validation failed")
|
||||
log.info(f"Error: {str(e)}")
|
||||
(
|
||||
validation_return_code,
|
||||
validation_stdout,
|
||||
validation_stderr,
|
||||
) = docker_validate_archgw_schema(arch_config_file)
|
||||
if validation_return_code != 0:
|
||||
log.info(f"Error: Validation failed. Exiting")
|
||||
log.info(f"Validation stdout: {validation_stdout}")
|
||||
log.info(f"Validation stderr: {validation_stderr}")
|
||||
sys.exit(1)
|
||||
|
||||
log.info("Starting arch model server and arch gateway")
|
||||
|
||||
# Set the ARCH_CONFIG_FILE environment variable
|
||||
env_stage = {}
|
||||
env_stage = {
|
||||
"OTEL_TRACING_HTTP_ENDPOINT": "http://host.docker.internal:4318/v1/traces",
|
||||
"MODEL_SERVER_PORT": os.getenv("MODEL_SERVER_PORT", "51000"),
|
||||
}
|
||||
env = os.environ.copy()
|
||||
# check if access_keys are preesnt in the config file
|
||||
access_keys = get_llm_provider_access_keys(arch_config_file=arch_config_file)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ import ast
|
|||
import sys
|
||||
import yaml
|
||||
from typing import Any
|
||||
from pydantic import BaseModel
|
||||
|
||||
FLASK_ROUTE_DECORATORS = ["route", "get", "post", "put", "delete", "patch"]
|
||||
FASTAPI_ROUTE_DECORATORS = ["get", "post", "put", "delete", "patch"]
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
import glob
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import yaml
|
||||
import logging
|
||||
import docker
|
||||
from docker.errors import DockerException
|
||||
|
||||
from cli.consts import ARCHGW_DOCKER_IMAGE, ARCHGW_DOCKER_NAME
|
||||
from cli.consts import ACCESS_LOG_FILES
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
|
|
@ -21,63 +22,6 @@ def getLogger(name="cli"):
|
|||
log = getLogger(__name__)
|
||||
|
||||
|
||||
def update_docker_host_env():
|
||||
"""
|
||||
Update DOCKER_HOST environment variable to use the local Docker socket
|
||||
"""
|
||||
if os.getenv("DOCKER_HOST"):
|
||||
return
|
||||
|
||||
default_docker_socket = os.getenv("DEFAULT_DOCKER_SOCKET", "/var/run/docker.sock")
|
||||
if not os.path.exists(default_docker_socket):
|
||||
home_dir = os.getenv("HOME")
|
||||
docker_host = f"unix://{home_dir}/.docker/run/docker.sock"
|
||||
log.info(
|
||||
f"Default docker socket {default_docker_socket} not found, using {docker_host}"
|
||||
)
|
||||
os.environ["DOCKER_HOST"] = docker_host
|
||||
|
||||
|
||||
def validate_schema(arch_config_file: str) -> None:
|
||||
try:
|
||||
try:
|
||||
client = docker.from_env()
|
||||
except DockerException as e:
|
||||
# try setting up the docker host environment variable and retry
|
||||
update_docker_host_env()
|
||||
client = docker.from_env()
|
||||
|
||||
container = client.containers.run(
|
||||
image=ARCHGW_DOCKER_IMAGE,
|
||||
volumes={
|
||||
f"{arch_config_file}": {
|
||||
"bind": "/app/arch_config.yaml",
|
||||
"mode": "ro",
|
||||
},
|
||||
},
|
||||
entrypoint=["python", "config_generator.py"],
|
||||
detach=True,
|
||||
)
|
||||
|
||||
# Wait for the container to finish and get the exit code
|
||||
exit_code = container.wait()
|
||||
|
||||
# Check exit code for validation success
|
||||
if exit_code["StatusCode"] != 0:
|
||||
# Validation failed (non-zero exit code)
|
||||
logs = container.logs().decode() # Get container logs for debugging
|
||||
raise ValueError(
|
||||
f"Validation failed. Container exited with code {exit_code}.\nLogs:\n{logs}"
|
||||
)
|
||||
|
||||
# Successful validation (exit code 0)
|
||||
log.info("Schema validation successful!")
|
||||
|
||||
except docker.errors.APIError as e:
|
||||
# Handle container creation error
|
||||
raise ValueError(f"Failed to create container: {e}")
|
||||
|
||||
|
||||
def get_llm_provider_access_keys(arch_config_file):
|
||||
with open(arch_config_file, "r") as file:
|
||||
arch_config = file.read()
|
||||
|
|
@ -127,3 +71,23 @@ def load_env_file_to_dict(file_path):
|
|||
env_dict[key] = value
|
||||
|
||||
return env_dict
|
||||
|
||||
|
||||
def stream_access_logs(follow):
|
||||
"""
|
||||
Get the archgw access logs
|
||||
"""
|
||||
log_file_pattern_expanded = os.path.expanduser(ACCESS_LOG_FILES)
|
||||
log_files = glob.glob(log_file_pattern_expanded)
|
||||
|
||||
stream_command = ["tail"]
|
||||
if follow:
|
||||
stream_command.append("-f")
|
||||
|
||||
stream_command.extend(log_files)
|
||||
subprocess.run(
|
||||
stream_command,
|
||||
check=True,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue