803 lines
33 KiB
Python
803 lines
33 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
NOMYO Router load test — asyncio + httpx driver with a built-in mock backend.

Three modes
-----------
1. Drive an already-running router (default)::

       python test/load/loadtest.py --url http://127.0.0.1:12434 \
           --concurrency 64 --duration 30 --stream

2. Fully self-contained "mock backend" mode — spins up a fast fake Ollama/OpenAI
   backend AND the router (wired to that backend via a temp config), load-tests
   them, then tears both down. This isolates the *router's* proxy overhead from
   real GPU compute, so the numbers tell you how many concurrent connections the
   router itself can sustain on this machine::

       python test/load/loadtest.py --mock-backend \
           --concurrency 128 --duration 30 --stream

3. Run just the mock backend (point your own router config at it)::

       python test/load/loadtest.py --serve-mock --mock-port 11434

Both streaming and non-streaming are supported (--stream / --no-stream), against
either the Ollama API (--api ollama -> POST /api/chat) or the OpenAI-compatible
API (--api openai -> POST /v1/chat/completions).

Finding the concurrency ceiling
-------------------------------
Use --ramp to sweep concurrency levels and print a table; the "knee" is where
p99 latency climbs sharply or req/s stops increasing::

    python test/load/loadtest.py --mock-backend --stream \
        --ramp 16,32,64,128,256 --duration 15

The mock backend is a configurable "fake GPU": --mock-ttft-ms (prefill latency),
--mock-tokens (completion length) and --mock-tok-ms (per-token decode delay) let
you model anything from an instant echo (measure pure proxy overhead) to a slow,
long-streaming model (measure how many slow streams the box holds open).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import contextlib
|
|
import json
|
|
import math
|
|
import os
|
|
import signal
|
|
import socket
|
|
import statistics
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_WORDS = (
|
|
"the quick brown fox jumps over the lazy dog while a router proxies many "
|
|
"concurrent streaming completions across several ollama and openai backends "
|
|
"without dropping a single token under sustained synthetic load testing"
|
|
).split()
|
|
|
|
|
|
def _gen_text(n_tokens: int) -> str:
    """Build a deterministic filler completion of ``n_tokens`` words.

    Words are taken from ``_WORDS`` in order, wrapping around when the
    vocabulary is exhausted, and joined with single spaces. A non-positive
    ``n_tokens`` yields the empty string.
    """
    count = n_tokens if n_tokens > 0 else 0
    picked = [_WORDS[idx % len(_WORDS)] for idx in range(count)]
    return " ".join(picked)
|
|
|
|
|
|
def _rfc3339_now() -> str:
|
|
# Ollama-style timestamp, e.g. 2024-01-01T00:00:00.000000Z
|
|
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z"
|
|
|
|
|
|
def _count_prompt_tokens(messages: list) -> int:
|
|
total = 0
|
|
for m in messages or []:
|
|
c = m.get("content")
|
|
if isinstance(c, str):
|
|
total += len(c.split())
|
|
elif isinstance(c, list):
|
|
for part in c:
|
|
if isinstance(part, dict) and isinstance(part.get("text"), str):
|
|
total += len(part["text"].split())
|
|
return max(1, total)
|
|
|
|
|
|
def _free_port() -> int:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
s.bind(("127.0.0.1", 0))
|
|
port = s.getsockname()[1]
|
|
s.close()
|
|
return port
|
|
|
|
|
|
# ===========================================================================
|
|
# Mock backend (a fast, configurable fake Ollama + OpenAI-compatible server)
|
|
# ===========================================================================
|
|
|
|
def build_mock_app(models: list[str], ttft_ms: float, tokens: int, tok_ms: float):
    """Assemble the aiohttp application that impersonates an inference backend.

    Routes registered at the bottom of this function:

    * Ollama-style: ``GET /api/version``, ``GET /api/tags``, ``GET /api/ps``,
      ``POST /api/chat``, ``POST /api/generate``
    * OpenAI-style: ``GET /v1/models``, ``POST /v1/chat/completions``,
      ``POST /v1/completions``

    Args:
        models: Model names reported by the listing endpoints; the first one
            is the fallback when a request body names no model.
        ttft_ms: Delay before the first token, in milliseconds.
        tokens: Number of tokens emitted per completion.
        tok_ms: Delay between consecutive tokens, in milliseconds.

    Returns:
        The configured ``aiohttp.web.Application``.
    """
    from aiohttp import web  # imported lazily so the driver has no hard aiohttp dep

    # asyncio.sleep() takes seconds; the knobs arrive in milliseconds.
    ttft = ttft_ms / 1000.0
    tok_delay = tok_ms / 1000.0

    # ----- small shared helpers ---------------------------------------------
    def _fallback_model() -> str:
        # Used when the request body carries no "model" key.
        return models[0] if models else "mock"

    def _word(idx: int) -> str:
        # One streamed token: a vocabulary word plus a trailing space.
        return _WORDS[idx % len(_WORDS)] + " "

    def _ndjson(obj: dict) -> bytes:
        # One newline-delimited JSON record (Ollama streaming framing).
        return json.dumps(obj).encode() + b"\n"

    def _sse(obj: dict) -> bytes:
        # One server-sent-events data frame (OpenAI streaming framing).
        return b"data: " + json.dumps(obj).encode() + b"\n\n"

    def _usage(prompt_tok: int) -> dict:
        return {
            "prompt_tokens": prompt_tok,
            "completion_tokens": tokens,
            "total_tokens": prompt_tok + tokens,
        }

    def _elapsed_ns(t0: float) -> int:
        return int((time.perf_counter() - t0) * 1e9)

    async def _begin_stream(req, content_type: str):
        # Send the response headers first, then simulate the prefill latency.
        resp = web.StreamResponse(status=200, headers={"Content-Type": content_type})
        await resp.prepare(req)
        if ttft:
            await asyncio.sleep(ttft)
        return resp

    async def _pause_before_token(idx: int) -> None:
        # No decode delay ahead of the very first token.
        if tok_delay and idx:
            await asyncio.sleep(tok_delay)

    async def _whole_generation_delay() -> None:
        # Non-streaming: wait out prefill plus every per-token delay at once.
        await asyncio.sleep(ttft + tokens * tok_delay)

    def _tag_entry(name: str) -> dict:
        details = {
            "parent_model": "",
            "format": "gguf",
            "family": "mock",
            "families": ["mock"],
            "parameter_size": "7B",
            "quantization_level": "Q4_0",
        }
        return {
            "name": name,
            "model": name,
            "modified_at": _rfc3339_now(),
            "size": 4_000_000_000,
            "digest": "0" * 64,
            "details": details,
        }

    # ----- discovery endpoints ----------------------------------------------
    async def version(_req):
        return web.json_response({"version": "0.0.0-nomyo-mock"})

    async def tags(_req):
        return web.json_response({"models": [_tag_entry(name) for name in models]})

    async def ps(_req):
        # Report every advertised model as loaded with VRAM so choose_endpoint
        # treats this endpoint as "loaded + free".
        loaded = []
        for name in models:
            entry = _tag_entry(name)
            entry["size_vram"] = entry["size"]
            entry["expires_at"] = "2999-01-01T00:00:00Z"
            loaded.append(entry)
        return web.json_response({"models": loaded})

    async def v1_models(_req):
        now = int(time.time())
        data = [
            {"id": name, "object": "model", "created": now, "owned_by": "mock"}
            for name in models
        ]
        return web.json_response({"object": "list", "data": data})

    # ----- Ollama /api/chat -------------------------------------------------
    async def api_chat(req):
        payload = await req.json()
        model = payload.get("model", _fallback_model())
        stream = payload.get("stream", True)
        prompt_tok = _count_prompt_tokens(payload.get("messages", []))
        t0 = time.perf_counter()

        def _done(content: str, dur_ns: int) -> dict:
            # Terminal record; same shape for streamed and single responses.
            return {
                "model": model,
                "created_at": _rfc3339_now(),
                "message": {"role": "assistant", "content": content},
                "done": True,
                "done_reason": "stop",
                "total_duration": dur_ns,
                "load_duration": 0,
                "prompt_eval_count": prompt_tok,
                "prompt_eval_duration": int(ttft * 1e9),
                "eval_count": tokens,
                "eval_duration": dur_ns,
            }

        if not stream:
            # non-streaming: simulate the whole generation latency, then one object
            await _whole_generation_delay()
            dur_ns = _elapsed_ns(t0)
            return web.json_response(_done(_gen_text(tokens), dur_ns))

        resp = await _begin_stream(req, "application/x-ndjson")
        for idx in range(tokens):
            await _pause_before_token(idx)
            await resp.write(_ndjson({
                "model": model,
                "created_at": _rfc3339_now(),
                "message": {"role": "assistant", "content": _word(idx)},
                "done": False,
            }))
        dur_ns = _elapsed_ns(t0)
        await resp.write(_ndjson(_done("", dur_ns)))
        await resp.write_eof()
        return resp

    # ----- Ollama /api/generate --------------------------------------------
    async def api_generate(req):
        payload = await req.json()
        model = payload.get("model", _fallback_model())
        stream = payload.get("stream", True)
        prompt_tok = max(1, len(str(payload.get("prompt", "")).split()))
        t0 = time.perf_counter()

        def _done(text: str, dur_ns: int) -> dict:
            return {
                "model": model,
                "created_at": _rfc3339_now(),
                "response": text,
                "done": True,
                "done_reason": "stop",
                "total_duration": dur_ns,
                "prompt_eval_count": prompt_tok,
                "eval_count": tokens,
                "eval_duration": dur_ns,
            }

        if not stream:
            await _whole_generation_delay()
            dur_ns = _elapsed_ns(t0)
            return web.json_response(_done(_gen_text(tokens), dur_ns))

        resp = await _begin_stream(req, "application/x-ndjson")
        for idx in range(tokens):
            await _pause_before_token(idx)
            await resp.write(_ndjson({
                "model": model,
                "created_at": _rfc3339_now(),
                "response": _word(idx),
                "done": False,
            }))
        dur_ns = _elapsed_ns(t0)
        await resp.write(_ndjson(_done("", dur_ns)))
        await resp.write_eof()
        return resp

    # ----- OpenAI /v1/chat/completions -------------------------------------
    async def v1_chat(req):
        payload = await req.json()
        model = payload.get("model", _fallback_model())
        stream = payload.get("stream", False)
        want_usage = bool((payload.get("stream_options") or {}).get("include_usage"))
        prompt_tok = _count_prompt_tokens(payload.get("messages", []))
        created = int(time.time())
        cid = "chatcmpl-mock"

        def _chunk(choices: list, usage=None) -> bytes:
            # Every streamed frame shares this envelope; "usage" goes last.
            frame = {
                "id": cid,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": choices,
            }
            if usage is not None:
                frame["usage"] = usage
            return _sse(frame)

        if not stream:
            await _whole_generation_delay()
            choice = {
                "index": 0,
                "message": {"role": "assistant", "content": _gen_text(tokens)},
                "finish_reason": "stop",
                "logprobs": None,
            }
            return web.json_response({
                "id": cid,
                "object": "chat.completion",
                "created": created,
                "model": model,
                "choices": [choice],
                "usage": _usage(prompt_tok),
            })

        resp = await _begin_stream(req, "text/event-stream")
        # first chunk carries the role
        await resp.write(_chunk([
            {"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None},
        ]))
        for idx in range(tokens):
            await _pause_before_token(idx)
            await resp.write(_chunk([
                {"index": 0, "delta": {"content": _word(idx)}, "finish_reason": None},
            ]))
        await resp.write(_chunk([{"index": 0, "delta": {}, "finish_reason": "stop"}]))
        if want_usage:
            # Usage-only frame: empty choices, sent just before [DONE].
            await resp.write(_chunk([], usage=_usage(prompt_tok)))
        await resp.write(b"data: [DONE]\n\n")
        await resp.write_eof()
        return resp

    # ----- OpenAI /v1/completions ------------------------------------------
    async def v1_completions(req):
        payload = await req.json()
        model = payload.get("model", _fallback_model())
        stream = payload.get("stream", False)
        prompt_tok = max(1, len(str(payload.get("prompt", "")).split()))
        created = int(time.time())
        cid = "cmpl-mock"

        def _envelope(choice: dict) -> dict:
            return {
                "id": cid,
                "object": "text_completion",
                "created": created,
                "model": model,
                "choices": [choice],
            }

        if not stream:
            await _whole_generation_delay()
            body = _envelope({"index": 0, "text": _gen_text(tokens), "finish_reason": "stop"})
            body["usage"] = _usage(prompt_tok)
            return web.json_response(body)

        resp = await _begin_stream(req, "text/event-stream")
        for idx in range(tokens):
            await _pause_before_token(idx)
            await resp.write(_sse(_envelope(
                {"index": 0, "text": _word(idx), "finish_reason": None}
            )))
        await resp.write(b"data: [DONE]\n\n")
        await resp.write_eof()
        return resp

    app = web.Application(client_max_size=64 * 1024 * 1024)
    app.add_routes([
        web.get("/api/version", version),
        web.get("/api/tags", tags),
        web.get("/api/ps", ps),
        web.post("/api/chat", api_chat),
        web.post("/api/generate", api_generate),
        web.get("/v1/models", v1_models),
        web.post("/v1/chat/completions", v1_chat),
        web.post("/v1/completions", v1_completions),
    ])
    return app
|
|
|
|
|
|
def serve_mock(args) -> None:
    """Run the mock backend in the foreground using the ``--mock-*`` options.

    Splits ``args.mock_models`` on commas (blank entries dropped), builds the
    app, prints a one-line banner and hands control to ``web.run_app``.
    """
    from aiohttp import web

    stripped = (raw.strip() for raw in args.mock_models.split(","))
    models = [name for name in stripped if name]
    app = build_mock_app(models, args.mock_ttft_ms, args.mock_tokens, args.mock_tok_ms)

    banner = (
        f"[mock] serving models={models} on http://{args.mock_host}:{args.mock_port} "
        f"(ttft={args.mock_ttft_ms}ms tokens={args.mock_tokens} tok={args.mock_tok_ms}ms)"
    )
    print(banner, flush=True)

    # print=None silences aiohttp's own startup message.
    web.run_app(app, host=args.mock_host, port=args.mock_port, print=None)
|
|
|
|
|
|
# ===========================================================================
|
|
# Load driver
|
|
# ===========================================================================
|
|
|
|
@dataclass
class Sample:
    """Outcome of a single request issued by the load driver."""

    # True when the request completed with a 2xx status.
    ok: bool
    # HTTP status code; the driver records 0 when the request raised.
    status: int
    # Full request wall time, in seconds.
    latency: float
    # Time until the first streamed chunk arrived, in seconds; None otherwise.
    ttft: Optional[float]
    # Short failure description; None for successful requests.
    err: Optional[str] = None
|
|
|
|
|
|
@dataclass
class Stats:
    """Samples collected during one load stage, plus a few derived counts."""

    # Number of concurrent workers used for the stage.
    concurrency: int
    # Wall-clock duration of the measured part of the stage, in seconds.
    wall: float = 0.0
    # Every recorded sample, successful or not.
    samples: list = field(default_factory=list)

    @property
    def ok(self) -> list:
        """Samples whose ``ok`` flag is set, in recording order."""
        return [sample for sample in self.samples if sample.ok]

    @property
    def n_total(self) -> int:
        """Number of recorded samples."""
        return len(self.samples)

    @property
    def n_ok(self) -> int:
        """Number of successful samples."""
        return sum(1 for sample in self.samples if sample.ok)
|
|
|
|
|
|
def _pct(values: list[float], p: float) -> float:
|
|
if not values:
|
|
return float("nan")
|
|
s = sorted(values)
|
|
if len(s) == 1:
|
|
return s[0]
|
|
k = (len(s) - 1) * (p / 100.0)
|
|
lo = math.floor(k)
|
|
hi = math.ceil(k)
|
|
if lo == hi:
|
|
return s[int(k)]
|
|
return s[lo] + (s[hi] - s[lo]) * (k - lo)
|
|
|
|
|
|
def _build_request(args):
|
|
"""Return (path, json_payload) for a single request."""
|
|
messages = [{"role": "user", "content": args.prompt}]
|
|
if args.api == "openai":
|
|
path = "/v1/chat/completions"
|
|
body = {"model": args.model, "messages": messages, "stream": args.stream}
|
|
else:
|
|
path = "/api/chat"
|
|
body = {"model": args.model, "messages": messages, "stream": args.stream}
|
|
return path, body
|
|
|
|
|
|
async def _one_request(client: httpx.AsyncClient, url: str, body: dict, stream: bool) -> Sample:
    """POST ``body`` to ``url`` once and describe the outcome as a ``Sample``.

    When ``stream`` is true the response is consumed chunk by chunk; the time
    of the first chunk becomes ``ttft`` and latency is taken after the stream
    is fully drained. Otherwise latency is taken when ``client.post`` returns.
    Any exception is turned into a failed sample with status 0 instead of
    propagating.
    """
    started = time.perf_counter()

    def _elapsed() -> float:
        return time.perf_counter() - started

    first_chunk_at = None
    try:
        if stream:
            async with client.stream("POST", url, json=body) as resp:
                status = resp.status_code
                async for _chunk in resp.aiter_bytes():
                    if first_chunk_at is None:
                        first_chunk_at = _elapsed()
            # drain complete
            elapsed = _elapsed()
        else:
            resp = await client.post(url, json=body)
            elapsed = _elapsed()
            status = resp.status_code
            # touch body so the full response is received
            _ = resp.content
    except Exception as exc:  # noqa: BLE001 — record any transport error as a failed sample
        return Sample(
            ok=False,
            status=0,
            latency=_elapsed(),
            ttft=None,
            err=f"{type(exc).__name__}: {str(exc)[:120]}",
        )

    succeeded = 200 <= status < 300
    return Sample(
        ok=succeeded,
        status=status,
        latency=elapsed,
        ttft=first_chunk_at,
        err=None if succeeded else f"HTTP {status}",
    )
|
|
|
|
|
|
async def run_stage(args, concurrency: int) -> Stats:
    """Run one load stage with ``concurrency`` workers and collect samples.

    The stage optionally starts with an unmeasured warmup of ``args.warmup``
    seconds using at most 8 workers. The measured part then runs either until
    ``args.duration`` seconds have elapsed or, when ``args.requests`` is set,
    until exactly that many requests have been issued.

    Returns:
        A ``Stats`` holding every measured sample and the measured wall time.
    """
    path, body = _build_request(args)
    url = args.url.rstrip("/") + path
    stats = Stats(concurrency=concurrency)

    # Leave headroom above the worker count so the pool never throttles them.
    pool_size = concurrency + 50
    limits = httpx.Limits(max_connections=pool_size, max_keepalive_connections=pool_size)
    timeout = httpx.Timeout(args.timeout, connect=15.0)

    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:

        async def fire() -> Sample:
            return await _one_request(client, url, body, args.stream)

        # warmup (unmeasured): make a few requests so caches/connections are hot
        if args.warmup > 0:
            warm_deadline = time.perf_counter() + args.warmup

            async def _warm():
                while time.perf_counter() < warm_deadline:
                    await fire()

            await asyncio.gather(*(_warm() for _ in range(min(concurrency, 8))))

        if args.requests is None:
            # Time-boxed stage: each worker loops until the shared deadline.
            deadline = time.perf_counter() + args.duration

            async def worker():
                while time.perf_counter() < deadline:
                    stats.samples.append(await fire())
        else:
            # Count-boxed stage: workers draw from a shared request budget.
            remaining = args.requests
            remaining_lock = asyncio.Lock()

            async def worker():
                nonlocal remaining
                while True:
                    async with remaining_lock:
                        if remaining <= 0:
                            return
                        remaining -= 1
                    stats.samples.append(await fire())

        wall0 = time.perf_counter()
        await asyncio.gather(*(worker() for _ in range(concurrency)))
        stats.wall = time.perf_counter() - wall0

    return stats
|
|
|
|
|
|
def _print_stage(stats: Stats, args, header: bool) -> None:
    """Print the results of one stage.

    With ``args.ramp`` set, one table row is printed, preceded by the column
    header and a rule when ``header`` is true. Otherwise a detailed
    single-stage report is printed, including a tally of failures grouped by
    error text. Latency and TTFT figures cover successful samples only and
    are shown in milliseconds.
    """
    successes = stats.ok
    lat = [s.latency * 1000 for s in successes]
    ttfts = [s.ttft * 1000 for s in successes if s.ttft is not None]
    n_ok = stats.n_ok
    errs = stats.n_total - n_ok
    rps = n_ok / stats.wall if stats.wall else 0.0

    if args.ramp:
        if header:
            cols = (
                f"{'conc':>5} {'req':>7} {'ok':>7} {'err':>5} {'req/s':>9} "
                f"{'p50ms':>8} {'p90ms':>8} {'p99ms':>9} {'maxms':>9}"
            )
            if args.stream:
                cols += f" {'ttftP50':>8} {'ttftP99':>8}"
            print(cols)
            print("-" * len(cols))

        slowest = max(lat) if lat else float('nan')
        row = (
            f"{stats.concurrency:>5} {stats.n_total:>7} {n_ok:>7} {errs:>5} "
            f"{rps:>9.1f} {_pct(lat,50):>8.1f} {_pct(lat,90):>8.1f} "
            f"{_pct(lat,99):>9.1f} {slowest:>9.1f}"
        )
        if args.stream:
            row += f" {_pct(ttfts,50):>8.1f} {_pct(ttfts,99):>8.1f}"
        print(row, flush=True)
        return

    # single-stage detailed report
    mode = 'stream' if args.stream else 'non-stream'
    print(f"\n=== Results (concurrency={stats.concurrency}, "
          f"{mode}, api={args.api}) ===")
    print(f" wall time : {stats.wall:8.2f} s")
    print(f" requests : {stats.n_total} total, {n_ok} ok, {errs} failed")
    print(f" throughput : {rps:8.1f} req/s")

    if lat:
        print(f" latency p50 : {_pct(lat,50):8.1f} ms")
        print(f" p90 : {_pct(lat,90):8.1f} ms")
        print(f" p95 : {_pct(lat,95):8.1f} ms")
        print(f" p99 : {_pct(lat,99):8.1f} ms")
        print(f" max : {max(lat):8.1f} ms")
        print(f" mean : {statistics.mean(lat):8.1f} ms")

    if ttfts:
        print(f" TTFT p50 : {_pct(ttfts,50):8.1f} ms")
        print(f" p90 : {_pct(ttfts,90):8.1f} ms")
        print(f" p99 : {_pct(ttfts,99):8.1f} ms")

    if not errs:
        return

    # Group failures by message and list the most frequent first.
    by_err: dict[str, int] = {}
    for sample in stats.samples:
        if sample.ok:
            continue
        reason = sample.err or "unknown"
        by_err[reason] = by_err.get(reason, 0) + 1

    print(" errors:")
    for reason, count in sorted(by_err.items(), key=lambda kv: kv[1], reverse=True):
        print(f" {count:>6} {reason}")
|
|
|
|
|
|
async def run_driver(args) -> list[Stats]:
    """Run every configured load stage in order and print each result.

    Stages come from ``args.ramp`` (comma-separated concurrency levels) when
    it is set, otherwise there is a single stage at ``args.concurrency``.
    Blank ramp entries, such as the one left by a trailing comma in
    ``"16,32,"``, are ignored rather than crashing ``int()``. If nothing
    usable remains, the single ``args.concurrency`` stage is used.

    Returns:
        One ``Stats`` per stage, in execution order.

    Raises:
        ValueError: If a non-blank ramp entry is not an integer.
    """
    stages = [args.concurrency]
    if args.ramp:
        parsed = [int(level) for level in args.ramp.split(",") if level.strip()]
        if parsed:
            stages = parsed

    results: list[Stats] = []
    for index, level in enumerate(stages):
        stats = await run_stage(args, level)
        # Only the first stage prints the ramp table header.
        _print_stage(stats, args, header=(index == 0))
        results.append(stats)
    return results
|
|
|
|
|
|
# ===========================================================================
|
|
# Orchestration: --mock-backend (spawn mock + router, run, tear down)
|
|
# ===========================================================================
|
|
|
|
# Directory two levels above the one containing this file. The module
# docstring invokes the script as test/load/loadtest.py, which would make
# this the repository root. Used as the working directory when the router
# subprocess is spawned.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
|
|
|
|
|
|
async def _wait_http_ok(url: str, timeout: float, accept=(200,)) -> bool:
    """Poll ``url`` with GET until it answers with an accepted status.

    A request is attempted every 0.25 s (each with a 5 s client timeout)
    until ``timeout`` seconds have passed. Failed attempts are expected while
    the target is still starting, so any exception is ignored and polling
    continues.

    Returns:
        True as soon as a response status is in ``accept``; False if the
        deadline passes first.
    """
    give_up_at = time.perf_counter() + timeout
    async with httpx.AsyncClient(timeout=5.0) as client:
        while time.perf_counter() < give_up_at:
            with contextlib.suppress(Exception):
                reply = await client.get(url)
                if reply.status_code in accept:
                    return True
            await asyncio.sleep(0.25)
    return False
|
|
|
|
|
|
def _write_temp_config(mock_url: str, models: list[str], max_conc: int) -> Path:
|
|
fd, path = tempfile.mkstemp(prefix="nomyo_loadtest_", suffix=".yaml")
|
|
os.close(fd)
|
|
cfg = (
|
|
"# Auto-generated by test/load/loadtest.py --mock-backend. Safe to delete.\n"
|
|
"endpoints:\n"
|
|
f" - {mock_url}\n"
|
|
"llama_server_endpoints: []\n"
|
|
f"max_concurrent_connections: {max_conc}\n"
|
|
"priority_routing: false\n"
|
|
"conversation_affinity: false\n"
|
|
"cache_enabled: false\n"
|
|
"nomyo-router-api-key: \"\"\n"
|
|
"api_keys:\n"
|
|
f" \"{mock_url}\": \"mock\"\n"
|
|
)
|
|
Path(path).write_text(cfg)
|
|
return Path(path)
|
|
|
|
|
|
async def run_with_mock_backend(args) -> list[Stats]:
    """Spawn the mock backend and the router, load-test them, then clean up.

    Steps:
      1. Start this script with ``--serve-mock`` as a subprocess and wait for
         its ``/api/version`` to answer.
      2. Start the router through ``python -m uvicorn router:app`` with a
         generated temp config and a temp DB path passed via environment
         variables, then wait for its ``/health`` to return 200.
      3. Point ``args.url`` at the router and run the load driver.

    On the way out, whether the run succeeded or failed, both subprocesses
    are sent SIGINT, killed if they have not exited within 8 seconds, and
    reaped. The temp config is removed unless ``args.keep_config`` is set,
    and the temp DB files are removed.

    Returns:
        The per-stage ``Stats`` list produced by ``run_driver``.

    Raises:
        RuntimeError: If the mock backend or the router never becomes ready.
        ValueError: If a non-blank ``--ramp`` entry is not an integer.
    """
    mock_port = args.mock_port or _free_port()
    router_port = args.router_port or _free_port()
    # _free_port() releases its probe socket before returning, so an
    # auto-picked router port can coincide with the mock port; re-pick.
    while not args.router_port and router_port == mock_port:
        router_port = _free_port()

    mock_url = f"http://127.0.0.1:{mock_port}"
    router_url = f"http://127.0.0.1:{router_port}"
    models = [m.strip() for m in args.mock_models.split(",") if m.strip()]

    # Size the router's per-endpoint admission limit so it does not artificially
    # serialize the load (unless the user explicitly wants to measure that).
    # Blank ramp entries (e.g. from a trailing comma) are ignored.
    ramp_levels = (
        [int(level) for level in args.ramp.split(",") if level.strip()]
        if args.ramp else []
    )
    peak = max(ramp_levels) if ramp_levels else args.concurrency
    max_conc = args.router_max_conc if args.router_max_conc else max(peak, 1)

    cfg_path = _write_temp_config(mock_url, models, max_conc)
    db_path = Path(tempfile.gettempdir()) / f"nomyo_loadtest_{os.getpid()}.db"

    env = dict(os.environ)
    env["NOMYO_ROUTER_CONFIG_PATH"] = str(cfg_path)
    env["NOMYO_ROUTER_DB_PATH"] = str(db_path)

    mock_proc = None
    router_proc = None
    try:
        # 1. mock backend first, so the router never caches it as "down"
        mock_cmd = [
            sys.executable, str(Path(__file__).resolve()), "--serve-mock",
            "--mock-host", "127.0.0.1", "--mock-port", str(mock_port),
            "--mock-models", args.mock_models,
            "--mock-ttft-ms", str(args.mock_ttft_ms),
            "--mock-tokens", str(args.mock_tokens),
            "--mock-tok-ms", str(args.mock_tok_ms),
        ]
        print(f"[orchestrator] starting mock backend: {mock_url}", flush=True)
        mock_proc = await asyncio.create_subprocess_exec(*mock_cmd)
        if not await _wait_http_ok(f"{mock_url}/api/version", timeout=15):
            raise RuntimeError("mock backend did not become ready")

        # 2. router
        router_cmd = [
            sys.executable, "-m", "uvicorn", "router:app",
            "--host", "127.0.0.1", "--port", str(router_port),
            # Per-request access logging is pure noise (and overhead) under load.
            "--no-access-log",
        ]
        if args.router_workers and args.router_workers > 1:
            router_cmd += ["--workers", str(args.router_workers)]
        print(f"[orchestrator] starting router: {router_url} "
              f"(workers={args.router_workers}, max_concurrent_connections={max_conc})", flush=True)
        router_proc = await asyncio.create_subprocess_exec(
            *router_cmd, cwd=str(PROJECT_ROOT), env=env
        )
        # /health returns 200 only once it can reach the (healthy) mock backend
        if not await _wait_http_ok(f"{router_url}/health", timeout=40, accept=(200,)):
            raise RuntimeError("router did not become healthy")
        print("[orchestrator] router healthy — starting load\n", flush=True)

        # 3. drive load against the router
        args.url = router_url
        return await run_driver(args)

    finally:
        # Stop the router before the backend it depends on.
        for name, proc in (("router", router_proc), ("mock", mock_proc)):
            if proc and proc.returncode is None:
                with contextlib.suppress(ProcessLookupError):
                    proc.send_signal(signal.SIGINT)
                with contextlib.suppress(asyncio.TimeoutError):
                    await asyncio.wait_for(proc.wait(), timeout=8)
                if proc.returncode is None:
                    with contextlib.suppress(ProcessLookupError):
                        proc.kill()
                    # Reap the killed child so it is not left as a zombie
                    # and "stopped" is only reported once it has exited.
                    with contextlib.suppress(asyncio.TimeoutError):
                        await asyncio.wait_for(proc.wait(), timeout=5)
                print(f"[orchestrator] stopped {name}", flush=True)
        if args.keep_config:
            print(f"[orchestrator] kept config: {cfg_path}", flush=True)
        else:
            with contextlib.suppress(FileNotFoundError):
                cfg_path.unlink()
        # Remove the DB file and the "-shm"/"-wal" files beside it, if present.
        for suffix in ("", "-shm", "-wal"):
            with contextlib.suppress(FileNotFoundError):
                Path(str(db_path) + suffix).unlink()
|
|
|
|
|
|
# ===========================================================================
|
|
# CLI
|
|
# ===========================================================================
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the load tester.

    Options are organised into groups: mode selection, driver target, load
    shape, mock backend tuning, and router orchestration. ``--stream`` and
    ``--no-stream`` are mutually exclusive and share ``dest="stream"``, which
    defaults to True.
    """
    parser = argparse.ArgumentParser(
        description="NOMYO Router load test (asyncio + httpx) with a built-in mock backend.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # --- mode ---------------------------------------------------------------
    mode_group = parser.add_argument_group("mode")
    mode_group.add_argument(
        "--serve-mock",
        action="store_true",
        help="Run ONLY the mock backend (foreground) and exit on Ctrl-C.",
    )
    mode_group.add_argument(
        "--mock-backend",
        action="store_true",
        help="Spawn mock backend + router, load-test them, then tear down.",
    )

    # --- target -------------------------------------------------------------
    target_group = parser.add_argument_group("target (driver)")
    target_group.add_argument(
        "--url",
        default="http://127.0.0.1:12434",
        help="Router base URL to drive (ignored with --mock-backend).",
    )
    target_group.add_argument(
        "--api",
        choices=["ollama", "openai"],
        default="ollama",
        help="ollama -> POST /api/chat ; openai -> POST /v1/chat/completions",
    )
    target_group.add_argument("--model", default="mock", help="Model name to request.")
    target_group.add_argument(
        "--prompt",
        default="Say hello and count to ten.",
        help="User prompt sent in every request.",
    )
    stream_switch = target_group.add_mutually_exclusive_group()
    stream_switch.add_argument(
        "--stream",
        dest="stream",
        action="store_true",
        help="Stream the response (default).",
    )
    stream_switch.add_argument(
        "--no-stream",
        dest="stream",
        action="store_false",
        help="Request a single non-streamed response.",
    )
    # Both switches write to "stream"; streaming is on unless --no-stream is given.
    parser.set_defaults(stream=True)

    # --- load shape ---------------------------------------------------------
    load_group = parser.add_argument_group("load shape")
    load_group.add_argument(
        "--concurrency",
        type=int,
        default=32,
        help="Number of concurrent virtual clients.",
    )
    load_group.add_argument(
        "--duration",
        type=float,
        default=20.0,
        help="Seconds to run each stage (ignored if --requests given).",
    )
    load_group.add_argument(
        "--requests",
        type=int,
        default=None,
        help="Send exactly N requests instead of running for --duration.",
    )
    load_group.add_argument(
        "--ramp",
        default=None,
        help="Comma-separated concurrency stages, e.g. 16,32,64,128 "
             "(prints a table to find the knee).",
    )
    load_group.add_argument(
        "--warmup",
        type=float,
        default=2.0,
        help="Seconds of unmeasured warmup before each stage.",
    )
    load_group.add_argument(
        "--timeout",
        type=float,
        default=120.0,
        help="Per-request timeout (seconds).",
    )
    load_group.add_argument(
        "--json",
        dest="json_out",
        default=None,
        help="Also write the results as JSON to this path.",
    )

    # --- mock backend -------------------------------------------------------
    mock_group = parser.add_argument_group("mock backend tuning")
    mock_group.add_argument("--mock-host", default="127.0.0.1")
    mock_group.add_argument(
        "--mock-port",
        type=int,
        default=0,
        help="Mock backend port (0 = auto-pick a free port).",
    )
    mock_group.add_argument(
        "--mock-models",
        default="mock",
        help="Comma-separated model names the mock advertises.",
    )
    mock_group.add_argument(
        "--mock-ttft-ms",
        type=float,
        default=0.0,
        help="Simulated prefill latency before the first token (ms).",
    )
    mock_group.add_argument(
        "--mock-tokens",
        type=int,
        default=64,
        help="Completion length in tokens the mock emits.",
    )
    mock_group.add_argument(
        "--mock-tok-ms",
        type=float,
        default=0.0,
        help="Simulated per-token decode delay (ms) = inverse of tok/s.",
    )

    # --- router orchestration -----------------------------------------------
    router_group = parser.add_argument_group("router orchestration (--mock-backend only)")
    router_group.add_argument(
        "--router-port",
        type=int,
        default=0,
        help="Router port (0 = auto-pick a free port).",
    )
    router_group.add_argument(
        "--router-workers",
        type=int,
        default=1,
        help="uvicorn --workers for the spawned router.",
    )
    router_group.add_argument(
        "--router-max-conc",
        type=int,
        default=0,
        help="max_concurrent_connections in the generated config "
             "(0 = match peak concurrency so the router does not queue).",
    )
    router_group.add_argument(
        "--keep-config",
        action="store_true",
        help="Do not delete the generated temp config on exit.",
    )
    return parser
|
|
|
|
|
|
def _dump_json(path: str, args, results: list[Stats]) -> None:
    """Write the run configuration and per-stage results to ``path`` as JSON.

    Each stage records its concurrency, wall time, request counts, throughput
    and latency/TTFT percentiles in milliseconds, computed over successful
    samples only.

    When a stage has no successful samples, ``latency_ms`` is written as an
    empty object, the same way ``ttft_ms`` is. Percentiles of an empty series
    are NaN, which ``json.dumps`` would emit as the bare token ``NaN``; that
    is not valid JSON and strict parsers reject it.

    Args:
        path: Destination file; overwritten if it exists.
        args: Parsed CLI namespace; a fixed subset is stored under "config".
        results: One ``Stats`` per executed stage.
    """
    config_keys = (
        "api", "model", "stream", "duration", "requests", "warmup", "timeout",
        "mock_tokens", "mock_ttft_ms", "mock_tok_ms",
    )
    out = {
        "config": {key: getattr(args, key) for key in config_keys},
        "stages": [],
    }

    for st in results:
        successes = st.ok
        lat = [s.latency * 1000 for s in successes]
        ttfts = [s.ttft * 1000 for s in successes if s.ttft is not None]

        latency_ms: dict = {}
        if lat:
            latency_ms = {p: _pct(lat, p) for p in (50, 90, 95, 99)}
            latency_ms["max"] = max(lat)
            latency_ms["mean"] = statistics.mean(lat)

        n_ok = len(successes)
        out["stages"].append({
            "concurrency": st.concurrency,
            "wall_s": st.wall,
            "requests": st.n_total,
            "ok": n_ok,
            "errors": st.n_total - n_ok,
            "rps": (n_ok / st.wall) if st.wall else 0.0,
            "latency_ms": latency_ms,
            "ttft_ms": {p: _pct(ttfts, p) for p in (50, 90, 99)} if ttfts else {},
        })

    # Explicit encoding keeps the output independent of the locale.
    Path(path).write_text(json.dumps(out, indent=2), encoding="utf-8")
    print(f"\n[driver] wrote JSON results to {path}", flush=True)
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse arguments and dispatch to the selected mode.

    * ``--serve-mock`` runs only the mock backend until interrupted.
    * ``--mock-backend`` spawns mock backend and router, then drives load.
    * Otherwise the driver runs against ``--url``.

    A non-positive ``--requests`` exits with status 2. Results are also
    written as JSON when ``--json`` is given and there are results.
    """
    args = build_parser().parse_args()

    if args.serve_mock:
        # Ctrl-C is the normal way to stop the foreground mock server.
        with contextlib.suppress(KeyboardInterrupt):
            serve_mock(args)
        return

    if args.requests is not None and args.requests <= 0:
        print("--requests must be > 0", file=sys.stderr)
        sys.exit(2)

    runner = run_with_mock_backend if args.mock_backend else run_driver
    results = asyncio.run(runner(args)) or []

    if args.json_out and results:
        _dump_json(args.json_out, args, results)
|
|
|
|
|
|
# Run the CLI only when executed as a script; the orchestrator re-invokes
# this file with --serve-mock to start the mock backend.
if __name__ == "__main__":
    main()
|