#!/usr/bin/env python3
"""
NOMYO Router load test — asyncio + httpx driver with a built-in mock backend.

Three modes
-----------
1. Drive an already-running router (default)::

       python test/load/loadtest.py --url http://127.0.0.1:12434 \
           --concurrency 64 --duration 30 --stream

2. Fully self-contained "mock backend" mode — spins up a fast fake
   Ollama/OpenAI backend AND the router (wired to that backend via a temp
   config), load-tests them, then tears both down. This isolates the
   *router's* proxy overhead from real GPU compute, so the numbers tell you
   how many concurrent connections the router itself can sustain on this
   machine::

       python test/load/loadtest.py --mock-backend \
           --concurrency 128 --duration 30 --stream

3. Run just the mock backend (point your own router config at it)::

       python test/load/loadtest.py --serve-mock --mock-port 11434

Both streaming and non-streaming are supported (--stream / --no-stream),
against either the Ollama API (--api ollama -> POST /api/chat) or the
OpenAI-compatible API (--api openai -> POST /v1/chat/completions).

Finding the concurrency ceiling
-------------------------------
Use --ramp to sweep concurrency levels and print a table; the "knee" is where
p99 latency climbs sharply or req/s stops increasing::

    python test/load/loadtest.py --mock-backend --stream \
        --ramp 16,32,64,128,256 --duration 15

The mock backend is a configurable "fake GPU": --mock-ttft-ms (prefill
latency), --mock-tokens (completion length) and --mock-tok-ms (per-token
decode delay) let you model anything from an instant echo (measure pure proxy
overhead) to a slow, long-streaming model (measure how many slow streams the
box holds open).
"""
from __future__ import annotations

import argparse
import asyncio
import contextlib
import json
import math
import os
import signal
import socket
import statistics
import sys
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import httpx

# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

_WORDS = (
    "the quick brown fox jumps over the lazy dog while a router proxies many "
    "concurrent streaming completions across several ollama and openai backends "
    "without dropping a single token under sustained synthetic load testing"
).split()


def _gen_text(n_tokens: int) -> str:
    """Deterministic pseudo-completion of roughly ``n_tokens`` space-separated tokens."""
    return " ".join(_WORDS[i % len(_WORDS)] for i in range(max(0, n_tokens)))


def _rfc3339_now() -> str:
    """Return the current UTC time as an Ollama-style timestamp string."""
    # Ollama-style timestamp, e.g. 2024-01-01T00:00:00.000000Z
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z"


def _count_prompt_tokens(messages: list) -> int:
    """Return a whitespace-split word count over all message contents.

    ``content`` may be a plain string or a list of parts; for a list, only
    dict parts that carry a string ``"text"`` value are counted. Entries that
    are not dicts are ignored. The result is never less than 1.
    """
    total = 0
    for message in messages or []:
        if not isinstance(message, dict):
            # A malformed entry must not crash the mock with AttributeError.
            continue
        content = message.get("content")
        if isinstance(content, str):
            total += len(content.split())
        elif isinstance(content, list):
            total += sum(
                len(part["text"].split())
                for part in content
                if isinstance(part, dict) and isinstance(part.get("text"), str)
            )
    return max(1, total)


def _free_port() -> int:
    """Return a TCP port on 127.0.0.1 that was free at the time of the call.

    The probing socket is closed before returning, so another process can
    still grab the port before the caller binds it.
    """
    # ``with`` guarantees the socket is closed even if setsockopt/bind raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


def _parse_stages(args) -> list[int]:
    """Return the concurrency level of every stage to run, in order.

    ``args.ramp`` (comma-separated) takes precedence over ``args.concurrency``.
    Blank entries, e.g. from a trailing comma, are skipped.

    Raises:
        ValueError: if an entry is not an integer, no level is given, or a
            level is not positive.
    """
    if args.ramp:
        stages = [int(tok) for tok in args.ramp.split(",") if tok.strip()]
    else:
        stages = [args.concurrency]
    if not stages:
        raise ValueError("--ramp must list at least one concurrency level")
    for level in stages:
        if level <= 0:
            raise ValueError(f"concurrency must be > 0 (got {level})")
    return stages


# ===========================================================================
# Mock backend (a fast, configurable fake Ollama + OpenAI-compatible server)
# ===========================================================================

def build_mock_app(models: list[str], ttft_ms: float, tokens: int, tok_ms: float):
    """Construct the aiohttp mock-backend application.

    Serves the native-Ollama surface the router uses for discovery and the
    `/api/chat` path (`/api/version`, `/api/tags`, `/api/ps`, `/api/chat`,
    `/api/generate`) plus the OpenAI-compatible surface used by the
    `/v1/chat/completions` path (`/v1/models`, `/v1/chat/completions`,
    `/v1/completions`).

    Args:
        models: model names advertised by the discovery endpoints; the first
            one is the default when a request omits ``model``.
        ttft_ms: delay before the first token of a response, in milliseconds.
        tokens: number of tokens emitted per completion.
        tok_ms: delay between consecutive streamed tokens, in milliseconds.

    Returns:
        The configured ``aiohttp.web.Application``.
    """
    from aiohttp import web  # imported lazily so the driver has no hard aiohttp dep

    ttft = ttft_ms / 1000.0
    tok_delay = tok_ms / 1000.0
    default_model = models[0] if models else "mock"

    # ----- small shared helpers ----------------------------------------------
    def _ndjson(obj: dict) -> bytes:
        # One newline-delimited JSON record (Ollama streaming framing).
        return json.dumps(obj).encode() + b"\n"

    def _sse(obj: dict) -> bytes:
        # One server-sent-event record (OpenAI streaming framing).
        return b"data: " + json.dumps(obj).encode() + b"\n\n"

    def _word(i: int) -> str:
        return _WORDS[i % len(_WORDS)] + " "

    async def _open_stream(req, content_type: str):
        # Send the response headers first, then simulate the prefill latency.
        resp = web.StreamResponse(status=200, headers={"Content-Type": content_type})
        await resp.prepare(req)
        if ttft:
            await asyncio.sleep(ttft)
        return resp

    async def _pace(i: int) -> None:
        # The decode delay applies between tokens, not before the first one.
        if tok_delay and i:
            await asyncio.sleep(tok_delay)

    async def _simulate_full_generation() -> None:
        # non-streaming: simulate the whole generation latency in one sleep
        await asyncio.sleep(ttft + tokens * tok_delay)

    def _tag_entry(name: str) -> dict:
        return {
            "name": name,
            "model": name,
            "modified_at": _rfc3339_now(),
            "size": 4_000_000_000,
            "digest": "0" * 64,
            "details": {
                "parent_model": "",
                "format": "gguf",
                "family": "mock",
                "families": ["mock"],
                "parameter_size": "7B",
                "quantization_level": "Q4_0",
            },
        }

    # ----- discovery endpoints -----------------------------------------------
    async def version(_req):
        return web.json_response({"version": "0.0.0-nomyo-mock"})

    async def tags(_req):
        return web.json_response({"models": [_tag_entry(m) for m in models]})

    async def ps(_req):
        # Report every advertised model as loaded with VRAM so choose_endpoint
        # treats this endpoint as "loaded + free".
        loaded = []
        for name in models:
            entry = _tag_entry(name)
            entry["size_vram"] = entry["size"]
            entry["expires_at"] = "2999-01-01T00:00:00Z"
            loaded.append(entry)
        return web.json_response({"models": loaded})

    async def v1_models(_req):
        now = int(time.time())
        return web.json_response({
            "object": "list",
            "data": [
                {"id": m, "object": "model", "created": now, "owned_by": "mock"}
                for m in models
            ],
        })

    # ----- Ollama /api/chat --------------------------------------------------
    async def api_chat(req):
        payload = await req.json()
        model = payload.get("model", default_model)
        stream = payload.get("stream", True)
        prompt_tok = _count_prompt_tokens(payload.get("messages", []))
        t0 = time.perf_counter()

        if stream:
            resp = await _open_stream(req, "application/x-ndjson")
            for i in range(tokens):
                await _pace(i)
                await resp.write(_ndjson({
                    "model": model,
                    "created_at": _rfc3339_now(),
                    "message": {"role": "assistant", "content": _word(i)},
                    "done": False,
                }))
            dur_ns = int((time.perf_counter() - t0) * 1e9)
            await resp.write(_ndjson({
                "model": model,
                "created_at": _rfc3339_now(),
                "message": {"role": "assistant", "content": ""},
                "done": True,
                "done_reason": "stop",
                "total_duration": dur_ns,
                "load_duration": 0,
                "prompt_eval_count": prompt_tok,
                "prompt_eval_duration": int(ttft * 1e9),
                "eval_count": tokens,
                "eval_duration": dur_ns,
            }))
            await resp.write_eof()
            return resp

        # non-streaming: simulate the whole generation latency, then one object
        await _simulate_full_generation()
        dur_ns = int((time.perf_counter() - t0) * 1e9)
        return web.json_response({
            "model": model,
            "created_at": _rfc3339_now(),
            "message": {"role": "assistant", "content": _gen_text(tokens)},
            "done": True,
            "done_reason": "stop",
            "total_duration": dur_ns,
            "load_duration": 0,
            "prompt_eval_count": prompt_tok,
            "prompt_eval_duration": int(ttft * 1e9),
            "eval_count": tokens,
            "eval_duration": dur_ns,
        })

    # ----- Ollama /api/generate ----------------------------------------------
    async def api_generate(req):
        payload = await req.json()
        model = payload.get("model", default_model)
        stream = payload.get("stream", True)
        prompt_tok = max(1, len(str(payload.get("prompt", "")).split()))
        t0 = time.perf_counter()

        if stream:
            resp = await _open_stream(req, "application/x-ndjson")
            for i in range(tokens):
                await _pace(i)
                await resp.write(_ndjson({
                    "model": model,
                    "created_at": _rfc3339_now(),
                    "response": _word(i),
                    "done": False,
                }))
            dur_ns = int((time.perf_counter() - t0) * 1e9)
            await resp.write(_ndjson({
                "model": model,
                "created_at": _rfc3339_now(),
                "response": "",
                "done": True,
                "done_reason": "stop",
                "total_duration": dur_ns,
                "prompt_eval_count": prompt_tok,
                "eval_count": tokens,
                "eval_duration": dur_ns,
            }))
            await resp.write_eof()
            return resp

        await _simulate_full_generation()
        dur_ns = int((time.perf_counter() - t0) * 1e9)
        return web.json_response({
            "model": model,
            "created_at": _rfc3339_now(),
            "response": _gen_text(tokens),
            "done": True,
            "done_reason": "stop",
            "total_duration": dur_ns,
            "prompt_eval_count": prompt_tok,
            "eval_count": tokens,
            "eval_duration": dur_ns,
        })

    # ----- OpenAI /v1/chat/completions ---------------------------------------
    async def v1_chat(req):
        payload = await req.json()
        model = payload.get("model", default_model)
        stream = payload.get("stream", False)
        want_usage = bool((payload.get("stream_options") or {}).get("include_usage"))
        prompt_tok = _count_prompt_tokens(payload.get("messages", []))
        created = int(time.time())
        cid = "chatcmpl-mock"
        usage = {
            "prompt_tokens": prompt_tok,
            "completion_tokens": tokens,
            "total_tokens": prompt_tok + tokens,
        }

        def _chunk(choices: list) -> dict:
            return {
                "id": cid,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": choices,
            }

        if stream:
            resp = await _open_stream(req, "text/event-stream")
            # first chunk carries the role
            await resp.write(_sse(_chunk([{
                "index": 0,
                "delta": {"role": "assistant", "content": ""},
                "finish_reason": None,
            }])))
            for i in range(tokens):
                await _pace(i)
                await resp.write(_sse(_chunk([{
                    "index": 0,
                    "delta": {"content": _word(i)},
                    "finish_reason": None,
                }])))
            await resp.write(_sse(_chunk([{
                "index": 0,
                "delta": {},
                "finish_reason": "stop",
            }])))
            if want_usage:
                # Usage chunk: empty ``choices`` plus the ``usage`` object.
                usage_chunk = _chunk([])
                usage_chunk["usage"] = usage
                await resp.write(_sse(usage_chunk))
            await resp.write(b"data: [DONE]\n\n")
            await resp.write_eof()
            return resp

        await _simulate_full_generation()
        return web.json_response({
            "id": cid,
            "object": "chat.completion",
            "created": created,
            "model": model,
            "choices": [{
                "index": 0,
                "message": {"role": "assistant", "content": _gen_text(tokens)},
                "finish_reason": "stop",
                "logprobs": None,
            }],
            "usage": usage,
        })

    # ----- OpenAI /v1/completions --------------------------------------------
    async def v1_completions(req):
        payload = await req.json()
        model = payload.get("model", default_model)
        stream = payload.get("stream", False)
        prompt_tok = max(1, len(str(payload.get("prompt", "")).split()))
        created = int(time.time())
        cid = "cmpl-mock"

        if stream:
            resp = await _open_stream(req, "text/event-stream")
            for i in range(tokens):
                await _pace(i)
                await resp.write(_sse({
                    "id": cid,
                    "object": "text_completion",
                    "created": created,
                    "model": model,
                    "choices": [{"index": 0, "text": _word(i), "finish_reason": None}],
                }))
            await resp.write(b"data: [DONE]\n\n")
            await resp.write_eof()
            return resp

        await _simulate_full_generation()
        return web.json_response({
            "id": cid,
            "object": "text_completion",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "text": _gen_text(tokens), "finish_reason": "stop"}],
            "usage": {
                "prompt_tokens": prompt_tok,
                "completion_tokens": tokens,
                "total_tokens": prompt_tok + tokens,
            },
        })

    app = web.Application(client_max_size=64 * 1024 * 1024)
    app.add_routes([
        web.get("/api/version", version),
        web.get("/api/tags", tags),
        web.get("/api/ps", ps),
        web.post("/api/chat", api_chat),
        web.post("/api/generate", api_generate),
        web.get("/v1/models", v1_models),
        web.post("/v1/chat/completions", v1_chat),
        web.post("/v1/completions", v1_completions),
    ])
    return app


def serve_mock(args) -> None:
    """Run the mock backend in the foreground until interrupted.

    Reads ``mock_models``, ``mock_host``, ``mock_port`` and the three
    ``mock_*`` tuning values from ``args``.
    """
    from aiohttp import web

    models = [m.strip() for m in args.mock_models.split(",") if m.strip()]
    # Resolve "0 = auto-pick" here: run_app is started with print=None, so an
    # ephemeral port chosen by the OS would otherwise never be reported.
    port = args.mock_port or _free_port()
    app = build_mock_app(models, args.mock_ttft_ms, args.mock_tokens, args.mock_tok_ms)
    print(f"[mock] serving models={models} on http://{args.mock_host}:{port} "
          f"(ttft={args.mock_ttft_ms}ms tokens={args.mock_tokens} tok={args.mock_tok_ms}ms)",
          flush=True)
    web.run_app(app, host=args.mock_host, port=port, print=None)


# ===========================================================================
# Load driver
# ===========================================================================

@dataclass
class Sample:
    """Outcome of one request issued by the driver."""

    ok: bool
    status: int
    latency: float  # full request wall time (s)
    ttft: Optional[float]  # time-to-first-byte for streaming (s), else None
    err: Optional[str] = None


@dataclass
class Stats:
    """All samples collected during one concurrency stage."""

    concurrency: int
    wall: float = 0.0
    samples: list = field(default_factory=list)

    @property
    def ok(self) -> list:
        """Samples whose request succeeded."""
        return [s for s in self.samples if s.ok]

    @property
    def n_total(self) -> int:
        """Number of samples recorded."""
        return len(self.samples)

    @property
    def n_ok(self) -> int:
        """Number of successful samples."""
        return len(self.ok)


def _pct(values: list[float], p: float) -> float:
    """Return the ``p``-th percentile of ``values`` by linear interpolation.

    Returns NaN for an empty list and the sole element for a one-item list.
    """
    if not values:
        return float("nan")
    ordered = sorted(values)
    if len(ordered) == 1:
        return ordered[0]
    rank = (len(ordered) - 1) * (p / 100.0)
    lo = math.floor(rank)
    hi = math.ceil(rank)
    if lo == hi:
        return ordered[int(rank)]
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (rank - lo)


def _build_request(args):
    """Return (path, json_payload) for a single request."""
    path = "/v1/chat/completions" if args.api == "openai" else "/api/chat"
    # Both APIs take the same minimal chat body.
    body = {
        "model": args.model,
        "messages": [{"role": "user", "content": args.prompt}],
        "stream": args.stream,
    }
    return path, body


async def _one_request(client: httpx.AsyncClient, url: str, body: dict,
                       stream: bool) -> Sample:
    """POST ``body`` to ``url`` once and time it.

    When ``stream`` is true the response is drained chunk by chunk and the
    arrival of the first chunk is recorded as ``ttft``. Any exception is
    converted into a failed ``Sample`` with ``status=0``.
    """
    t0 = time.perf_counter()
    try:
        if stream:
            ttft = None
            async with client.stream("POST", url, json=body) as resp:
                status = resp.status_code
                async for _chunk in resp.aiter_bytes():
                    if ttft is None:
                        ttft = time.perf_counter() - t0
                # drain complete
            lat = time.perf_counter() - t0
            ok = 200 <= status < 300
            return Sample(ok=ok, status=status, latency=lat, ttft=ttft,
                          err=None if ok else f"HTTP {status}")

        resp = await client.post(url, json=body)
        lat = time.perf_counter() - t0
        ok = 200 <= resp.status_code < 300
        # touch body so the full response is received
        _ = resp.content
        return Sample(ok=ok, status=resp.status_code, latency=lat, ttft=None,
                      err=None if ok else f"HTTP {resp.status_code}")
    except Exception as e:  # noqa: BLE001 — record any transport error as a failed sample
        lat = time.perf_counter() - t0
        return Sample(ok=False, status=0, latency=lat, ttft=None,
                      err=f"{type(e).__name__}: {str(e)[:120]}")


async def run_stage(args, concurrency: int) -> Stats:
    """Run one load stage with ``concurrency`` workers and return its samples.

    The stage runs for ``args.duration`` seconds, or until exactly
    ``args.requests`` requests were issued when that is set. An unmeasured
    warmup of ``args.warmup`` seconds precedes it.
    """
    path, body = _build_request(args)
    url = args.url.rstrip("/") + path
    stats = Stats(concurrency=concurrency)

    limits = httpx.Limits(max_connections=concurrency + 50,
                          max_keepalive_connections=concurrency + 50)
    timeout = httpx.Timeout(args.timeout, connect=15.0)

    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        # warmup (unmeasured): make a few requests so caches/connections are hot
        if args.warmup > 0:
            warm_deadline = time.perf_counter() + args.warmup

            async def _warm():
                while time.perf_counter() < warm_deadline:
                    await _one_request(client, url, body, args.stream)

            await asyncio.gather(*[_warm() for _ in range(min(concurrency, 8))])

        use_duration = args.requests is None
        deadline = time.perf_counter() + args.duration if use_duration else None
        remaining = 0 if use_duration else args.requests

        async def worker():
            nonlocal remaining
            while True:
                if use_duration:
                    if time.perf_counter() >= deadline:
                        return
                else:
                    # There is no await between the check and the decrement,
                    # so workers on the same event loop cannot interleave
                    # here and no lock is needed.
                    if remaining <= 0:
                        return
                    remaining -= 1
                stats.samples.append(
                    await _one_request(client, url, body, args.stream)
                )

        wall0 = time.perf_counter()
        await asyncio.gather(*[worker() for _ in range(concurrency)])
        stats.wall = time.perf_counter() - wall0
    return stats


def _print_stage(stats: Stats, args, header: bool) -> None:
    """Print one stage: a table row in ramp mode, else a detailed report.

    ``header`` controls whether the table header is printed before the row.
    """
    lat = [s.latency * 1000 for s in stats.ok]
    ttfts = [s.ttft * 1000 for s in stats.ok if s.ttft is not None]
    rps = stats.n_ok / stats.wall if stats.wall else 0.0
    errs = stats.n_total - stats.n_ok

    if args.ramp:
        if header:
            cols = f"{'conc':>5} {'req':>7} {'ok':>7} {'err':>5} {'req/s':>9} " \
                   f"{'p50ms':>8} {'p90ms':>8} {'p99ms':>9} {'maxms':>9}"
            if args.stream:
                cols += f" {'ttftP50':>8} {'ttftP99':>8}"
            print(cols)
            print("-" * len(cols))
        row = (f"{stats.concurrency:>5} {stats.n_total:>7} {stats.n_ok:>7} {errs:>5} "
               f"{rps:>9.1f} {_pct(lat,50):>8.1f} {_pct(lat,90):>8.1f} "
               f"{_pct(lat,99):>9.1f} {(max(lat) if lat else float('nan')):>9.1f}")
        if args.stream:
            row += f" {_pct(ttfts,50):>8.1f} {_pct(ttfts,99):>8.1f}"
        print(row, flush=True)
        return

    # single-stage detailed report
    print(f"\n=== Results (concurrency={stats.concurrency}, "
          f"{'stream' if args.stream else 'non-stream'}, api={args.api}) ===")
    print(f" wall time : {stats.wall:8.2f} s")
    print(f" requests : {stats.n_total} total, {stats.n_ok} ok, {errs} failed")
    print(f" throughput : {rps:8.1f} req/s")
    if lat:
        print(f" latency p50 : {_pct(lat,50):8.1f} ms")
        print(f" p90 : {_pct(lat,90):8.1f} ms")
        print(f" p95 : {_pct(lat,95):8.1f} ms")
        print(f" p99 : {_pct(lat,99):8.1f} ms")
        print(f" max : {max(lat):8.1f} ms")
        print(f" mean : {statistics.mean(lat):8.1f} ms")
    if ttfts:
        print(f" TTFT p50 : {_pct(ttfts,50):8.1f} ms")
        print(f" p90 : {_pct(ttfts,90):8.1f} ms")
        print(f" p99 : {_pct(ttfts,99):8.1f} ms")
    if errs:
        by_err = Counter(s.err or "unknown" for s in stats.samples if not s.ok)
        print(" errors:")
        # most_common() orders by descending count and keeps first-seen order
        # among ties.
        for message, count in by_err.most_common():
            print(f" {count:>6} {message}")


async def run_driver(args) -> list[Stats]:
    """Run every configured stage against ``args.url``, printing as it goes.

    Returns:
        One ``Stats`` per stage, in execution order.

    Raises:
        ValueError: if ``--ramp`` / ``--concurrency`` is malformed or not
            positive.
    """
    results: list[Stats] = []
    for i, level in enumerate(_parse_stages(args)):
        stats = await run_stage(args, level)
        _print_stage(stats, args, header=(i == 0))
        results.append(stats)
    return results


# ===========================================================================
# Orchestration: --mock-backend (spawn mock + router, run, tear down)
# ===========================================================================

PROJECT_ROOT = Path(__file__).resolve().parents[2]


async def _wait_http_ok(url: str, timeout: float, accept=(200,), proc=None) -> bool:
    """Poll ``url`` every 0.25 s until it answers with a status in ``accept``.

    Args:
        url: address to GET.
        timeout: overall budget in seconds.
        accept: status codes that count as ready.
        proc: optional asyncio subprocess serving ``url``; once it has exited
            the wait is abandoned immediately instead of using up the budget.

    Returns:
        True when an accepted status was seen; False on timeout or when
        ``proc`` exited first.
    """
    deadline = time.perf_counter() + timeout
    async with httpx.AsyncClient(timeout=5.0) as client:
        while time.perf_counter() < deadline:
            if proc is not None and proc.returncode is not None:
                return False
            try:
                r = await client.get(url)
            except httpx.HTTPError:
                # Not reachable yet (refused, timed out, ...): keep polling.
                pass
            else:
                if r.status_code in accept:
                    return True
            await asyncio.sleep(0.25)
    return False


def _write_temp_config(mock_url: str, models: list[str], max_conc: int) -> Path:
    """Write a temporary router config pointing at ``mock_url``.

    ``models`` is accepted for interface compatibility and is not written to
    the file. Returns the path of the created file; the caller deletes it.
    """
    fd, path = tempfile.mkstemp(prefix="nomyo_loadtest_", suffix=".yaml")
    os.close(fd)
    cfg = (
        "# Auto-generated by test/load/loadtest.py --mock-backend. Safe to delete.\n"
        "endpoints:\n"
        f" - {mock_url}\n"
        "llama_server_endpoints: []\n"
        f"max_concurrent_connections: {max_conc}\n"
        "priority_routing: false\n"
        "conversation_affinity: false\n"
        "cache_enabled: false\n"
        "nomyo-router-api-key: \"\"\n"
        "api_keys:\n"
        f" \"{mock_url}\": \"mock\"\n"
    )
    cfg_path = Path(path)
    cfg_path.write_text(cfg, encoding="utf-8")
    return cfg_path


async def _stop_process(name: str, proc) -> None:
    """Stop a child process: SIGINT, wait up to 8 s, then kill and reap it."""
    if proc is None or proc.returncode is not None:
        return
    with contextlib.suppress(ProcessLookupError):
        proc.send_signal(signal.SIGINT)
    with contextlib.suppress(asyncio.TimeoutError):
        await asyncio.wait_for(proc.wait(), timeout=8)
    if proc.returncode is None:
        with contextlib.suppress(ProcessLookupError):
            proc.kill()
        # Reap the killed child so it does not linger as a zombie.
        await proc.wait()
    print(f"[orchestrator] stopped {name}", flush=True)


async def run_with_mock_backend(args) -> list[Stats]:
    """Spawn the mock backend and the router, load-test them, then tear down.

    Sets ``args.url`` to the spawned router before driving load. The temp
    config (unless ``--keep-config``) and the temp database files are removed
    on exit, whether or not the run succeeded.

    Raises:
        RuntimeError: if the mock backend or the router does not become ready.
        ValueError: if ``--ramp`` / ``--concurrency`` is malformed.
    """
    mock_port = args.mock_port or _free_port()
    router_port = args.router_port or _free_port()
    # The first probe socket is closed before the second probe, so two
    # auto-picked ports could coincide; re-pick the router port if they do.
    while not args.router_port and router_port == mock_port:
        router_port = _free_port()
    mock_url = f"http://127.0.0.1:{mock_port}"
    router_url = f"http://127.0.0.1:{router_port}"
    models = [m.strip() for m in args.mock_models.split(",") if m.strip()]

    # Size the router's per-endpoint admission limit so it does not artificially
    # serialize the load (unless the user explicitly wants to measure that).
    peak = max(_parse_stages(args))
    max_conc = args.router_max_conc if args.router_max_conc else max(peak, 1)

    cfg_path = _write_temp_config(mock_url, models, max_conc)
    db_path = Path(tempfile.gettempdir()) / f"nomyo_loadtest_{os.getpid()}.db"

    env = dict(os.environ)
    env["NOMYO_ROUTER_CONFIG_PATH"] = str(cfg_path)
    env["NOMYO_ROUTER_DB_PATH"] = str(db_path)

    mock_proc = None
    router_proc = None
    try:
        # 1. mock backend first, so the router never caches it as "down"
        mock_cmd = [
            sys.executable, str(Path(__file__).resolve()), "--serve-mock",
            "--mock-host", "127.0.0.1", "--mock-port", str(mock_port),
            "--mock-models", args.mock_models,
            "--mock-ttft-ms", str(args.mock_ttft_ms),
            "--mock-tokens", str(args.mock_tokens),
            "--mock-tok-ms", str(args.mock_tok_ms),
        ]
        print(f"[orchestrator] starting mock backend: {mock_url}", flush=True)
        mock_proc = await asyncio.create_subprocess_exec(*mock_cmd)
        if not await _wait_http_ok(f"{mock_url}/api/version", timeout=15,
                                   proc=mock_proc):
            raise RuntimeError("mock backend did not become ready")

        # 2. router
        router_cmd = [
            sys.executable, "-m", "uvicorn", "router:app",
            "--host", "127.0.0.1", "--port", str(router_port),
            # Per-request access logging is pure noise (and overhead) under load.
            "--no-access-log",
        ]
        if args.router_workers and args.router_workers > 1:
            router_cmd += ["--workers", str(args.router_workers)]
        print(f"[orchestrator] starting router: {router_url} "
              f"(workers={args.router_workers}, max_concurrent_connections={max_conc})",
              flush=True)
        router_proc = await asyncio.create_subprocess_exec(
            *router_cmd, cwd=str(PROJECT_ROOT), env=env
        )
        # /health returns 200 only once it can reach the (healthy) mock backend
        if not await _wait_http_ok(f"{router_url}/health", timeout=40,
                                   accept=(200,), proc=router_proc):
            raise RuntimeError("router did not become healthy")
        print("[orchestrator] router healthy — starting load\n", flush=True)

        # 3. drive load against the router
        args.url = router_url
        return await run_driver(args)
    finally:
        # Stop the router before the backend it depends on.
        await _stop_process("router", router_proc)
        await _stop_process("mock", mock_proc)
        if args.keep_config:
            print(f"[orchestrator] kept config: {cfg_path}", flush=True)
        else:
            with contextlib.suppress(FileNotFoundError):
                cfg_path.unlink()
        for suffix in ("", "-shm", "-wal"):
            with contextlib.suppress(FileNotFoundError):
                Path(str(db_path) + suffix).unlink()


# ===========================================================================
# CLI
# ===========================================================================

def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for all three modes."""
    p = argparse.ArgumentParser(
        description="NOMYO Router load test (asyncio + httpx) with a built-in mock backend.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    mode = p.add_argument_group("mode")
    mode.add_argument("--serve-mock", action="store_true",
                      help="Run ONLY the mock backend (foreground) and exit on Ctrl-C.")
    mode.add_argument("--mock-backend", action="store_true",
                      help="Spawn mock backend + router, load-test them, then tear down.")

    tgt = p.add_argument_group("target (driver)")
    tgt.add_argument("--url", default="http://127.0.0.1:12434",
                     help="Router base URL to drive (ignored with --mock-backend).")
    tgt.add_argument("--api", choices=["ollama", "openai"], default="ollama",
                     help="ollama -> POST /api/chat ; openai -> POST /v1/chat/completions")
    tgt.add_argument("--model", default="mock", help="Model name to request.")
    tgt.add_argument("--prompt", default="Say hello and count to ten.",
                     help="User prompt sent in every request.")
    stream_grp = tgt.add_mutually_exclusive_group()
    stream_grp.add_argument("--stream", dest="stream", action="store_true",
                            help="Stream the response (default).")
    stream_grp.add_argument("--no-stream", dest="stream", action="store_false",
                            help="Request a single non-streamed response.")
    p.set_defaults(stream=True)

    load = p.add_argument_group("load shape")
    load.add_argument("--concurrency", type=int, default=32,
                      help="Number of concurrent virtual clients.")
    load.add_argument("--duration", type=float, default=20.0,
                      help="Seconds to run each stage (ignored if --requests given).")
    load.add_argument("--requests", type=int, default=None,
                      help="Send exactly N requests instead of running for --duration.")
    load.add_argument("--ramp", default=None,
                      help="Comma-separated concurrency stages, e.g. 16,32,64,128 "
                           "(prints a table to find the knee).")
    load.add_argument("--warmup", type=float, default=2.0,
                      help="Seconds of unmeasured warmup before each stage.")
    load.add_argument("--timeout", type=float, default=120.0,
                      help="Per-request timeout (seconds).")
    load.add_argument("--json", dest="json_out", default=None,
                      help="Also write the results as JSON to this path.")

    mock = p.add_argument_group("mock backend tuning")
    mock.add_argument("--mock-host", default="127.0.0.1")
    mock.add_argument("--mock-port", type=int, default=0,
                      help="Mock backend port (0 = auto-pick a free port).")
    mock.add_argument("--mock-models", default="mock",
                      help="Comma-separated model names the mock advertises.")
    mock.add_argument("--mock-ttft-ms", type=float, default=0.0,
                      help="Simulated prefill latency before the first token (ms).")
    mock.add_argument("--mock-tokens", type=int, default=64,
                      help="Completion length in tokens the mock emits.")
    mock.add_argument("--mock-tok-ms", type=float, default=0.0,
                      help="Simulated per-token decode delay (ms) = inverse of tok/s.")

    orch = p.add_argument_group("router orchestration (--mock-backend only)")
    orch.add_argument("--router-port", type=int, default=0,
                      help="Router port (0 = auto-pick a free port).")
    orch.add_argument("--router-workers", type=int, default=1,
                      help="uvicorn --workers for the spawned router.")
    orch.add_argument("--router-max-conc", type=int, default=0,
                      help="max_concurrent_connections in the generated config "
                           "(0 = match peak concurrency so the router does not queue).")
    orch.add_argument("--keep-config", action="store_true",
                      help="Do not delete the generated temp config on exit.")
    return p


def _dump_json(path: str, args, results: list[Stats]) -> None:
    """Write the run configuration and per-stage summary to ``path`` as JSON.

    Stages without a successful sample get empty ``latency_ms`` / ``ttft_ms``
    objects, so the file never contains NaN (which is not valid JSON).
    """
    out = {
        "config": {k: getattr(args, k) for k in (
            "api", "model", "stream", "duration", "requests", "warmup",
            "timeout", "mock_tokens", "mock_ttft_ms", "mock_tok_ms")},
        "stages": [],
    }
    for st in results:
        lat = [s.latency * 1000 for s in st.ok]
        ttfts = [s.ttft * 1000 for s in st.ok if s.ttft is not None]
        latency_ms: dict = {}
        if lat:
            latency_ms = {p: _pct(lat, p) for p in (50, 90, 95, 99)}
            latency_ms["max"] = max(lat)
            latency_ms["mean"] = statistics.mean(lat)
        out["stages"].append({
            "concurrency": st.concurrency,
            "wall_s": st.wall,
            "requests": st.n_total,
            "ok": st.n_ok,
            "errors": st.n_total - st.n_ok,
            "rps": (st.n_ok / st.wall) if st.wall else 0.0,
            "latency_ms": latency_ms,
            "ttft_ms": {p: _pct(ttfts, p) for p in (50, 90, 99)} if ttfts else {},
        })
    Path(path).write_text(json.dumps(out, indent=2), encoding="utf-8")
    print(f"\n[driver] wrote JSON results to {path}", flush=True)


def main() -> None:
    """CLI entry point: dispatch to mock-only, mock-backend or driver mode."""
    args = build_parser().parse_args()

    if args.serve_mock:
        with contextlib.suppress(KeyboardInterrupt):
            serve_mock(args)
        return

    if args.requests is not None and args.requests <= 0:
        print("--requests must be > 0", file=sys.stderr)
        sys.exit(2)
    # Reject a malformed --ramp / --concurrency before anything is spawned.
    try:
        _parse_stages(args)
    except ValueError as exc:
        print(f"invalid --concurrency/--ramp: {exc}", file=sys.stderr)
        sys.exit(2)

    try:
        if args.mock_backend:
            results = asyncio.run(run_with_mock_backend(args)) or []
        else:
            results = asyncio.run(run_driver(args))
    except KeyboardInterrupt:
        # Child-process cleanup already ran in run_with_mock_backend's finally.
        print("\n[driver] interrupted", file=sys.stderr)
        sys.exit(130)

    if args.json_out and results:
        _dump_json(args.json_out, args, results)


if __name__ == "__main__":
    main()