diff --git a/test/load/README.md b/test/load/README.md new file mode 100644 index 0000000..758d072 --- /dev/null +++ b/test/load/README.md @@ -0,0 +1,138 @@ +# Load testing the NOMYO Router + +`loadtest.py` is a self-contained load generator (asyncio + httpx) with a built-in +**mock backend** so you can measure the router's own concurrency ceiling on a given +machine — independent of real GPU/backend compute. + +It answers the question *"how many concurrent connections can the router sustain +on this box?"* by hammering it with N concurrent virtual clients and reporting +throughput, latency percentiles and (for streaming) time-to-first-token. + +Run everything from the project root with the project venv active: + +```bash +source ~/.venv/nomyo-router/bin/activate # whatever venv has the router deps +``` + +## The three modes + +### 1. `--mock-backend` (recommended) — fully self-contained + +Spawns a fast fake Ollama/OpenAI backend **and** the router (wired to it via a +temporary config), drives load against the router, then tears both down. Because +the backend is trivial, the numbers reflect the **router's proxy overhead**, not +model inference time. + +```bash +python test/load/loadtest.py --mock-backend --stream --concurrency 128 --duration 30 +``` + +### 2. Default — drive an already-running router + +```bash +python test/load/loadtest.py --url http://127.0.0.1:12434 \ + --api ollama --stream --concurrency 64 --duration 30 --model llama3 +``` + +### 3. `--serve-mock` — just the mock backend + +Run only the fake backend and point your own router `config.yaml` at it +(`endpoints: [http://127.0.0.1:11434]`): + +```bash +python test/load/loadtest.py --serve-mock --mock-port 11434 --mock-tokens 64 +``` + +## Finding the concurrency knee + +`--ramp` sweeps several concurrency levels and prints a table. The knee is where +`req/s` stops rising and `p99` latency starts climbing sharply: + +```bash +python test/load/loadtest.py --mock-backend --stream \ + --ramp 8,32,64,128,256 --duration 15 +``` + +``` + conc req ok err req/s p50ms p90ms p99ms maxms ttftP50 ttftP99 +--------------------------------------------------------------------------------------------- + 8 120 120 0 19.8 404.6 448.3 478.6 501.4 358.4 391.7 + 32 140 140 0 21.5 1487.1 1641.8 2341.8 2397.4 1269.8 1476.3 + 64 148 148 0 21.3 2953.0 4632.5 5204.3 5267.0 1207.8 3031.7 + 128 168 168 0 19.0 6376.4 8608.9 8726.9 8739.8 2843.1 8348.6 +``` + +> Reading the table above: throughput stays flat (~20 req/s) while latency grows +> linearly with concurrency — the classic signature of a **single-worker +> serialization bottleneck**. Raising `--router-workers` lets throughput scale +> across CPU cores; the per-worker ceiling is what each table row measures. + +## Streaming vs non-streaming, Ollama vs OpenAI + +| flag | effect | +|------|--------| +| `--stream` / `--no-stream` | streamed response (default) vs a single buffered response | +| `--api ollama` | drives `POST /api/chat` (default) | +| `--api openai` | drives `POST /v1/chat/completions` | + +Streaming runs additionally report **TTFT** (time-to-first-token), which isolates +prefill/routing latency from total stream duration. + +## Shaping the mock backend (the "fake GPU") + +The mock's latency is fully configurable, so you can model anything from an +instant echo (measure pure proxy overhead) to a slow, long-streaming model +(measure how many slow streams the box holds open at once): + +| flag | meaning | +|------|---------| +| `--mock-ttft-ms` | prefill latency before the first token (ms) | +| `--mock-tokens` | number of completion tokens emitted | +| `--mock-tok-ms` | per-token decode delay (ms) — inverse of tokens/sec | +| `--mock-models` | comma-separated model names advertised in `/api/tags` & `/api/ps` | + +Example — simulate a realistic 40 tok/s model with 300 ms prefill emitting 200 +tokens, and see how many concurrent such streams the router holds: + +```bash +python test/load/loadtest.py --mock-backend --stream --ramp 16,64,256 \ + --mock-ttft-ms 300 --mock-tokens 200 --mock-tok-ms 25 --duration 20 +``` + +## Load shape & misc flags + +| flag | default | meaning | +|------|---------|---------| +| `--concurrency N` | 32 | concurrent virtual clients | +| `--duration S` | 20 | seconds per stage (ignored if `--requests` set) | +| `--requests N` | — | send exactly N requests instead of timing out | +| `--warmup S` | 2 | unmeasured warmup before each stage (hot caches/connections) | +| `--timeout S` | 120 | per-request timeout | +| `--model NAME` | `mock` | model name requested (must match what the backend advertises) | +| `--prompt STR` | … | user prompt sent in every request | +| `--json PATH` | — | also write the full results as JSON | + +### `--mock-backend` orchestration knobs + +| flag | default | meaning | +|------|---------|---------| +| `--router-workers N` | 1 | `uvicorn --workers` for the spawned router | +| `--router-max-conc N` | = peak concurrency | `max_concurrent_connections` in the generated config (so the router doesn't queue unless you want it to) | +| `--router-port` / `--mock-port` | auto | fix the ports instead of auto-picking free ones | +| `--keep-config` | off | keep the generated temp `config.yaml` for inspection | + +## Notes & caveats + +- **Single-machine bias.** With `--mock-backend`, the driver, router and mock all + share the same CPU, so they compete for cores. For an upper-bound number, run + the driver on a separate machine against a real router (`--url`), or pin + processes to different cores. +- The generated config sets `conversation_affinity: false` and + `cache_enabled: false` to measure the raw proxy path. The temp config and a + throwaway token DB (under the system temp dir) are deleted on exit. +- To measure the router's *admission* limit instead of raw throughput, set + `--router-max-conc` low (e.g. `2`) — requests beyond the limit queue on the + least-busy endpoint rather than erroring. +- Requires the router's own dependencies (`aiohttp`, `httpx`, `uvicorn`, …); it + reuses the project venv, no extra packages needed. +``` diff --git a/test/load/loadtest.py b/test/load/loadtest.py new file mode 100644 index 0000000..90bff17 --- /dev/null +++ b/test/load/loadtest.py @@ -0,0 +1,803 @@ +#!/usr/bin/env python3 +""" +NOMYO Router load test — asyncio + httpx driver with a built-in mock backend. + +Three modes +----------- +1. Drive an already-running router (default):: + + python test/load/loadtest.py --url http://127.0.0.1:12434 \ + --concurrency 64 --duration 30 --stream + +2. Fully self-contained "mock backend" mode — spins up a fast fake Ollama/OpenAI + backend AND the router (wired to that backend via a temp config), load-tests + them, then tears both down. This isolates the *router's* proxy overhead from + real GPU compute, so the numbers tell you how many concurrent connections the + router itself can sustain on this machine:: + + python test/load/loadtest.py --mock-backend \ + --concurrency 128 --duration 30 --stream + +3. Run just the mock backend (point your own router config at it):: + + python test/load/loadtest.py --serve-mock --mock-port 11434 + +Both streaming and non-streaming are supported (--stream / --no-stream), against +either the Ollama API (--api ollama -> POST /api/chat) or the OpenAI-compatible +API (--api openai -> POST /v1/chat/completions). + +Finding the concurrency ceiling +------------------------------- +Use --ramp to sweep concurrency levels and print a table; the "knee" is where +p99 latency climbs sharply or req/s stops increasing:: + + python test/load/loadtest.py --mock-backend --stream \ + --ramp 16,32,64,128,256 --duration 15 + +The mock backend is a configurable "fake GPU": --mock-ttft-ms (prefill latency), +--mock-tokens (completion length) and --mock-tok-ms (per-token decode delay) let +you model anything from an instant echo (measure pure proxy overhead) to a slow, +long-streaming model (measure how many slow streams the box holds open). +""" +from __future__ import annotations + +import argparse +import asyncio +import contextlib +import json +import math +import os +import signal +import socket +import statistics +import sys +import tempfile +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +import httpx + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +_WORDS = ( + "the quick brown fox jumps over the lazy dog while a router proxies many " + "concurrent streaming completions across several ollama and openai backends " + "without dropping a single token under sustained synthetic load testing" +).split() + + +def _gen_text(n_tokens: int) -> str: + """Deterministic pseudo-completion of roughly ``n_tokens`` space-separated tokens.""" + return " ".join(_WORDS[i % len(_WORDS)] for i in range(max(0, n_tokens))) + + +def _rfc3339_now() -> str: + # Ollama-style timestamp, e.g. 2024-01-01T00:00:00.000000Z + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z" + + +def _count_prompt_tokens(messages: list) -> int: + total = 0 + for m in messages or []: + c = m.get("content") + if isinstance(c, str): + total += len(c.split()) + elif isinstance(c, list): + for part in c: + if isinstance(part, dict) and isinstance(part.get("text"), str): + total += len(part["text"].split()) + return max(1, total) + + +def _free_port() -> int: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.close() + return port + + +# =========================================================================== +# Mock backend (a fast, configurable fake Ollama + OpenAI-compatible server) +# =========================================================================== + +def build_mock_app(models: list[str], ttft_ms: float, tokens: int, tok_ms: float): + """Construct the aiohttp mock-backend application. + + Serves the native-Ollama surface the router uses for discovery and the + `/api/chat` path (`/api/version`, `/api/tags`, `/api/ps`, `/api/chat`, + `/api/generate`) plus the OpenAI-compatible surface used by the + `/v1/chat/completions` path (`/v1/models`, `/v1/chat/completions`, + `/v1/completions`). + """ + from aiohttp import web # imported lazily so the driver has no hard aiohttp dep + + ttft = ttft_ms / 1000.0 + tok_delay = tok_ms / 1000.0 + + def _tag_entry(name: str) -> dict: + return { + "name": name, + "model": name, + "modified_at": _rfc3339_now(), + "size": 4_000_000_000, + "digest": "0" * 64, + "details": { + "parent_model": "", + "format": "gguf", + "family": "mock", + "families": ["mock"], + "parameter_size": "7B", + "quantization_level": "Q4_0", + }, + } + + async def version(_req): + return web.json_response({"version": "0.0.0-nomyo-mock"}) + + async def tags(_req): + return web.json_response({"models": [_tag_entry(m) for m in models]}) + + async def ps(_req): + # Report every advertised model as loaded with VRAM so choose_endpoint + # treats this endpoint as "loaded + free". + out = [] + for m in models: + e = _tag_entry(m) + e["size_vram"] = e["size"] + e["expires_at"] = "2999-01-01T00:00:00Z" + out.append(e) + return web.json_response({"models": out}) + + async def v1_models(_req): + now = int(time.time()) + return web.json_response({ + "object": "list", + "data": [{"id": m, "object": "model", "created": now, "owned_by": "mock"} for m in models], + }) + + # ----- Ollama /api/chat ------------------------------------------------- + async def api_chat(req): + payload = await req.json() + model = payload.get("model", models[0] if models else "mock") + stream = payload.get("stream", True) + prompt_tok = _count_prompt_tokens(payload.get("messages", [])) + t0 = time.perf_counter() + + if stream: + resp = web.StreamResponse( + status=200, headers={"Content-Type": "application/x-ndjson"} + ) + await resp.prepare(req) + if ttft: + await asyncio.sleep(ttft) + for i in range(tokens): + if tok_delay and i: + await asyncio.sleep(tok_delay) + line = { + "model": model, + "created_at": _rfc3339_now(), + "message": {"role": "assistant", "content": _WORDS[i % len(_WORDS)] + " "}, + "done": False, + } + await resp.write(json.dumps(line).encode() + b"\n") + dur_ns = int((time.perf_counter() - t0) * 1e9) + final = { + "model": model, + "created_at": _rfc3339_now(), + "message": {"role": "assistant", "content": ""}, + "done": True, + "done_reason": "stop", + "total_duration": dur_ns, + "load_duration": 0, + "prompt_eval_count": prompt_tok, + "prompt_eval_duration": int(ttft * 1e9), + "eval_count": tokens, + "eval_duration": dur_ns, + } + await resp.write(json.dumps(final).encode() + b"\n") + await resp.write_eof() + return resp + + # non-streaming: simulate the whole generation latency, then one object + await asyncio.sleep(ttft + tokens * tok_delay) + dur_ns = int((time.perf_counter() - t0) * 1e9) + return web.json_response({ + "model": model, + "created_at": _rfc3339_now(), + "message": {"role": "assistant", "content": _gen_text(tokens)}, + "done": True, + "done_reason": "stop", + "total_duration": dur_ns, + "load_duration": 0, + "prompt_eval_count": prompt_tok, + "prompt_eval_duration": int(ttft * 1e9), + "eval_count": tokens, + "eval_duration": dur_ns, + }) + + # ----- Ollama /api/generate -------------------------------------------- + async def api_generate(req): + payload = await req.json() + model = payload.get("model", models[0] if models else "mock") + stream = payload.get("stream", True) + prompt_tok = max(1, len(str(payload.get("prompt", "")).split())) + t0 = time.perf_counter() + if stream: + resp = web.StreamResponse(status=200, headers={"Content-Type": "application/x-ndjson"}) + await resp.prepare(req) + if ttft: + await asyncio.sleep(ttft) + for i in range(tokens): + if tok_delay and i: + await asyncio.sleep(tok_delay) + await resp.write(json.dumps({ + "model": model, "created_at": _rfc3339_now(), + "response": _WORDS[i % len(_WORDS)] + " ", "done": False, + }).encode() + b"\n") + dur_ns = int((time.perf_counter() - t0) * 1e9) + await resp.write(json.dumps({ + "model": model, "created_at": _rfc3339_now(), "response": "", "done": True, + "done_reason": "stop", "total_duration": dur_ns, + "prompt_eval_count": prompt_tok, "eval_count": tokens, "eval_duration": dur_ns, + }).encode() + b"\n") + await resp.write_eof() + return resp + await asyncio.sleep(ttft + tokens * tok_delay) + dur_ns = int((time.perf_counter() - t0) * 1e9) + return web.json_response({ + "model": model, "created_at": _rfc3339_now(), "response": _gen_text(tokens), + "done": True, "done_reason": "stop", "total_duration": dur_ns, + "prompt_eval_count": prompt_tok, "eval_count": tokens, "eval_duration": dur_ns, + }) + + # ----- OpenAI /v1/chat/completions ------------------------------------- + async def v1_chat(req): + payload = await req.json() + model = payload.get("model", models[0] if models else "mock") + stream = payload.get("stream", False) + want_usage = bool((payload.get("stream_options") or {}).get("include_usage")) + prompt_tok = _count_prompt_tokens(payload.get("messages", [])) + created = int(time.time()) + cid = "chatcmpl-mock" + + if stream: + resp = web.StreamResponse(status=200, headers={"Content-Type": "text/event-stream"}) + await resp.prepare(req) + if ttft: + await asyncio.sleep(ttft) + + def _sse(obj: dict) -> bytes: + return b"data: " + json.dumps(obj).encode() + b"\n\n" + + # first chunk carries the role + await resp.write(_sse({ + "id": cid, "object": "chat.completion.chunk", "created": created, "model": model, + "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}], + })) + for i in range(tokens): + if tok_delay and i: + await asyncio.sleep(tok_delay) + await resp.write(_sse({ + "id": cid, "object": "chat.completion.chunk", "created": created, "model": model, + "choices": [{"index": 0, "delta": {"content": _WORDS[i % len(_WORDS)] + " "}, "finish_reason": None}], + })) + await resp.write(_sse({ + "id": cid, "object": "chat.completion.chunk", "created": created, "model": model, + "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], + })) + if want_usage: + await resp.write(_sse({ + "id": cid, "object": "chat.completion.chunk", "created": created, "model": model, + "choices": [], + "usage": {"prompt_tokens": prompt_tok, "completion_tokens": tokens, + "total_tokens": prompt_tok + tokens}, + })) + await resp.write(b"data: [DONE]\n\n") + await resp.write_eof() + return resp + + await asyncio.sleep(ttft + tokens * tok_delay) + return web.json_response({ + "id": cid, "object": "chat.completion", "created": created, "model": model, + "choices": [{"index": 0, "message": {"role": "assistant", "content": _gen_text(tokens)}, + "finish_reason": "stop", "logprobs": None}], + "usage": {"prompt_tokens": prompt_tok, "completion_tokens": tokens, + "total_tokens": prompt_tok + tokens}, + }) + + # ----- OpenAI /v1/completions ------------------------------------------ + async def v1_completions(req): + payload = await req.json() + model = payload.get("model", models[0] if models else "mock") + stream = payload.get("stream", False) + prompt_tok = max(1, len(str(payload.get("prompt", "")).split())) + created = int(time.time()) + cid = "cmpl-mock" + if stream: + resp = web.StreamResponse(status=200, headers={"Content-Type": "text/event-stream"}) + await resp.prepare(req) + if ttft: + await asyncio.sleep(ttft) + for i in range(tokens): + if tok_delay and i: + await asyncio.sleep(tok_delay) + await resp.write(b"data: " + json.dumps({ + "id": cid, "object": "text_completion", "created": created, "model": model, + "choices": [{"index": 0, "text": _WORDS[i % len(_WORDS)] + " ", "finish_reason": None}], + }).encode() + b"\n\n") + await resp.write(b"data: [DONE]\n\n") + await resp.write_eof() + return resp + await asyncio.sleep(ttft + tokens * tok_delay) + return web.json_response({ + "id": cid, "object": "text_completion", "created": created, "model": model, + "choices": [{"index": 0, "text": _gen_text(tokens), "finish_reason": "stop"}], + "usage": {"prompt_tokens": prompt_tok, "completion_tokens": tokens, + "total_tokens": prompt_tok + tokens}, + }) + + app = web.Application(client_max_size=64 * 1024 * 1024) + app.add_routes([ + web.get("/api/version", version), + web.get("/api/tags", tags), + web.get("/api/ps", ps), + web.post("/api/chat", api_chat), + web.post("/api/generate", api_generate), + web.get("/v1/models", v1_models), + web.post("/v1/chat/completions", v1_chat), + web.post("/v1/completions", v1_completions), + ]) + return app + + +def serve_mock(args) -> None: + from aiohttp import web + models = [m.strip() for m in args.mock_models.split(",") if m.strip()] + app = build_mock_app(models, args.mock_ttft_ms, args.mock_tokens, args.mock_tok_ms) + print(f"[mock] serving models={models} on http://{args.mock_host}:{args.mock_port} " + f"(ttft={args.mock_ttft_ms}ms tokens={args.mock_tokens} tok={args.mock_tok_ms}ms)", + flush=True) + web.run_app(app, host=args.mock_host, port=args.mock_port, print=None) + + +# =========================================================================== +# Load driver +# =========================================================================== + +@dataclass +class Sample: + ok: bool + status: int + latency: float # full request wall time (s) + ttft: Optional[float] # time-to-first-byte for streaming (s), else None + err: Optional[str] = None + + +@dataclass +class Stats: + concurrency: int + wall: float = 0.0 + samples: list = field(default_factory=list) + + @property + def ok(self) -> list: + return [s for s in self.samples if s.ok] + + @property + def n_total(self) -> int: + return len(self.samples) + + @property + def n_ok(self) -> int: + return len(self.ok) + + +def _pct(values: list[float], p: float) -> float: + if not values: + return float("nan") + s = sorted(values) + if len(s) == 1: + return s[0] + k = (len(s) - 1) * (p / 100.0) + lo = math.floor(k) + hi = math.ceil(k) + if lo == hi: + return s[int(k)] + return s[lo] + (s[hi] - s[lo]) * (k - lo) + + +def _build_request(args): + """Return (path, json_payload) for a single request.""" + messages = [{"role": "user", "content": args.prompt}] + if args.api == "openai": + path = "/v1/chat/completions" + body = {"model": args.model, "messages": messages, "stream": args.stream} + else: + path = "/api/chat" + body = {"model": args.model, "messages": messages, "stream": args.stream} + return path, body + + +async def _one_request(client: httpx.AsyncClient, url: str, body: dict, stream: bool) -> Sample: + t0 = time.perf_counter() + try: + if stream: + ttft = None + async with client.stream("POST", url, json=body) as resp: + status = resp.status_code + async for _chunk in resp.aiter_bytes(): + if ttft is None: + ttft = time.perf_counter() - t0 + # drain complete + lat = time.perf_counter() - t0 + ok = 200 <= status < 300 + return Sample(ok=ok, status=status, latency=lat, ttft=ttft, + err=None if ok else f"HTTP {status}") + else: + resp = await client.post(url, json=body) + lat = time.perf_counter() - t0 + ok = 200 <= resp.status_code < 300 + # touch body so the full response is received + _ = resp.content + return Sample(ok=ok, status=resp.status_code, latency=lat, ttft=None, + err=None if ok else f"HTTP {resp.status_code}") + except Exception as e: # noqa: BLE001 — record any transport error as a failed sample + lat = time.perf_counter() - t0 + return Sample(ok=False, status=0, latency=lat, ttft=None, + err=f"{type(e).__name__}: {str(e)[:120]}") + + +async def run_stage(args, concurrency: int) -> Stats: + path, body = _build_request(args) + url = args.url.rstrip("/") + path + stats = Stats(concurrency=concurrency) + + limits = httpx.Limits(max_connections=concurrency + 50, + max_keepalive_connections=concurrency + 50) + timeout = httpx.Timeout(args.timeout, connect=15.0) + + async with httpx.AsyncClient(limits=limits, timeout=timeout) as client: + # warmup (unmeasured): make a few requests so caches/connections are hot + if args.warmup > 0: + warm_deadline = time.perf_counter() + args.warmup + async def _warm(): + while time.perf_counter() < warm_deadline: + await _one_request(client, url, body, args.stream) + await asyncio.gather(*[_warm() for _ in range(min(concurrency, 8))]) + + use_duration = args.requests is None + deadline = time.perf_counter() + args.duration if use_duration else None + remaining = args.requests if not use_duration else None + remaining_lock = asyncio.Lock() + + async def worker(): + nonlocal remaining + while True: + if use_duration: + if time.perf_counter() >= deadline: + return + else: + async with remaining_lock: + if remaining <= 0: + return + remaining -= 1 + s = await _one_request(client, url, body, args.stream) + stats.samples.append(s) + + wall0 = time.perf_counter() + await asyncio.gather(*[worker() for _ in range(concurrency)]) + stats.wall = time.perf_counter() - wall0 + + return stats + + +def _print_stage(stats: Stats, args, header: bool) -> None: + lat = [s.latency * 1000 for s in stats.ok] + ttfts = [s.ttft * 1000 for s in stats.ok if s.ttft is not None] + rps = stats.n_ok / stats.wall if stats.wall else 0.0 + errs = stats.n_total - stats.n_ok + + if args.ramp: + if header: + cols = f"{'conc':>5} {'req':>7} {'ok':>7} {'err':>5} {'req/s':>9} " \ + f"{'p50ms':>8} {'p90ms':>8} {'p99ms':>9} {'maxms':>9}" + if args.stream: + cols += f" {'ttftP50':>8} {'ttftP99':>8}" + print(cols) + print("-" * len(cols)) + row = (f"{stats.concurrency:>5} {stats.n_total:>7} {stats.n_ok:>7} {errs:>5} " + f"{rps:>9.1f} {_pct(lat,50):>8.1f} {_pct(lat,90):>8.1f} " + f"{_pct(lat,99):>9.1f} {(max(lat) if lat else float('nan')):>9.1f}") + if args.stream: + row += f" {_pct(ttfts,50):>8.1f} {_pct(ttfts,99):>8.1f}" + print(row, flush=True) + return + + # single-stage detailed report + print(f"\n=== Results (concurrency={stats.concurrency}, " + f"{'stream' if args.stream else 'non-stream'}, api={args.api}) ===") + print(f" wall time : {stats.wall:8.2f} s") + print(f" requests : {stats.n_total} total, {stats.n_ok} ok, {errs} failed") + print(f" throughput : {rps:8.1f} req/s") + if lat: + print(f" latency p50 : {_pct(lat,50):8.1f} ms") + print(f" p90 : {_pct(lat,90):8.1f} ms") + print(f" p95 : {_pct(lat,95):8.1f} ms") + print(f" p99 : {_pct(lat,99):8.1f} ms") + print(f" max : {max(lat):8.1f} ms") + print(f" mean : {statistics.mean(lat):8.1f} ms") + if ttfts: + print(f" TTFT p50 : {_pct(ttfts,50):8.1f} ms") + print(f" p90 : {_pct(ttfts,90):8.1f} ms") + print(f" p99 : {_pct(ttfts,99):8.1f} ms") + if errs: + by_err: dict[str, int] = {} + for s in stats.samples: + if not s.ok: + by_err[s.err or "unknown"] = by_err.get(s.err or "unknown", 0) + 1 + print(" errors:") + for k, v in sorted(by_err.items(), key=lambda kv: -kv[1]): + print(f" {v:>6} {k}") + + +async def run_driver(args) -> list[Stats]: + stages = ([int(x) for x in args.ramp.split(",")] if args.ramp else [args.concurrency]) + results: list[Stats] = [] + for i, c in enumerate(stages): + stats = await run_stage(args, c) + _print_stage(stats, args, header=(i == 0)) + results.append(stats) + return results + + +# =========================================================================== +# Orchestration: --mock-backend (spawn mock + router, run, tear down) +# =========================================================================== + +PROJECT_ROOT = Path(__file__).resolve().parents[2] + + +async def _wait_http_ok(url: str, timeout: float, accept=(200,)) -> bool: + deadline = time.perf_counter() + timeout + async with httpx.AsyncClient(timeout=5.0) as client: + while time.perf_counter() < deadline: + try: + r = await client.get(url) + if r.status_code in accept: + return True + except Exception: + pass + await asyncio.sleep(0.25) + return False + + +def _write_temp_config(mock_url: str, models: list[str], max_conc: int) -> Path: + fd, path = tempfile.mkstemp(prefix="nomyo_loadtest_", suffix=".yaml") + os.close(fd) + cfg = ( + "# Auto-generated by test/load/loadtest.py --mock-backend. Safe to delete.\n" + "endpoints:\n" + f" - {mock_url}\n" + "llama_server_endpoints: []\n" + f"max_concurrent_connections: {max_conc}\n" + "priority_routing: false\n" + "conversation_affinity: false\n" + "cache_enabled: false\n" + "nomyo-router-api-key: \"\"\n" + "api_keys:\n" + f" \"{mock_url}\": \"mock\"\n" + ) + Path(path).write_text(cfg) + return Path(path) + + +async def run_with_mock_backend(args) -> list[Stats]: + mock_port = args.mock_port or _free_port() + router_port = args.router_port or _free_port() + mock_url = f"http://127.0.0.1:{mock_port}" + router_url = f"http://127.0.0.1:{router_port}" + models = [m.strip() for m in args.mock_models.split(",") if m.strip()] + + # Size the router's per-endpoint admission limit so it does not artificially + # serialize the load (unless the user explicitly wants to measure that). + peak = max([int(x) for x in args.ramp.split(",")]) if args.ramp else args.concurrency + max_conc = args.router_max_conc if args.router_max_conc else max(peak, 1) + + cfg_path = _write_temp_config(mock_url, models, max_conc) + db_path = Path(tempfile.gettempdir()) / f"nomyo_loadtest_{os.getpid()}.db" + + env = dict(os.environ) + env["NOMYO_ROUTER_CONFIG_PATH"] = str(cfg_path) + env["NOMYO_ROUTER_DB_PATH"] = str(db_path) + + mock_proc = None + router_proc = None + try: + # 1. mock backend first, so the router never caches it as "down" + mock_cmd = [ + sys.executable, str(Path(__file__).resolve()), "--serve-mock", + "--mock-host", "127.0.0.1", "--mock-port", str(mock_port), + "--mock-models", args.mock_models, + "--mock-ttft-ms", str(args.mock_ttft_ms), + "--mock-tokens", str(args.mock_tokens), + "--mock-tok-ms", str(args.mock_tok_ms), + ] + print(f"[orchestrator] starting mock backend: {mock_url}", flush=True) + mock_proc = await asyncio.create_subprocess_exec(*mock_cmd) + if not await _wait_http_ok(f"{mock_url}/api/version", timeout=15): + raise RuntimeError("mock backend did not become ready") + + # 2. router + router_cmd = [ + sys.executable, "-m", "uvicorn", "router:app", + "--host", "127.0.0.1", "--port", str(router_port), + # Per-request access logging is pure noise (and overhead) under load. + "--no-access-log", + ] + if args.router_workers and args.router_workers > 1: + router_cmd += ["--workers", str(args.router_workers)] + print(f"[orchestrator] starting router: {router_url} " + f"(workers={args.router_workers}, max_concurrent_connections={max_conc})", flush=True) + router_proc = await asyncio.create_subprocess_exec( + *router_cmd, cwd=str(PROJECT_ROOT), env=env + ) + # /health returns 200 only once it can reach the (healthy) mock backend + if not await _wait_http_ok(f"{router_url}/health", timeout=40, accept=(200,)): + raise RuntimeError("router did not become healthy") + print("[orchestrator] router healthy — starting load\n", flush=True) + + # 3. drive load against the router + args.url = router_url + return await run_driver(args) + + finally: + for name, proc in (("router", router_proc), ("mock", mock_proc)): + if proc and proc.returncode is None: + with contextlib.suppress(ProcessLookupError): + proc.send_signal(signal.SIGINT) + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for(proc.wait(), timeout=8) + if proc.returncode is None: + with contextlib.suppress(ProcessLookupError): + proc.kill() + print(f"[orchestrator] stopped {name}", flush=True) + if args.keep_config: + print(f"[orchestrator] kept config: {cfg_path}", flush=True) + else: + with contextlib.suppress(FileNotFoundError): + cfg_path.unlink() + for suffix in ("", "-shm", "-wal"): + with contextlib.suppress(FileNotFoundError): + Path(str(db_path) + suffix).unlink() + + +# =========================================================================== +# CLI +# =========================================================================== + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + description="NOMYO Router load test (asyncio + httpx) with a built-in mock backend.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + mode = p.add_argument_group("mode") + mode.add_argument("--serve-mock", action="store_true", + help="Run ONLY the mock backend (foreground) and exit on Ctrl-C.") + mode.add_argument("--mock-backend", action="store_true", + help="Spawn mock backend + router, load-test them, then tear down.") + + tgt = p.add_argument_group("target (driver)") + tgt.add_argument("--url", default="http://127.0.0.1:12434", + help="Router base URL to drive (ignored with --mock-backend).") + tgt.add_argument("--api", choices=["ollama", "openai"], default="ollama", + help="ollama -> POST /api/chat ; openai -> POST /v1/chat/completions") + tgt.add_argument("--model", default="mock", help="Model name to request.") + tgt.add_argument("--prompt", default="Say hello and count to ten.", + help="User prompt sent in every request.") + stream_grp = tgt.add_mutually_exclusive_group() + stream_grp.add_argument("--stream", dest="stream", action="store_true", + help="Stream the response (default).") + stream_grp.add_argument("--no-stream", dest="stream", action="store_false", + help="Request a single non-streamed response.") + p.set_defaults(stream=True) + + load = p.add_argument_group("load shape") + load.add_argument("--concurrency", type=int, default=32, + help="Number of concurrent virtual clients.") + load.add_argument("--duration", type=float, default=20.0, + help="Seconds to run each stage (ignored if --requests given).") + load.add_argument("--requests", type=int, default=None, + help="Send exactly N requests instead of running for --duration.") + load.add_argument("--ramp", default=None, + help="Comma-separated concurrency stages, e.g. 16,32,64,128 " + "(prints a table to find the knee).") + load.add_argument("--warmup", type=float, default=2.0, + help="Seconds of unmeasured warmup before each stage.") + load.add_argument("--timeout", type=float, default=120.0, + help="Per-request timeout (seconds).") + load.add_argument("--json", dest="json_out", default=None, + help="Also write the results as JSON to this path.") + + mock = p.add_argument_group("mock backend tuning") + mock.add_argument("--mock-host", default="127.0.0.1") + mock.add_argument("--mock-port", type=int, default=0, + help="Mock backend port (0 = auto-pick a free port).") + mock.add_argument("--mock-models", default="mock", + help="Comma-separated model names the mock advertises.") + mock.add_argument("--mock-ttft-ms", type=float, default=0.0, + help="Simulated prefill latency before the first token (ms).") + mock.add_argument("--mock-tokens", type=int, default=64, + help="Completion length in tokens the mock emits.") + mock.add_argument("--mock-tok-ms", type=float, default=0.0, + help="Simulated per-token decode delay (ms) = inverse of tok/s.") + + orch = p.add_argument_group("router orchestration (--mock-backend only)") + orch.add_argument("--router-port", type=int, default=0, + help="Router port (0 = auto-pick a free port).") + orch.add_argument("--router-workers", type=int, default=1, + help="uvicorn --workers for the spawned router.") + orch.add_argument("--router-max-conc", type=int, default=0, + help="max_concurrent_connections in the generated config " + "(0 = match peak concurrency so the router does not queue).") + orch.add_argument("--keep-config", action="store_true", + help="Do not delete the generated temp config on exit.") + return p + + +def _dump_json(path: str, args, results: list[Stats]) -> None: + out = { + "config": {k: getattr(args, k) for k in ( + "api", "model", "stream", "duration", "requests", "warmup", "timeout", + "mock_tokens", "mock_ttft_ms", "mock_tok_ms")}, + "stages": [], + } + for st in results: + lat = [s.latency * 1000 for s in st.ok] + ttfts = [s.ttft * 1000 for s in st.ok if s.ttft is not None] + out["stages"].append({ + "concurrency": st.concurrency, + "wall_s": st.wall, + "requests": st.n_total, + "ok": st.n_ok, + "errors": st.n_total - st.n_ok, + "rps": (st.n_ok / st.wall) if st.wall else 0.0, + "latency_ms": {p: _pct(lat, p) for p in (50, 90, 95, 99)} | ( + {"max": max(lat), "mean": statistics.mean(lat)} if lat else {}), + "ttft_ms": {p: _pct(ttfts, p) for p in (50, 90, 99)} if ttfts else {}, + }) + Path(path).write_text(json.dumps(out, indent=2)) + print(f"\n[driver] wrote JSON results to {path}", flush=True) + + +def main() -> None: + args = build_parser().parse_args() + + if args.serve_mock: + try: + serve_mock(args) + except KeyboardInterrupt: + pass + return + + if args.requests is not None and args.requests <= 0: + print("--requests must be > 0", file=sys.stderr) + sys.exit(2) + + if args.mock_backend: + results = asyncio.run(run_with_mock_backend(args)) or [] + else: + results = asyncio.run(run_driver(args)) + + if args.json_out and results: + _dump_json(args.json_out, args, results) + + +if __name__ == "__main__": + main()