nomyo-router/test/load/loadtest.py

803 lines
33 KiB
Python

#!/usr/bin/env python3
"""
NOMYO Router load test — asyncio + httpx driver with a built-in mock backend.
Three modes
-----------
1. Drive an already-running router (default)::
python test/load/loadtest.py --url http://127.0.0.1:12434 \
--concurrency 64 --duration 30 --stream
2. Fully self-contained "mock backend" mode — spins up a fast fake Ollama/OpenAI
backend AND the router (wired to that backend via a temp config), load-tests
them, then tears both down. This isolates the *router's* proxy overhead from
real GPU compute, so the numbers tell you how many concurrent connections the
router itself can sustain on this machine::
python test/load/loadtest.py --mock-backend \
--concurrency 128 --duration 30 --stream
3. Run just the mock backend (point your own router config at it)::
python test/load/loadtest.py --serve-mock --mock-port 11434
Both streaming and non-streaming are supported (--stream / --no-stream), against
either the Ollama API (--api ollama -> POST /api/chat) or the OpenAI-compatible
API (--api openai -> POST /v1/chat/completions).
Finding the concurrency ceiling
-------------------------------
Use --ramp to sweep concurrency levels and print a table; the "knee" is where
p99 latency climbs sharply or req/s stops increasing::
python test/load/loadtest.py --mock-backend --stream \
--ramp 16,32,64,128,256 --duration 15
The mock backend is a configurable "fake GPU": --mock-ttft-ms (prefill latency),
--mock-tokens (completion length) and --mock-tok-ms (per-token decode delay) let
you model anything from an instant echo (measure pure proxy overhead) to a slow,
long-streaming model (measure how many slow streams the box holds open).
"""
from __future__ import annotations
import argparse
import asyncio
import contextlib
import json
import math
import os
import signal
import socket
import statistics
import sys
import tempfile
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import httpx
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
_WORDS = (
"the quick brown fox jumps over the lazy dog while a router proxies many "
"concurrent streaming completions across several ollama and openai backends "
"without dropping a single token under sustained synthetic load testing"
).split()
def _gen_text(n_tokens: int) -> str:
"""Deterministic pseudo-completion of roughly ``n_tokens`` space-separated tokens."""
return " ".join(_WORDS[i % len(_WORDS)] for i in range(max(0, n_tokens)))
def _rfc3339_now() -> str:
# Ollama-style timestamp, e.g. 2024-01-01T00:00:00.000000Z
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z"
def _count_prompt_tokens(messages: list) -> int:
total = 0
for m in messages or []:
c = m.get("content")
if isinstance(c, str):
total += len(c.split())
elif isinstance(c, list):
for part in c:
if isinstance(part, dict) and isinstance(part.get("text"), str):
total += len(part["text"].split())
return max(1, total)
def _free_port() -> int:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 0))
port = s.getsockname()[1]
s.close()
return port
# ===========================================================================
# Mock backend (a fast, configurable fake Ollama + OpenAI-compatible server)
# ===========================================================================
def build_mock_app(models: list[str], ttft_ms: float, tokens: int, tok_ms: float):
    """Construct the aiohttp mock-backend application.

    Serves the native-Ollama surface the router uses for discovery and the
    `/api/chat` path (`/api/version`, `/api/tags`, `/api/ps`, `/api/chat`,
    `/api/generate`) plus the OpenAI-compatible surface used by the
    `/v1/chat/completions` path (`/v1/models`, `/v1/chat/completions`,
    `/v1/completions`).

    Args:
        models: model names advertised by the discovery endpoints. The first
            one is echoed back when a request omits ``model`` ("mock" if the
            list is empty).
        ttft_ms: simulated delay before the first token, in milliseconds.
        tokens: number of completion tokens every response contains.
        tok_ms: simulated delay between consecutive tokens, in milliseconds.
            Streaming handlers sleep this before every token except the
            first. Non-streaming handlers sleep ``ttft + tokens * tok``.

    Returns:
        An ``aiohttp.web.Application`` with all routes registered.
    """
    from aiohttp import web  # imported lazily so the driver has no hard aiohttp dep

    ttft = ttft_ms / 1000.0
    tok_delay = tok_ms / 1000.0

    def _tag_entry(name: str) -> dict:
        # One model record as listed by /api/tags (and, extended, by /api/ps).
        return {
            "name": name,
            "model": name,
            "modified_at": _rfc3339_now(),
            "size": 4_000_000_000,
            "digest": "0" * 64,
            "details": {
                "parent_model": "",
                "format": "gguf",
                "family": "mock",
                "families": ["mock"],
                "parameter_size": "7B",
                "quantization_level": "Q4_0",
            },
        }

    def _sse(obj: dict) -> bytes:
        # Encode one Server-Sent-Events "data:" frame carrying a JSON object.
        return b"data: " + json.dumps(obj).encode() + b"\n\n"

    async def version(_req):
        return web.json_response({"version": "0.0.0-nomyo-mock"})

    async def tags(_req):
        return web.json_response({"models": [_tag_entry(m) for m in models]})

    async def ps(_req):
        # Report every advertised model as loaded with VRAM so choose_endpoint
        # treats this endpoint as "loaded + free".
        out = []
        for m in models:
            e = _tag_entry(m)
            e["size_vram"] = e["size"]
            e["expires_at"] = "2999-01-01T00:00:00Z"
            out.append(e)
        return web.json_response({"models": out})

    async def v1_models(_req):
        now = int(time.time())
        return web.json_response({
            "object": "list",
            "data": [{"id": m, "object": "model", "created": now, "owned_by": "mock"} for m in models],
        })

    # ----- Ollama /api/chat -------------------------------------------------
    async def api_chat(req):
        payload = await req.json()
        model = payload.get("model", models[0] if models else "mock")
        stream = payload.get("stream", True)  # Ollama streams unless told not to
        prompt_tok = _count_prompt_tokens(payload.get("messages", []))
        t0 = time.perf_counter()
        if stream:
            resp = web.StreamResponse(
                status=200, headers={"Content-Type": "application/x-ndjson"}
            )
            await resp.prepare(req)
            if ttft:
                await asyncio.sleep(ttft)
            for i in range(tokens):
                if tok_delay and i:
                    await asyncio.sleep(tok_delay)
                line = {
                    "model": model,
                    "created_at": _rfc3339_now(),
                    "message": {"role": "assistant", "content": _WORDS[i % len(_WORDS)] + " "},
                    "done": False,
                }
                await resp.write(json.dumps(line).encode() + b"\n")
            dur_ns = int((time.perf_counter() - t0) * 1e9)
            final = {
                "model": model,
                "created_at": _rfc3339_now(),
                "message": {"role": "assistant", "content": ""},
                "done": True,
                "done_reason": "stop",
                "total_duration": dur_ns,
                "load_duration": 0,
                "prompt_eval_count": prompt_tok,
                "prompt_eval_duration": int(ttft * 1e9),
                "eval_count": tokens,
                "eval_duration": dur_ns,
            }
            await resp.write(json.dumps(final).encode() + b"\n")
            await resp.write_eof()
            return resp
        # non-streaming: simulate the whole generation latency, then one object
        await asyncio.sleep(ttft + tokens * tok_delay)
        dur_ns = int((time.perf_counter() - t0) * 1e9)
        return web.json_response({
            "model": model,
            "created_at": _rfc3339_now(),
            "message": {"role": "assistant", "content": _gen_text(tokens)},
            "done": True,
            "done_reason": "stop",
            "total_duration": dur_ns,
            "load_duration": 0,
            "prompt_eval_count": prompt_tok,
            "prompt_eval_duration": int(ttft * 1e9),
            "eval_count": tokens,
            "eval_duration": dur_ns,
        })

    # ----- Ollama /api/generate --------------------------------------------
    async def api_generate(req):
        payload = await req.json()
        model = payload.get("model", models[0] if models else "mock")
        stream = payload.get("stream", True)
        prompt_tok = max(1, len(str(payload.get("prompt", "")).split()))
        t0 = time.perf_counter()
        if stream:
            resp = web.StreamResponse(status=200, headers={"Content-Type": "application/x-ndjson"})
            await resp.prepare(req)
            if ttft:
                await asyncio.sleep(ttft)
            for i in range(tokens):
                if tok_delay and i:
                    await asyncio.sleep(tok_delay)
                await resp.write(json.dumps({
                    "model": model, "created_at": _rfc3339_now(),
                    "response": _WORDS[i % len(_WORDS)] + " ", "done": False,
                }).encode() + b"\n")
            dur_ns = int((time.perf_counter() - t0) * 1e9)
            await resp.write(json.dumps({
                "model": model, "created_at": _rfc3339_now(), "response": "", "done": True,
                "done_reason": "stop", "total_duration": dur_ns,
                "prompt_eval_count": prompt_tok, "eval_count": tokens, "eval_duration": dur_ns,
            }).encode() + b"\n")
            await resp.write_eof()
            return resp
        await asyncio.sleep(ttft + tokens * tok_delay)
        dur_ns = int((time.perf_counter() - t0) * 1e9)
        return web.json_response({
            "model": model, "created_at": _rfc3339_now(), "response": _gen_text(tokens),
            "done": True, "done_reason": "stop", "total_duration": dur_ns,
            "prompt_eval_count": prompt_tok, "eval_count": tokens, "eval_duration": dur_ns,
        })

    # ----- OpenAI /v1/chat/completions -------------------------------------
    async def v1_chat(req):
        payload = await req.json()
        model = payload.get("model", models[0] if models else "mock")
        stream = payload.get("stream", False)  # OpenAI does not stream by default
        want_usage = bool((payload.get("stream_options") or {}).get("include_usage"))
        prompt_tok = _count_prompt_tokens(payload.get("messages", []))
        created = int(time.time())
        cid = "chatcmpl-mock"
        if stream:
            resp = web.StreamResponse(status=200, headers={"Content-Type": "text/event-stream"})
            await resp.prepare(req)
            if ttft:
                await asyncio.sleep(ttft)
            # first chunk carries the role
            await resp.write(_sse({
                "id": cid, "object": "chat.completion.chunk", "created": created, "model": model,
                "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}],
            }))
            for i in range(tokens):
                if tok_delay and i:
                    await asyncio.sleep(tok_delay)
                await resp.write(_sse({
                    "id": cid, "object": "chat.completion.chunk", "created": created, "model": model,
                    "choices": [{"index": 0, "delta": {"content": _WORDS[i % len(_WORDS)] + " "}, "finish_reason": None}],
                }))
            await resp.write(_sse({
                "id": cid, "object": "chat.completion.chunk", "created": created, "model": model,
                "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
            }))
            if want_usage:
                await resp.write(_sse({
                    "id": cid, "object": "chat.completion.chunk", "created": created, "model": model,
                    "choices": [],
                    "usage": {"prompt_tokens": prompt_tok, "completion_tokens": tokens,
                              "total_tokens": prompt_tok + tokens},
                }))
            await resp.write(b"data: [DONE]\n\n")
            await resp.write_eof()
            return resp
        await asyncio.sleep(ttft + tokens * tok_delay)
        return web.json_response({
            "id": cid, "object": "chat.completion", "created": created, "model": model,
            "choices": [{"index": 0, "message": {"role": "assistant", "content": _gen_text(tokens)},
                         "finish_reason": "stop", "logprobs": None}],
            "usage": {"prompt_tokens": prompt_tok, "completion_tokens": tokens,
                      "total_tokens": prompt_tok + tokens},
        })

    # ----- OpenAI /v1/completions ------------------------------------------
    async def v1_completions(req):
        payload = await req.json()
        model = payload.get("model", models[0] if models else "mock")
        stream = payload.get("stream", False)
        want_usage = bool((payload.get("stream_options") or {}).get("include_usage"))
        prompt_tok = max(1, len(str(payload.get("prompt", "")).split()))
        created = int(time.time())
        cid = "cmpl-mock"
        usage = {"prompt_tokens": prompt_tok, "completion_tokens": tokens,
                 "total_tokens": prompt_tok + tokens}
        if stream:
            resp = web.StreamResponse(status=200, headers={"Content-Type": "text/event-stream"})
            await resp.prepare(req)
            if ttft:
                await asyncio.sleep(ttft)
            for i in range(tokens):
                if tok_delay and i:
                    await asyncio.sleep(tok_delay)
                await resp.write(_sse({
                    "id": cid, "object": "text_completion", "created": created, "model": model,
                    "choices": [{"index": 0, "text": _WORDS[i % len(_WORDS)] + " ", "finish_reason": None}],
                }))
            # Terminal chunk carrying finish_reason (and the optional usage
            # chunk), matching the v1_chat stream above, so clients that wait
            # for a finish_reason see the stream complete normally.
            await resp.write(_sse({
                "id": cid, "object": "text_completion", "created": created, "model": model,
                "choices": [{"index": 0, "text": "", "finish_reason": "stop"}],
            }))
            if want_usage:
                await resp.write(_sse({
                    "id": cid, "object": "text_completion", "created": created, "model": model,
                    "choices": [],
                    "usage": usage,
                }))
            await resp.write(b"data: [DONE]\n\n")
            await resp.write_eof()
            return resp
        await asyncio.sleep(ttft + tokens * tok_delay)
        return web.json_response({
            "id": cid, "object": "text_completion", "created": created, "model": model,
            "choices": [{"index": 0, "text": _gen_text(tokens), "finish_reason": "stop"}],
            "usage": usage,
        })

    app = web.Application(client_max_size=64 * 1024 * 1024)
    app.add_routes([
        web.get("/api/version", version),
        web.get("/api/tags", tags),
        web.get("/api/ps", ps),
        web.post("/api/chat", api_chat),
        web.post("/api/generate", api_generate),
        web.get("/v1/models", v1_models),
        web.post("/v1/chat/completions", v1_chat),
        web.post("/v1/completions", v1_completions),
    ])
    return app
def serve_mock(args) -> None:
    """Run only the mock backend in the foreground until the process is stopped.

    Uses ``--mock-host``/``--mock-port``/``--mock-models`` and the latency
    knobs from *args*, prints a one-line banner, then blocks inside
    aiohttp's ``web.run_app``.
    """
    from aiohttp import web

    advertised = [name.strip() for name in args.mock_models.split(",") if name.strip()]
    application = build_mock_app(
        advertised, args.mock_ttft_ms, args.mock_tokens, args.mock_tok_ms
    )
    banner = (
        f"[mock] serving models={advertised} on http://{args.mock_host}:{args.mock_port} "
        f"(ttft={args.mock_ttft_ms}ms tokens={args.mock_tokens} tok={args.mock_tok_ms}ms)"
    )
    print(banner, flush=True)
    web.run_app(application, host=args.mock_host, port=args.mock_port, print=None)
# ===========================================================================
# Load driver
# ===========================================================================
@dataclass
class Sample:
    """Outcome of a single load-test request."""

    # True when the response status was 2xx and no exception occurred.
    ok: bool
    # HTTP status code; 0 when the request failed before a response arrived.
    status: int
    latency: float  # full request wall time (s)
    ttft: Optional[float]  # time-to-first-byte for streaming (s), else None
    # Short human-readable failure reason, None for successful samples.
    err: Optional[str] = None
@dataclass
class Stats:
    """Everything recorded for one concurrency stage."""

    concurrency: int  # number of concurrent workers in the stage
    wall: float = 0.0  # measured wall time of the stage (s)
    samples: list = field(default_factory=list)  # every Sample, in completion order

    @property
    def ok(self) -> list:
        """The successful samples, in recording order."""
        return list(filter(lambda sample: sample.ok, self.samples))

    @property
    def n_total(self) -> int:
        """Number of samples recorded, successful or not."""
        return len(self.samples)

    @property
    def n_ok(self) -> int:
        """Number of successful samples."""
        return sum(1 for sample in self.samples if sample.ok)
def _pct(values: list[float], p: float) -> float:
if not values:
return float("nan")
s = sorted(values)
if len(s) == 1:
return s[0]
k = (len(s) - 1) * (p / 100.0)
lo = math.floor(k)
hi = math.ceil(k)
if lo == hi:
return s[int(k)]
return s[lo] + (s[hi] - s[lo]) * (k - lo)
def _build_request(args):
"""Return (path, json_payload) for a single request."""
messages = [{"role": "user", "content": args.prompt}]
if args.api == "openai":
path = "/v1/chat/completions"
body = {"model": args.model, "messages": messages, "stream": args.stream}
else:
path = "/api/chat"
body = {"model": args.model, "messages": messages, "stream": args.stream}
return path, body
async def _one_request(client: httpx.AsyncClient, url: str, body: dict, stream: bool) -> Sample:
    """Issue one POST to *url* and summarise its outcome as a Sample.

    Streaming requests drain the whole body and record, as ``ttft``, the time
    until the first chunk arrived. Non-streaming requests leave ``ttft`` None.
    A non-2xx status yields ``ok=False`` with ``err="HTTP <status>"``. Any
    exception raised while sending or reading yields ``status=0`` and an
    ``err`` holding the exception type plus the first 120 characters of its
    message.
    """
    start = time.perf_counter()
    try:
        if not stream:
            response = await client.post(url, json=body)
            elapsed = time.perf_counter() - start
            code = response.status_code
            succeeded = 200 <= code < 300
            # touch body so the full response is received
            _ = response.content
            return Sample(ok=succeeded, status=code, latency=elapsed, ttft=None,
                          err=None if succeeded else f"HTTP {code}")

        first_chunk_at = None
        async with client.stream("POST", url, json=body) as response:
            code = response.status_code
            async for _chunk in response.aiter_bytes():
                if first_chunk_at is None:
                    first_chunk_at = time.perf_counter() - start
        # drain complete
        elapsed = time.perf_counter() - start
        succeeded = 200 <= code < 300
        return Sample(ok=succeeded, status=code, latency=elapsed, ttft=first_chunk_at,
                      err=None if succeeded else f"HTTP {code}")
    except Exception as exc:  # noqa: BLE001 — record any transport error as a failed sample
        elapsed = time.perf_counter() - start
        return Sample(ok=False, status=0, latency=elapsed, ttft=None,
                      err=f"{type(exc).__name__}: {str(exc)[:120]}")
async def run_stage(args, concurrency: int) -> Stats:
    """Run one load stage with *concurrency* workers and return its Stats.

    If ``args.warmup`` is positive, up to 8 workers first issue unmeasured
    requests for that many seconds. The measured phase then either runs for
    ``args.duration`` seconds (in-flight requests finish after the deadline)
    or, when ``args.requests`` is set, until exactly that many requests have
    been issued. ``Stats.wall`` covers only the measured phase.
    """
    path, body = _build_request(args)
    target = args.url.rstrip("/") + path
    stats = Stats(concurrency=concurrency)
    pool_size = concurrency + 50
    limits = httpx.Limits(max_connections=pool_size,
                          max_keepalive_connections=pool_size)
    timeout = httpx.Timeout(args.timeout, connect=15.0)

    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        # warmup (unmeasured): make a few requests so caches/connections are hot
        if args.warmup > 0:
            warm_until = time.perf_counter() + args.warmup

            async def _warm() -> None:
                while time.perf_counter() < warm_until:
                    await _one_request(client, target, body, args.stream)

            await asyncio.gather(*(_warm() for _ in range(min(concurrency, 8))))

        fixed_count = args.requests is not None
        stop_at = None if fixed_count else time.perf_counter() + args.duration
        budget = args.requests if fixed_count else None
        budget_lock = asyncio.Lock()

        async def _claim() -> bool:
            # True when the calling worker may start another request.
            nonlocal budget
            if not fixed_count:
                return time.perf_counter() < stop_at
            async with budget_lock:
                if budget <= 0:
                    return False
                budget -= 1
                return True

        async def worker() -> None:
            while await _claim():
                outcome = await _one_request(client, target, body, args.stream)
                stats.samples.append(outcome)

        started = time.perf_counter()
        await asyncio.gather(*(worker() for _ in range(concurrency)))
        stats.wall = time.perf_counter() - started
    return stats
def _print_stage(stats: Stats, args, header: bool) -> None:
lat = [s.latency * 1000 for s in stats.ok]
ttfts = [s.ttft * 1000 for s in stats.ok if s.ttft is not None]
rps = stats.n_ok / stats.wall if stats.wall else 0.0
errs = stats.n_total - stats.n_ok
if args.ramp:
if header:
cols = f"{'conc':>5} {'req':>7} {'ok':>7} {'err':>5} {'req/s':>9} " \
f"{'p50ms':>8} {'p90ms':>8} {'p99ms':>9} {'maxms':>9}"
if args.stream:
cols += f" {'ttftP50':>8} {'ttftP99':>8}"
print(cols)
print("-" * len(cols))
row = (f"{stats.concurrency:>5} {stats.n_total:>7} {stats.n_ok:>7} {errs:>5} "
f"{rps:>9.1f} {_pct(lat,50):>8.1f} {_pct(lat,90):>8.1f} "
f"{_pct(lat,99):>9.1f} {(max(lat) if lat else float('nan')):>9.1f}")
if args.stream:
row += f" {_pct(ttfts,50):>8.1f} {_pct(ttfts,99):>8.1f}"
print(row, flush=True)
return
# single-stage detailed report
print(f"\n=== Results (concurrency={stats.concurrency}, "
f"{'stream' if args.stream else 'non-stream'}, api={args.api}) ===")
print(f" wall time : {stats.wall:8.2f} s")
print(f" requests : {stats.n_total} total, {stats.n_ok} ok, {errs} failed")
print(f" throughput : {rps:8.1f} req/s")
if lat:
print(f" latency p50 : {_pct(lat,50):8.1f} ms")
print(f" p90 : {_pct(lat,90):8.1f} ms")
print(f" p95 : {_pct(lat,95):8.1f} ms")
print(f" p99 : {_pct(lat,99):8.1f} ms")
print(f" max : {max(lat):8.1f} ms")
print(f" mean : {statistics.mean(lat):8.1f} ms")
if ttfts:
print(f" TTFT p50 : {_pct(ttfts,50):8.1f} ms")
print(f" p90 : {_pct(ttfts,90):8.1f} ms")
print(f" p99 : {_pct(ttfts,99):8.1f} ms")
if errs:
by_err: dict[str, int] = {}
for s in stats.samples:
if not s.ok:
by_err[s.err or "unknown"] = by_err.get(s.err or "unknown", 0) + 1
print(" errors:")
for k, v in sorted(by_err.items(), key=lambda kv: -kv[1]):
print(f" {v:>6} {k}")
async def run_driver(args) -> list[Stats]:
    """Run every configured stage in order, printing each as it completes.

    ``--ramp`` (comma-separated concurrency levels) defines the stages.
    Without it, a single stage at ``--concurrency`` runs. Blank ramp entries,
    e.g. from a trailing comma, are ignored.

    Returns:
        One Stats per stage, in execution order.

    Raises:
        ValueError: if a ramp entry is not an integer, if the ramp lists no
            levels at all, or if any stage's concurrency is below 1 (such a
            stage would start no workers and report an empty result).
    """
    if args.ramp:
        stages = [int(x) for x in args.ramp.split(",") if x.strip()]
    else:
        stages = [args.concurrency]
    if not stages:
        raise ValueError(f"--ramp {args.ramp!r} contains no concurrency levels")
    too_small = [c for c in stages if c < 1]
    if too_small:
        raise ValueError(f"concurrency must be >= 1, got {too_small}")
    results: list[Stats] = []
    for i, c in enumerate(stages):
        stats = await run_stage(args, c)
        _print_stage(stats, args, header=(i == 0))
        results.append(stats)
    return results
# ===========================================================================
# Orchestration: --mock-backend (spawn mock + router, run, tear down)
# ===========================================================================
# Repository root used as the router's working directory: this script lives
# at <root>/test/load/loadtest.py, i.e. two levels above its own directory.
PROJECT_ROOT: Path = Path(__file__).resolve().parents[2]
async def _wait_http_ok(url: str, timeout: float, accept=(200,)) -> bool:
    """Poll ``GET url`` every 0.25 s until it answers with a status in *accept*.

    Returns True as soon as an accepted status is seen, or False once
    *timeout* seconds have elapsed. Any exception raised by a poll attempt
    (e.g. connection refused while the server is still starting) is ignored
    and the poll is retried.
    """
    give_up_at = time.perf_counter() + timeout
    async with httpx.AsyncClient(timeout=5.0) as client:
        while time.perf_counter() < give_up_at:
            with contextlib.suppress(Exception):
                response = await client.get(url)
                if response.status_code in accept:
                    return True
            await asyncio.sleep(0.25)
    return False
def _write_temp_config(mock_url: str, models: list[str], max_conc: int) -> Path:
fd, path = tempfile.mkstemp(prefix="nomyo_loadtest_", suffix=".yaml")
os.close(fd)
cfg = (
"# Auto-generated by test/load/loadtest.py --mock-backend. Safe to delete.\n"
"endpoints:\n"
f" - {mock_url}\n"
"llama_server_endpoints: []\n"
f"max_concurrent_connections: {max_conc}\n"
"priority_routing: false\n"
"conversation_affinity: false\n"
"cache_enabled: false\n"
"nomyo-router-api-key: \"\"\n"
"api_keys:\n"
f" \"{mock_url}\": \"mock\"\n"
)
Path(path).write_text(cfg)
return Path(path)
async def run_with_mock_backend(args) -> list[Stats]:
    """Spawn the mock backend and the router, load-test the router, then tear down.

    Steps:
      1. start this script with ``--serve-mock`` as a subprocess and wait for
         its ``/api/version``;
      2. write a temp router config pointing at it and start
         ``uvicorn router:app`` from the project root, waiting for ``/health``;
      3. run the normal driver against the router.

    On the way out (including on error) both subprocesses get SIGINT, then
    ``kill()`` if still alive after 8 s. Killed processes are awaited so they
    are reaped. The temp config (unless ``--keep-config``) and the temp DB
    files are removed.

    Returns:
        The per-stage Stats produced by run_driver.

    Raises:
        RuntimeError: if the mock backend or the router does not become ready.
    """
    mock_port = args.mock_port or _free_port()
    router_port = args.router_port or _free_port()
    # _free_port releases its probe socket before returning, so two picks can
    # coincide. Re-pick whichever port the user did not fix explicitly.
    while router_port == mock_port and not (args.mock_port and args.router_port):
        if args.router_port:
            mock_port = _free_port()
        else:
            router_port = _free_port()
    mock_url = f"http://127.0.0.1:{mock_port}"
    router_url = f"http://127.0.0.1:{router_port}"
    models = [m.strip() for m in args.mock_models.split(",") if m.strip()]
    # Size the router's per-endpoint admission limit so it does not artificially
    # serialize the load (unless the user explicitly wants to measure that).
    if args.ramp:
        # Blank entries (e.g. a trailing comma) are skipped rather than crashing.
        levels = [int(x) for x in args.ramp.split(",") if x.strip()]
        peak = max(levels, default=args.concurrency)
    else:
        peak = args.concurrency
    max_conc = args.router_max_conc if args.router_max_conc else max(peak, 1)
    cfg_path = _write_temp_config(mock_url, models, max_conc)
    db_path = Path(tempfile.gettempdir()) / f"nomyo_loadtest_{os.getpid()}.db"
    env = dict(os.environ)
    env["NOMYO_ROUTER_CONFIG_PATH"] = str(cfg_path)
    env["NOMYO_ROUTER_DB_PATH"] = str(db_path)
    mock_proc = None
    router_proc = None
    try:
        # 1. mock backend first, so the router never caches it as "down"
        mock_cmd = [
            sys.executable, str(Path(__file__).resolve()), "--serve-mock",
            "--mock-host", "127.0.0.1", "--mock-port", str(mock_port),
            "--mock-models", args.mock_models,
            "--mock-ttft-ms", str(args.mock_ttft_ms),
            "--mock-tokens", str(args.mock_tokens),
            "--mock-tok-ms", str(args.mock_tok_ms),
        ]
        print(f"[orchestrator] starting mock backend: {mock_url}", flush=True)
        mock_proc = await asyncio.create_subprocess_exec(*mock_cmd)
        if not await _wait_http_ok(f"{mock_url}/api/version", timeout=15):
            raise RuntimeError("mock backend did not become ready")
        # 2. router
        router_cmd = [
            sys.executable, "-m", "uvicorn", "router:app",
            "--host", "127.0.0.1", "--port", str(router_port),
            # Per-request access logging is pure noise (and overhead) under load.
            "--no-access-log",
        ]
        if args.router_workers and args.router_workers > 1:
            router_cmd += ["--workers", str(args.router_workers)]
        print(f"[orchestrator] starting router: {router_url} "
              f"(workers={args.router_workers}, max_concurrent_connections={max_conc})", flush=True)
        router_proc = await asyncio.create_subprocess_exec(
            *router_cmd, cwd=str(PROJECT_ROOT), env=env
        )
        # /health returns 200 only once it can reach the (healthy) mock backend
        if not await _wait_http_ok(f"{router_url}/health", timeout=40, accept=(200,)):
            raise RuntimeError("router did not become healthy")
        print("[orchestrator] router healthy — starting load\n", flush=True)
        # 3. drive load against the router
        args.url = router_url
        return await run_driver(args)
    finally:
        for name, proc in (("router", router_proc), ("mock", mock_proc)):
            if proc and proc.returncode is None:
                with contextlib.suppress(ProcessLookupError):
                    proc.send_signal(signal.SIGINT)
                with contextlib.suppress(asyncio.TimeoutError):
                    await asyncio.wait_for(proc.wait(), timeout=8)
                if proc.returncode is None:
                    with contextlib.suppress(ProcessLookupError):
                        proc.kill()
                    # Reap the killed child so it does not linger as a zombie
                    # and its transport is closed before the loop shuts down.
                    await proc.wait()
                print(f"[orchestrator] stopped {name}", flush=True)
        if args.keep_config:
            print(f"[orchestrator] kept config: {cfg_path}", flush=True)
        else:
            with contextlib.suppress(FileNotFoundError):
                cfg_path.unlink()
        for suffix in ("", "-shm", "-wal"):
            with contextlib.suppress(FileNotFoundError):
                Path(str(db_path) + suffix).unlink()
# ===========================================================================
# CLI
# ===========================================================================
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser covering every mode of this script.

    Options are grouped by purpose (mode, driver target, load shape, mock
    tuning, router orchestration). ``--stream``/``--no-stream`` are mutually
    exclusive, and streaming is on unless ``--no-stream`` is given.
    """
    parser = argparse.ArgumentParser(
        description="NOMYO Router load test (asyncio + httpx) with a built-in mock backend.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # -- which mode to run in -------------------------------------------------
    modes = parser.add_argument_group("mode")
    modes.add_argument(
        "--serve-mock", action="store_true",
        help="Run ONLY the mock backend (foreground) and exit on Ctrl-C.",
    )
    modes.add_argument(
        "--mock-backend", action="store_true",
        help="Spawn mock backend + router, load-test them, then tear down.",
    )

    # -- what the driver sends, and where -------------------------------------
    target = parser.add_argument_group("target (driver)")
    target.add_argument(
        "--url", default="http://127.0.0.1:12434",
        help="Router base URL to drive (ignored with --mock-backend).",
    )
    target.add_argument(
        "--api", choices=["ollama", "openai"], default="ollama",
        help="ollama -> POST /api/chat ; openai -> POST /v1/chat/completions",
    )
    target.add_argument("--model", default="mock", help="Model name to request.")
    target.add_argument(
        "--prompt", default="Say hello and count to ten.",
        help="User prompt sent in every request.",
    )
    streaming = target.add_mutually_exclusive_group()
    streaming.add_argument(
        "--stream", dest="stream", action="store_true",
        help="Stream the response (default).",
    )
    streaming.add_argument(
        "--no-stream", dest="stream", action="store_false",
        help="Request a single non-streamed response.",
    )
    parser.set_defaults(stream=True)

    # -- how much load, for how long ------------------------------------------
    shape = parser.add_argument_group("load shape")
    shape.add_argument(
        "--concurrency", type=int, default=32,
        help="Number of concurrent virtual clients.",
    )
    shape.add_argument(
        "--duration", type=float, default=20.0,
        help="Seconds to run each stage (ignored if --requests given).",
    )
    shape.add_argument(
        "--requests", type=int, default=None,
        help="Send exactly N requests instead of running for --duration.",
    )
    shape.add_argument(
        "--ramp", default=None,
        help=("Comma-separated concurrency stages, e.g. 16,32,64,128 "
              "(prints a table to find the knee)."),
    )
    shape.add_argument(
        "--warmup", type=float, default=2.0,
        help="Seconds of unmeasured warmup before each stage.",
    )
    shape.add_argument(
        "--timeout", type=float, default=120.0,
        help="Per-request timeout (seconds).",
    )
    shape.add_argument(
        "--json", dest="json_out", default=None,
        help="Also write the results as JSON to this path.",
    )

    # -- the fake backend's behaviour -----------------------------------------
    tuning = parser.add_argument_group("mock backend tuning")
    tuning.add_argument("--mock-host", default="127.0.0.1")
    tuning.add_argument(
        "--mock-port", type=int, default=0,
        help="Mock backend port (0 = auto-pick a free port).",
    )
    tuning.add_argument(
        "--mock-models", default="mock",
        help="Comma-separated model names the mock advertises.",
    )
    tuning.add_argument(
        "--mock-ttft-ms", type=float, default=0.0,
        help="Simulated prefill latency before the first token (ms).",
    )
    tuning.add_argument(
        "--mock-tokens", type=int, default=64,
        help="Completion length in tokens the mock emits.",
    )
    tuning.add_argument(
        "--mock-tok-ms", type=float, default=0.0,
        help="Simulated per-token decode delay (ms) = inverse of tok/s.",
    )

    # -- the spawned router (only used with --mock-backend) -------------------
    orchestration = parser.add_argument_group("router orchestration (--mock-backend only)")
    orchestration.add_argument(
        "--router-port", type=int, default=0,
        help="Router port (0 = auto-pick a free port).",
    )
    orchestration.add_argument(
        "--router-workers", type=int, default=1,
        help="uvicorn --workers for the spawned router.",
    )
    orchestration.add_argument(
        "--router-max-conc", type=int, default=0,
        help=("max_concurrent_connections in the generated config "
              "(0 = match peak concurrency so the router does not queue)."),
    )
    orchestration.add_argument(
        "--keep-config", action="store_true",
        help="Do not delete the generated temp config on exit.",
    )
    return parser
def _dump_json(path: str, args, results: list[Stats]) -> None:
out = {
"config": {k: getattr(args, k) for k in (
"api", "model", "stream", "duration", "requests", "warmup", "timeout",
"mock_tokens", "mock_ttft_ms", "mock_tok_ms")},
"stages": [],
}
for st in results:
lat = [s.latency * 1000 for s in st.ok]
ttfts = [s.ttft * 1000 for s in st.ok if s.ttft is not None]
out["stages"].append({
"concurrency": st.concurrency,
"wall_s": st.wall,
"requests": st.n_total,
"ok": st.n_ok,
"errors": st.n_total - st.n_ok,
"rps": (st.n_ok / st.wall) if st.wall else 0.0,
"latency_ms": {p: _pct(lat, p) for p in (50, 90, 95, 99)} | (
{"max": max(lat), "mean": statistics.mean(lat)} if lat else {}),
"ttft_ms": {p: _pct(ttfts, p) for p in (50, 90, 99)} if ttfts else {},
})
Path(path).write_text(json.dumps(out, indent=2))
print(f"\n[driver] wrote JSON results to {path}", flush=True)
def main() -> None:
    """CLI entry point: parse arguments and dispatch to the selected mode.

    ``--serve-mock`` runs only the mock backend. ``--mock-backend`` spawns
    mock + router and drives them. Otherwise the driver targets ``--url``.
    Invalid load-shape arguments exit with status 2. Ctrl-C during a run
    exits with status 130 instead of dumping a traceback; with
    ``--mock-backend`` the subprocess teardown has already run by then.
    Results are written as JSON when ``--json`` is given and any stage ran.
    """
    args = build_parser().parse_args()
    if args.serve_mock:
        with contextlib.suppress(KeyboardInterrupt):
            serve_mock(args)
        return
    if args.requests is not None and args.requests <= 0:
        print("--requests must be > 0", file=sys.stderr)
        sys.exit(2)
    # --concurrency only matters when no --ramp stages are given.
    if not args.ramp and args.concurrency <= 0:
        print("--concurrency must be > 0", file=sys.stderr)
        sys.exit(2)
    try:
        if args.mock_backend:
            results = asyncio.run(run_with_mock_backend(args)) or []
        else:
            results = asyncio.run(run_driver(args))
    except KeyboardInterrupt:
        print("\n[driver] interrupted", file=sys.stderr)
        sys.exit(130)
    if args.json_out and results:
        _dump_json(args.json_out, args, results)
# Run the CLI only when this file is executed as a script; importing the
# module (e.g. to reuse build_mock_app) does not call main().
if __name__ == "__main__":
    main()