mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
use docker cli to communicate with docker system
This commit is contained in:
parent
4ec03af16e
commit
13a9e88904
5 changed files with 2030 additions and 177 deletions
|
|
@ -7,99 +7,24 @@ import docker
|
|||
from docker.errors import DockerException
|
||||
from cli.utils import getLogger, update_docker_host_env
|
||||
from cli.consts import (
|
||||
ARCHGW_DOCKER_IMAGE,
|
||||
ARCHGW_DOCKER_NAME,
|
||||
KATANEMO_LOCAL_MODEL_LIST,
|
||||
MODEL_SERVER_LOG_FILE,
|
||||
ACCESS_LOG_FILES,
|
||||
)
|
||||
from huggingface_hub import snapshot_download
|
||||
from dotenv import dotenv_values
|
||||
import subprocess
|
||||
from cli.docker_cli import (
|
||||
docker_container_status,
|
||||
docker_remove_container,
|
||||
docker_start_archgw_detached,
|
||||
docker_stop_container,
|
||||
health_check_endpoint,
|
||||
stream_gateway_logs,
|
||||
)
|
||||
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
||||
|
||||
def start_archgw_docker(client, arch_config_file, env):
|
||||
logs_path = "~/archgw_logs"
|
||||
logs_path_abs = os.path.expanduser(logs_path)
|
||||
|
||||
return client.containers.run(
|
||||
name=ARCHGW_DOCKER_NAME,
|
||||
image=ARCHGW_DOCKER_IMAGE,
|
||||
detach=True, # Run in detached mode
|
||||
ports={
|
||||
"10000/tcp": 10000,
|
||||
"10001/tcp": 10001,
|
||||
"11000/tcp": 11000,
|
||||
"12000/tcp": 12000,
|
||||
"9901/tcp": 19901,
|
||||
},
|
||||
volumes={
|
||||
f"{arch_config_file}": {
|
||||
"bind": "/app/arch_config.yaml",
|
||||
"mode": "ro",
|
||||
},
|
||||
"/etc/ssl/cert.pem": {"bind": "/etc/ssl/cert.pem", "mode": "ro"},
|
||||
logs_path_abs: {"bind": "/var/log"},
|
||||
},
|
||||
environment={
|
||||
"OTEL_TRACING_HTTP_ENDPOINT": "http://host.docker.internal:4318/v1/traces",
|
||||
"MODEL_SERVER_PORT": os.getenv("MODEL_SERVER_PORT", "51000"),
|
||||
**env,
|
||||
},
|
||||
extra_hosts={"host.docker.internal": "host-gateway"},
|
||||
healthcheck={
|
||||
"test": ["CMD", "curl", "-f", "http://localhost:10000/healthz"],
|
||||
"interval": 5000000000, # 5 seconds
|
||||
"timeout": 1000000000, # 1 seconds
|
||||
"retries": 3,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def stream_gateway_logs(follow):
|
||||
"""
|
||||
Stream logs from the arch gateway service.
|
||||
"""
|
||||
log.info("Logs from arch gateway service.")
|
||||
|
||||
options = ["docker", "logs", "archgw"]
|
||||
if follow:
|
||||
options.append("-f")
|
||||
try:
|
||||
# Run `docker-compose logs` to stream logs from the gateway service
|
||||
subprocess.run(
|
||||
options,
|
||||
check=True,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
)
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
log.info(f"Failed to stream logs: {str(e)}")
|
||||
|
||||
|
||||
def stream_access_logs(follow):
|
||||
"""
|
||||
Get the archgw access logs
|
||||
"""
|
||||
log_file_pattern_expanded = os.path.expanduser(ACCESS_LOG_FILES)
|
||||
log_files = glob.glob(log_file_pattern_expanded)
|
||||
|
||||
stream_command = ["tail"]
|
||||
if follow:
|
||||
stream_command.append("-f")
|
||||
|
||||
stream_command.extend(log_files)
|
||||
subprocess.run(
|
||||
stream_command,
|
||||
check=True,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
)
|
||||
|
||||
|
||||
def start_arch(arch_config_file, env, log_timeout=120, foreground=False):
|
||||
"""
|
||||
Start Docker Compose in detached mode and stream logs until services are healthy.
|
||||
|
|
@ -111,55 +36,47 @@ def start_arch(arch_config_file, env, log_timeout=120, foreground=False):
|
|||
log.info("Starting arch gateway")
|
||||
|
||||
try:
|
||||
try:
|
||||
client = docker.from_env()
|
||||
except DockerException as e:
|
||||
# try setting up the docker host environment variable and retry
|
||||
update_docker_host_env()
|
||||
client = docker.from_env()
|
||||
archgw_container_status = docker_container_status(ARCHGW_DOCKER_NAME)
|
||||
if archgw_container_status != "not found":
|
||||
log.info("archgw found in docker, stopping and removing it")
|
||||
docker_stop_container(ARCHGW_DOCKER_NAME)
|
||||
docker_remove_container(ARCHGW_DOCKER_NAME)
|
||||
|
||||
try:
|
||||
container = client.containers.get("archgw")
|
||||
log.info("archgw container found in docker, stopping and removing it")
|
||||
# ensure that previous docker container is stopped and removed
|
||||
container.stop()
|
||||
container.remove()
|
||||
log.info("Stopped and removed archgw container")
|
||||
except docker.errors.NotFound as e:
|
||||
pass
|
||||
|
||||
container = start_archgw_docker(client, arch_config_file, env)
|
||||
return_code, _, archgw_stderr = docker_start_archgw_detached(
|
||||
arch_config_file, os.path.expanduser("~/archgw_logs"), env
|
||||
)
|
||||
if return_code != 0:
|
||||
log.info("Failed to start arch gateway: " + str(return_code))
|
||||
log.info("stderr: " + archgw_stderr)
|
||||
sys.exit(1)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
while True:
|
||||
container = client.containers.get(container.id)
|
||||
health_check_status = health_check_endpoint(
|
||||
"http://localhost:10000/healthz"
|
||||
)
|
||||
archgw_status = docker_container_status(ARCHGW_DOCKER_NAME)
|
||||
current_time = time.time()
|
||||
elapsed_time = current_time - start_time
|
||||
|
||||
# Check if timeout is reached
|
||||
if elapsed_time > log_timeout:
|
||||
log.info(f"Stopping log monitoring after {log_timeout} seconds.")
|
||||
log.info(f"stopping log monitoring after {log_timeout} seconds.")
|
||||
break
|
||||
|
||||
container_status = container.attrs["State"]["Health"]["Status"]
|
||||
|
||||
if container_status == "healthy":
|
||||
log.info("Container is healthy!")
|
||||
if health_check_status:
|
||||
log.info("archgw is running and is healthy!")
|
||||
break
|
||||
else:
|
||||
log.info(f"Container health status: {container_status}")
|
||||
log.info(f"archgw status: {archgw_status}, health status: starting")
|
||||
time.sleep(1)
|
||||
|
||||
if foreground:
|
||||
for line in container.logs(stream=True):
|
||||
print(line.decode("utf-8").strip("\n"))
|
||||
stream_gateway_logs(follow=True)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
log.info("Keyboard interrupt received, stopping arch gateway service.")
|
||||
stop_arch()
|
||||
except docker.errors.APIError as e:
|
||||
log.info(f"Failed to start Arch: {str(e)}")
|
||||
|
||||
|
||||
def stop_arch():
|
||||
|
|
@ -173,10 +90,10 @@ def stop_arch():
|
|||
|
||||
try:
|
||||
subprocess.run(
|
||||
["docker", "stop", "archgw"],
|
||||
["docker", "stop", ARCHGW_DOCKER_NAME],
|
||||
)
|
||||
subprocess.run(
|
||||
["docker", "remove", "archgw"],
|
||||
["docker", "remove", ARCHGW_DOCKER_NAME],
|
||||
)
|
||||
|
||||
log.info("Successfully shut down arch gateway service.")
|
||||
|
|
|
|||
|
|
@ -5,10 +5,12 @@ import subprocess
|
|||
import multiprocessing
|
||||
import importlib.metadata
|
||||
from cli import targets
|
||||
from cli.docker_cli import stream_gateway_logs
|
||||
from cli.utils import (
|
||||
getLogger,
|
||||
get_llm_provider_access_keys,
|
||||
load_env_file_to_dict,
|
||||
stream_access_logs,
|
||||
validate_schema,
|
||||
)
|
||||
from cli.core import (
|
||||
|
|
@ -17,12 +19,9 @@ from cli.core import (
|
|||
start_arch,
|
||||
stop_arch,
|
||||
download_models_from_hf,
|
||||
stream_access_logs,
|
||||
stream_gateway_logs,
|
||||
)
|
||||
from cli.consts import (
|
||||
KATANEMO_DOCKERHUB_REPO,
|
||||
KATANEMO_LOCAL_MODEL_LIST,
|
||||
SERVICE_NAME_ARCHGW,
|
||||
SERVICE_NAME_MODEL_SERVER,
|
||||
SERVICE_ALL,
|
||||
|
|
@ -184,7 +183,10 @@ def up(file, path, service, foreground):
|
|||
log.info("Starting arch model server and arch gateway")
|
||||
|
||||
# Set the ARCH_CONFIG_FILE environment variable
|
||||
env_stage = {}
|
||||
env_stage = {
|
||||
"OTEL_TRACING_HTTP_ENDPOINT": "http://host.docker.internal:4318/v1/traces",
|
||||
"MODEL_SERVER_PORT": os.getenv("MODEL_SERVER_PORT", "51000"),
|
||||
}
|
||||
env = os.environ.copy()
|
||||
# check if access_keys are preesnt in the config file
|
||||
access_keys = get_llm_provider_access_keys(arch_config_file=arch_config_file)
|
||||
|
|
|
|||
|
|
@ -1,10 +1,13 @@
|
|||
import glob
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import yaml
|
||||
import logging
|
||||
import docker
|
||||
from docker.errors import DockerException
|
||||
|
||||
from cli.consts import ARCHGW_DOCKER_IMAGE, ARCHGW_DOCKER_NAME
|
||||
from cli.consts import ACCESS_LOG_FILES, ARCHGW_DOCKER_IMAGE
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
|
|
@ -127,3 +130,23 @@ def load_env_file_to_dict(file_path):
|
|||
env_dict[key] = value
|
||||
|
||||
return env_dict
|
||||
|
||||
|
||||
def stream_access_logs(follow):
|
||||
"""
|
||||
Get the archgw access logs
|
||||
"""
|
||||
log_file_pattern_expanded = os.path.expanduser(ACCESS_LOG_FILES)
|
||||
log_files = glob.glob(log_file_pattern_expanded)
|
||||
|
||||
stream_command = ["tail"]
|
||||
if follow:
|
||||
stream_command.append("-f")
|
||||
|
||||
stream_command.extend(log_files)
|
||||
subprocess.run(
|
||||
stream_command,
|
||||
check=True,
|
||||
stdout=sys.stdout,
|
||||
stderr=sys.stderr,
|
||||
)
|
||||
|
|
|
|||
2023
arch/tools/poetry.lock
generated
2023
arch/tools/poetry.lock
generated
File diff suppressed because it is too large
Load diff
|
|
@ -15,8 +15,6 @@ click = "^8.1.7"
|
|||
jinja2 = "^3.1.4"
|
||||
jsonschema = "^4.23.0"
|
||||
setuptools = "75.5.0"
|
||||
docker = "^7.1.0"
|
||||
python-dotenv = "^1.0.1"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
archgw = "cli.main:main"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue