omnigraph/crates/omnigraph-server/examples/bench_concurrent_http.rs

270 lines
8.4 KiB
Rust
Raw Permalink Normal View History

mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap Bundles the working-tree state from the prior session (PR 0 bench harness, PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration) together with the first half of PR 2's interior-mutability foundation (catalog and schema_source wrapped in Arc<ArcSwap<...>>). The two streams intermix in 7 of the same files, so splitting via git add -p was impractical. Subsequent PR 2 steps land as separate atomic commits. PR 0 — server-level concurrent /change bench harness - crates/omnigraph-server/examples/bench_concurrent_http.rs (new) - .context/bench-results/{baseline-main,after-pr1}/ (gitignored) PR 1a — drop the audit_actor_id field, thread per-call - removed Omnigraph::audit_actor_id and the swap-restore patterns in mutation.rs, merge.rs, loader/mod.rs - actor_id: Option<&str> threaded through MutationStaging::finalize, mutate_with_current_actor, ingest_with_current_actor, branch_merge_impl, branch_merge_on_current_target, commit_prepared_updates*, record_merge_commit, commit_updates_on_branch_with_expected - apply_schema and ensure_indices_for_branch pass None (system-attributed) PR 1b — per-(table_key, branch) write queue + revalidation + sidecar - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager, acquire/acquire_many, sorted+deduped acquisition; 6 unit tests - Arc<WriteQueueManager> field on Omnigraph + db.write_queue() accessor - MutationStaging::finalize split into stage_all (Phase A, no queue) and StagedMutation::commit_all (Phase B, acquire_many + revalidate pins + sidecar + commit_staged); guards held across publisher - delete-only mutations now emit recovery sidecars; revalidation extended to inline_committed tables - branch_merge_on_current_target, apply_schema_with_lock, and ensure_indices_for_branch acquire per-table queues for their touched tables PR 2 Step B (partial) — catalog and schema_source via ArcSwap - catalog: Catalog -> Arc<ArcSwap<Catalog>> - 
schema_source: String -> Arc<ArcSwap<String>> - public accessors return Arc<Catalog> / Arc<String>; readers bind locally where the borrow has to outlive an expression - new pub(crate) store_catalog / store_schema_source helpers replace the field assignments in apply_schema and reload_schema_if_source_changed - 117 tests across lifecycle/end_to_end/branching/runs pass; engine lib + workspace compile clean Coordinator wrap (Mutex) and the &mut self -> &self engine API conversion follow in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:22:38 +02:00
//! Server-level concurrent HTTP benchmark for MR-686 (PR 0 baseline).
//!
//! Drives concurrent `/change` requests against an in-process Omnigraph HTTP
//! server. Measures the global `Arc<RwLock<Omnigraph>>` lock penalty on
//! current `main` so PR 1 + PR 2 can be evaluated against a real baseline.
//!
//! Per the MR-686 plan: this is the load-bearing bench. `Omnigraph::mutate_as`
//! is `&mut self`, so an engine-level concurrent bench either serializes on the
//! borrow checker (measures nothing) or drives multiple handles (measures Lance
//! contention, not the server bottleneck). Driving the HTTP server is the only
//! way to measure the actual `RwLock<Omnigraph>` contention this work removes.
//!
//! Usage:
//! ```sh
//! cargo run --release -p omnigraph-server --example bench_concurrent_http -- \
//! --tables 16 --actors 16 --ops-per-actor 1000 --mode disjoint \
//! --output bench-results/baseline-main/cross-table.json
//! ```
//!
//! Modes:
//! - `disjoint`: each actor writes to a distinct node type (cross-table fanout)
//! - `same-key`: all actors write to the same node type (hot-key contention)
//! - `mixed`: each actor's target table changes per op, scrambled as `(actor * 7919) ^ op` modulo the table count
use std::path::PathBuf;
use std::time::{Duration, Instant};
use axum::Router;
use axum::body::{Body, to_bytes};
use axum::http::{Method, Request, StatusCode};
use clap::{Parser, ValueEnum};
use omnigraph::db::Omnigraph;
use omnigraph_server::api::ChangeRequest;
use omnigraph_server::{AppState, build_app};
use serde::Serialize;
use tower::ServiceExt;
// Command-line options for the benchmark, parsed with clap's derive API.
// The `///` lines on the fields are left verbatim: clap's derive turns field
// doc comments into `--help` text, so they are user-facing output.
// `main` rejects a value of 0 for `tables`, `actors`, or `ops_per_actor`.
#[derive(Parser, Debug)]
#[command(about = "Concurrent HTTP bench for MR-686")]
struct Args {
// One `Item{i}` node type is generated per table (see `build_schema`).
/// Number of distinct node types in the schema.
#[arg(long, default_value_t = 16)]
tables: usize,
// Each actor is spawned as its own Tokio task in `main`.
/// Number of concurrent actors driving requests.
#[arg(long, default_value_t = 16)]
actors: usize,
// Requests are issued sequentially within one actor.
/// Operations per actor.
#[arg(long, default_value_t = 100)]
ops_per_actor: usize,
// Decides which table each request targets (see `pick_table`).
/// Workload mode.
#[arg(long, value_enum, default_value_t = Mode::Disjoint)]
mode: Mode,
// When set, missing parent directories are created before the file is written.
/// Output file for the JSON results. Stdout always gets a copy.
#[arg(long)]
output: Option<PathBuf>,
// Copied unchanged into the `label` field of the JSON results.
/// Optional label to record alongside results (e.g. "baseline-main").
#[arg(long, default_value = "")]
label: String,
}
// Workload shape: decides which table each request targets (see `pick_table`).
// Serialized in kebab-case in the JSON results (e.g. `same-key`).
// Plain `//` comments are used on the variants on purpose: doc comments on
// `ValueEnum` variants may be picked up by clap as help text.
#[derive(Clone, Copy, Debug, ValueEnum, Serialize)]
#[serde(rename_all = "kebab-case")]
enum Mode {
// Actor `a` always writes to table `a % tables`; actors share a table only
// when there are more actors than tables.
Disjoint,
// Every actor writes to table 0.
SameKey,
// Table is `((actor * 7919) ^ op) % tables`, so it varies from op to op.
Mixed,
}
/// Summary of one benchmark run, serialized to pretty-printed JSON by `main`.
///
/// Field order is the order in which keys appear in the emitted JSON.
#[derive(Serialize, Debug)]
struct BenchResults {
/// Free-form label taken from `--label`.
label: String,
/// Workload mode the run used.
mode: Mode,
/// Value of `--tables`.
tables: usize,
/// Value of `--actors`.
actors: usize,
/// Value of `--ops-per-actor`.
ops_per_actor: usize,
/// Number of latency samples collected, i.e. requests that produced a response
/// (including non-200 responses).
total_ops: usize,
/// Requests that failed: non-200 responses plus transport errors, summed over actors.
error_count: usize,
/// Wall-clock time from just before the actors are spawned until all have finished.
wall_time_ms: u64,
/// `total_ops` divided by the wall-clock time in seconds (0.0 if the wall time is zero).
throughput_ops_per_sec: f64,
/// Median latency in milliseconds.
p50_ms: f64,
/// 95th-percentile latency in milliseconds.
p95_ms: f64,
/// 99th-percentile latency in milliseconds.
p99_ms: f64,
/// 99.9th-percentile latency in milliseconds.
p999_ms: f64,
/// Largest observed latency in milliseconds (0.0 when there are no samples).
max_ms: f64,
/// Fixed free-text note describing the run.
notes: &'static str,
}
/// Builds the schema source for the benchmark graph.
///
/// Emits one `node Item{i}` declaration for every `i` in `0..num_tables`, each
/// with a `name: String @key` field and an optional `value: I32?` field, and
/// each followed by a blank line. Returns an empty string for `num_tables == 0`.
fn build_schema(num_tables: usize) -> String {
    (0..num_tables)
        .map(|i| format!("node Item{i} {{\n name: String @key\n value: I32?\n}}\n\n"))
        .collect()
}
/// Returns the source of the `insert_item` mutation query for table `table_idx`.
///
/// The query takes `$name` and `$value` parameters and inserts one row into
/// node type `Item{table_idx}`.
fn build_query_source(table_idx: usize) -> String {
    let insert_line = format!(" insert Item{table_idx} {{ name: $name, value: $value }}");
    // Three lines: query header, the insert statement, closing brace.
    [
        "query insert_item($name: String, $value: I32) {",
        insert_line.as_str(),
        "}",
    ]
    .join("\n")
}
/// Chooses the table index targeted by operation `op_idx` of actor `actor_idx`.
///
/// - `Disjoint`: `actor_idx % num_tables`.
/// - `SameKey`: always 0.
/// - `Mixed`: `(actor_idx * 7919 ^ op_idx) % num_tables`, with a wrapping multiply.
///
/// # Panics
/// Panics on a zero `num_tables` in `Disjoint` and `Mixed` mode (remainder by zero).
fn pick_table(actor_idx: usize, op_idx: usize, mode: Mode, num_tables: usize) -> usize {
    let spread = match mode {
        Mode::SameKey => return 0,
        Mode::Disjoint => actor_idx,
        Mode::Mixed => actor_idx.wrapping_mul(7919) ^ op_idx,
    };
    spread % num_tables
}
async fn drive_actor(
app: Router,
actor_idx: usize,
ops: usize,
mode: Mode,
num_tables: usize,
) -> (Vec<Duration>, usize) {
let mut latencies = Vec::with_capacity(ops);
let mut errors = 0usize;
for op_idx in 0..ops {
let table_idx = pick_table(actor_idx, op_idx, mode, num_tables);
let request_body = ChangeRequest {
feat: inline query strings in CLI and HTTP server (#110) * feat(MR-656): inline query strings in CLI and HTTP server CLI: - Add -e / --query-string <STRING> to omnigraph read and omnigraph change - Exactly one of --query, --query-string, --alias is required (3-way XOR) - Empty --query-string is rejected with a clear error HTTP: - New POST /query (read-only, clean field names: query/name/params/branch/snapshot) - Mutations on /query are rejected with 400 -- use POST /change instead - ChangeRequest fields polished: query (alias query_source), name (alias query_name) - POST /read and POST /change remain byte-compatible for existing clients Tests: - cli.rs: -e happy-path on read/change, mutex error vs --query, empty -e rejected - system_local.rs: inline -e read and -e change exercise the local flow - system_remote.rs: inline -e read/change over HTTP plus direct /query 200/400 - server.rs: /query 200, /query 400 on mutation, /change legacy field alias - openapi.rs: new /query path, QueryRequest schema, ChangeRequest field-name polish Docs: cli.md (-e examples), cli-reference.md (read/change rows), server.md (/query) Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com> * feat(MR-656): rename read/change to query/mutate with deprecation signals HTTP server: - Add POST /mutate as canonical write endpoint (pairs with POST /query). - Mark POST /read and POST /change as deprecated. Three-channel signal: * OpenAPI: `deprecated: true` on the operation (every codegen flags the generated SDK method). * RFC 9745: response `Deprecation: true` header on every response. * RFC 8288: response `Link: </successor>; rel="successor-version"` pointing at /query and /mutate respectively. - Share business logic across /mutate and /change via run_mutate(); the /change wrapper is the only place that adds the deprecation headers. - ChangeRequest field aliases (query_source/query_name) preserved. - AliasCommand serde now accepts `query`/`mutate` alongside `read`/`change`. 
CLI: - Promote `omnigraph query` / `omnigraph mutate` to top-level canonical subcommands (clap visible_alias keeps `omnigraph read` / `omnigraph change` working forever). - Promote `omnigraph lint` / `omnigraph check` to top-level (was nested under `omnigraph query lint`, which is now a deprecated argv shim that rewrites to the canonical form). - Argv-level preprocessing prints a one-line deprecation warning to stderr when any legacy spelling is used. Canonical names are silent. Tests: - Server: /mutate works, /change emits Deprecation+Link headers, /read emits Deprecation+Link headers, /query carries no deprecation signal. - OpenAPI: /read and /change flagged deprecated; /query and /mutate not. - CLI: canonical `lint` matches deprecated `query lint` / `query check` output; `read` / `change` print deprecation warnings. Docs: - cli.md: new canonical examples; "Deprecated names" migration table. - cli-reference.md: top-level table updated; aliases.<name>.command accepts both legacy and canonical spellings. - server.md: endpoint inventory shows /query and /mutate as canonical and /read and /change as deprecated; dedicated section explains the three-channel deprecation signal. - og-cheet-sheet.md: use new `omnigraph lint` / `omnigraph check`. - openapi.json regenerated. Migration is purely cosmetic — every deprecated form continues to work indefinitely; only the spelling changes. Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com> * fix(MR-656): address Devin Review findings on /query and /change Two issues raised by Devin Review on PR #110: 1. `POST /query` mutation-rejection error pointed at the deprecated `/change` endpoint instead of the canonical `/mutate`. Fixed in three places: the runtime error message in `server_query`, the utoipa 400-response description, and the handler doc comment. The `QueryRequest` schema docstrings in `api.rs` got the same update so the openapi.json bodies match. Server and openapi tests updated. 2. 
`execute_change_remote` serialized `ChangeRequest` directly, which emits the new canonical field names `query` / `name` on the wire. `#[serde(alias = "query_source")]` only affects deserialization, so a newer CLI talking to an older server would have its `/change` POST body fail with "missing field: query_source". Fixed by extracting a `legacy_change_request_body` helper that hand-rolls the JSON with the legacy keys (`query_source` / `query_name`), the same byte-stable contract `execute_read_remote` already uses against `/read`. Added two unit tests on the helper to lock the wire shape in. Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com> * docs(dev): RFC 001 — inline + stored queries, envelope, MCP Tracked artifact consolidating the design across MR-656 (this branch), MR-976 (Phase 1 envelope hardening parent, with MR-977/978/979/980 sub-issues), and MR-969 (stored queries + MCP). Sections: * Two paths, one engine — inline `/query` + `/mutate` (this PR) coexist with stored `/queries/{name}` (MR-969). Same `run_query` / `run_mutate` backend (the fold-in landed in the previous commit). * Request envelope ("before") — Idempotency-Key, If-Match, X-Deadline, X-Trace-Id, expect, dry_run, fields. Phase 1 ships the load-bearing subset on `/mutate`. * Response envelope ("after") — audit_id, snapshot_id, commit_id, stats, warnings. Closes the provenance loop today's `ChangeOutput` leaves open. * `.gq` pragmas — `@description`, `@returns`, `@mcp`. Source-of-truth for the stored-query agent contract; no separate YAML registry. * Multi-graph MCP — per-graph `/graphs/{id}/mcp/tools` + `/mcp/invoke`. Token binds to one graph by default; cross-graph agents loop. * Cedar split — `read`/`change` for inline, `invoke_query` for stored. Operators deny ad-hoc for agent groups while keeping curated tool list open. * Rejected alternatives — per-env override files, compiled bundles, tool-name prefixing across graphs, body-field graph dispatch. 
Index entry added under "Active Implementation Plans" so future agents land on the RFC before touching queries / mutations / envelope code. `scripts/check-agents-md.sh` clean (35 links, 34 docs). * docs(server): clarify why run_query lacks AppState parameter run_mutate takes state for workload admission; run_query doesn't because reads aren't admission-gated today. Mark the asymmetry as intentional and flag the two future events that would grow the signature: Phase 1's `expect: { max_rows_scanned: N }` budget (MR-976) or per-actor admission extending to stored-read invocations (MR-969). Prevents the natural "make these symmetrical" follow-up. * refactor(server): run_query / run_mutate take &ResolvedActor Replace `Option<Extension<ResolvedActor>>` in the helpers with `Option<&ResolvedActor>`. Saves MR-969's stored-query handler from wrapping a bare actor in axum's `Extension(...)` before calling. Handler signatures (`server_query`, `server_read`, `server_mutate`, `server_change`) keep `Option<Extension<ResolvedActor>>` because that is what axum injects, and unwrap at the call site with `actor.as_ref().map(|Extension(actor)| actor)`. Net: -13/+10 LOC, 89/0 server tests pass. * docs(releases): v0.6.0 — describe inline + canonical-named queries (MR-656) Extend the v0.6.0 release notes to cover the third piece of work landing alongside the graph terminology rename and multi-graph server mode: canonical-named `POST /query` and `POST /mutate` endpoints, the CLI's new `-e/--query-string` flag, the top-level promotion of `lint` / `check`, and the three-channel deprecation signal on `/read` and `/change` (OpenAPI `deprecated: true` + RFC 9745 + RFC 8288). Additions: * Top blurb: "Two pieces" -> "Three pieces" with a bullet describing the rename + inline flow. 
* Breaking Changes: new "Query / mutation rename" subsection covering the `ChangeRequest` field rename (with the back-compat serde aliases and the CLI's `legacy_change_request_body` byte-stable wire helper) and the `omnigraph query lint` -> `omnigraph lint` move. * New: 5 bullets — the two endpoints, the CLI subcommands, the `-e` flag, the deprecation signal channels, the widened `aliases.<name>.command` vocabulary. * User Impact: one bullet making explicit that the rename is cosmetic on the client side and migration is voluntary. * Documentation: pointers to the updated `server.md` / `cli.md` / `cli-reference.md` and the new `docs/dev/rfc-001-queries-envelope-mcp.md`. +15/-1 lines. `./scripts/check-agents-md.sh` clean. * refactor(cli): demote `check` from visible_alias to deprecation shim `omnigraph check` was a clap `visible_alias` on `lint`, advertised in `--help` as an equivalent canonical name. Per MR-981 §6 (long-form flags as canonical, short forms as visible aliases), visible aliases on subcommand names hurt agent CX: agents emit either spelling depending on training-data drift, and there's no length signal pointing at the canonical name. Changes: * Remove `#[command(visible_alias = "check")]` from the `Lint` variant. `omnigraph --help` now shows only `lint`. * Add bare `check` to `rewrite_deprecated_argv` so `omnigraph check <args>` still works — it rewrites to `omnigraph lint <args>` and emits a one-line stderr deprecation warning, matching the existing pattern for `read` / `change` / `query lint` / `query check`. * Fix the nested `query check` shim to substitute `check` -> `lint` in the rewritten argv (previously it relied on `check` being a visible_alias to reach the `Lint` variant). * New test `deprecated_check_top_level_rewrites_to_lint` covers: bare `check` produces identical stdout to `lint`, emits the deprecation warning, and `check` does NOT appear as an alias in `omnigraph --help`. 
* Release notes updated to reflect the deprecation-shim treatment and cross-reference MR-981 §6 reasoning. Cargo / Go users typing `check` still work indefinitely; one stderr nudge per invocation teaches the canonical name. Agents see only `lint` in `--help --json` so they emit one canonical form. 67/0 omnigraph-cli tests pass; 39 workspace test suites green. --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Ragnor Comerford <ragnor.comerford@gmail.com> Co-authored-by: Ragnor Comerford <hello@ragnor.co>
2026-05-29 13:41:54 +02:00
query: build_query_source(table_idx),
name: Some("insert_item".to_string()),
mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap Bundles the working-tree state from the prior session (PR 0 bench harness, PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration) together with the first half of PR 2's interior-mutability foundation (catalog and schema_source wrapped in Arc<ArcSwap<...>>). The two streams intermix in 7 of the same files, so splitting via git add -p was impractical. Subsequent PR 2 steps land as separate atomic commits. PR 0 — server-level concurrent /change bench harness - crates/omnigraph-server/examples/bench_concurrent_http.rs (new) - .context/bench-results/{baseline-main,after-pr1}/ (gitignored) PR 1a — drop the audit_actor_id field, thread per-call - removed Omnigraph::audit_actor_id and the swap-restore patterns in mutation.rs, merge.rs, loader/mod.rs - actor_id: Option<&str> threaded through MutationStaging::finalize, mutate_with_current_actor, ingest_with_current_actor, branch_merge_impl, branch_merge_on_current_target, commit_prepared_updates*, record_merge_commit, commit_updates_on_branch_with_expected - apply_schema and ensure_indices_for_branch pass None (system-attributed) PR 1b — per-(table_key, branch) write queue + revalidation + sidecar - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager, acquire/acquire_many, sorted+deduped acquisition; 6 unit tests - Arc<WriteQueueManager> field on Omnigraph + db.write_queue() accessor - MutationStaging::finalize split into stage_all (Phase A, no queue) and StagedMutation::commit_all (Phase B, acquire_many + revalidate pins + sidecar + commit_staged); guards held across publisher - delete-only mutations now emit recovery sidecars; revalidation extended to inline_committed tables - branch_merge_on_current_target, apply_schema_with_lock, and ensure_indices_for_branch acquire per-table queues for their touched tables PR 2 Step B (partial) — catalog and schema_source via ArcSwap - catalog: Catalog -> Arc<ArcSwap<Catalog>> - 
schema_source: String -> Arc<ArcSwap<String>> - public accessors return Arc<Catalog> / Arc<String>; readers bind locally where the borrow has to outlive an expression - new pub(crate) store_catalog / store_schema_source helpers replace the field assignments in apply_schema and reload_schema_if_source_changed - 117 tests across lifecycle/end_to_end/branching/runs pass; engine lib + workspace compile clean Coordinator wrap (Mutex) and the &mut self -> &self engine API conversion follow in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:22:38 +02:00
params: Some(serde_json::json!({
"name": format!("a{actor_idx}_o{op_idx}"),
"value": op_idx as i32,
})),
branch: None,
};
let body = serde_json::to_vec(&request_body).unwrap();
let req = Request::builder()
.method(Method::POST)
.uri("/change")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
let start = Instant::now();
let response = match app.clone().oneshot(req).await {
Ok(r) => r,
Err(e) => {
eprintln!("actor {actor_idx} op {op_idx} transport error: {e:?}");
errors += 1;
continue;
}
};
let elapsed = start.elapsed();
let status = response.status();
if status != StatusCode::OK {
errors += 1;
// Drain body for logging on the first few failures.
if errors <= 3 {
let body = to_bytes(response.into_body(), 64 * 1024)
.await
.unwrap_or_default();
mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap Bundles the working-tree state from the prior session (PR 0 bench harness, PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration) together with the first half of PR 2's interior-mutability foundation (catalog and schema_source wrapped in Arc<ArcSwap<...>>). The two streams intermix in 7 of the same files, so splitting via git add -p was impractical. Subsequent PR 2 steps land as separate atomic commits. PR 0 — server-level concurrent /change bench harness - crates/omnigraph-server/examples/bench_concurrent_http.rs (new) - .context/bench-results/{baseline-main,after-pr1}/ (gitignored) PR 1a — drop the audit_actor_id field, thread per-call - removed Omnigraph::audit_actor_id and the swap-restore patterns in mutation.rs, merge.rs, loader/mod.rs - actor_id: Option<&str> threaded through MutationStaging::finalize, mutate_with_current_actor, ingest_with_current_actor, branch_merge_impl, branch_merge_on_current_target, commit_prepared_updates*, record_merge_commit, commit_updates_on_branch_with_expected - apply_schema and ensure_indices_for_branch pass None (system-attributed) PR 1b — per-(table_key, branch) write queue + revalidation + sidecar - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager, acquire/acquire_many, sorted+deduped acquisition; 6 unit tests - Arc<WriteQueueManager> field on Omnigraph + db.write_queue() accessor - MutationStaging::finalize split into stage_all (Phase A, no queue) and StagedMutation::commit_all (Phase B, acquire_many + revalidate pins + sidecar + commit_staged); guards held across publisher - delete-only mutations now emit recovery sidecars; revalidation extended to inline_committed tables - branch_merge_on_current_target, apply_schema_with_lock, and ensure_indices_for_branch acquire per-table queues for their touched tables PR 2 Step B (partial) — catalog and schema_source via ArcSwap - catalog: Catalog -> Arc<ArcSwap<Catalog>> - 
schema_source: String -> Arc<ArcSwap<String>> - public accessors return Arc<Catalog> / Arc<String>; readers bind locally where the borrow has to outlive an expression - new pub(crate) store_catalog / store_schema_source helpers replace the field assignments in apply_schema and reload_schema_if_source_changed - 117 tests across lifecycle/end_to_end/branching/runs pass; engine lib + workspace compile clean Coordinator wrap (Mutex) and the &mut self -> &self engine API conversion follow in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:22:38 +02:00
eprintln!(
"actor {actor_idx} op {op_idx} status {status} body {}",
String::from_utf8_lossy(&body)
);
}
}
latencies.push(elapsed);
}
(latencies, errors)
}
#[tokio::main]
async fn main() {
let args = Args::parse();
if args.tables == 0 || args.actors == 0 || args.ops_per_actor == 0 {
eprintln!("--tables, --actors, --ops-per-actor must all be > 0");
std::process::exit(2);
}
let temp = tempfile::tempdir().expect("tempdir");
let graph = temp.path().join("bench.omni");
mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap Bundles the working-tree state from the prior session (PR 0 bench harness, PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration) together with the first half of PR 2's interior-mutability foundation (catalog and schema_source wrapped in Arc<ArcSwap<...>>). The two streams intermix in 7 of the same files, so splitting via git add -p was impractical. Subsequent PR 2 steps land as separate atomic commits. PR 0 — server-level concurrent /change bench harness - crates/omnigraph-server/examples/bench_concurrent_http.rs (new) - .context/bench-results/{baseline-main,after-pr1}/ (gitignored) PR 1a — drop the audit_actor_id field, thread per-call - removed Omnigraph::audit_actor_id and the swap-restore patterns in mutation.rs, merge.rs, loader/mod.rs - actor_id: Option<&str> threaded through MutationStaging::finalize, mutate_with_current_actor, ingest_with_current_actor, branch_merge_impl, branch_merge_on_current_target, commit_prepared_updates*, record_merge_commit, commit_updates_on_branch_with_expected - apply_schema and ensure_indices_for_branch pass None (system-attributed) PR 1b — per-(table_key, branch) write queue + revalidation + sidecar - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager, acquire/acquire_many, sorted+deduped acquisition; 6 unit tests - Arc<WriteQueueManager> field on Omnigraph + db.write_queue() accessor - MutationStaging::finalize split into stage_all (Phase A, no queue) and StagedMutation::commit_all (Phase B, acquire_many + revalidate pins + sidecar + commit_staged); guards held across publisher - delete-only mutations now emit recovery sidecars; revalidation extended to inline_committed tables - branch_merge_on_current_target, apply_schema_with_lock, and ensure_indices_for_branch acquire per-table queues for their touched tables PR 2 Step B (partial) — catalog and schema_source via ArcSwap - catalog: Catalog -> Arc<ArcSwap<Catalog>> - 
schema_source: String -> Arc<ArcSwap<String>> - public accessors return Arc<Catalog> / Arc<String>; readers bind locally where the borrow has to outlive an expression - new pub(crate) store_catalog / store_schema_source helpers replace the field assignments in apply_schema and reload_schema_if_source_changed - 117 tests across lifecycle/end_to_end/branching/runs pass; engine lib + workspace compile clean Coordinator wrap (Mutex) and the &mut self -> &self engine API conversion follow in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:22:38 +02:00
let schema = build_schema(args.tables);
Omnigraph::init(graph.to_str().unwrap(), &schema)
mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap Bundles the working-tree state from the prior session (PR 0 bench harness, PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration) together with the first half of PR 2's interior-mutability foundation (catalog and schema_source wrapped in Arc<ArcSwap<...>>). The two streams intermix in 7 of the same files, so splitting via git add -p was impractical. Subsequent PR 2 steps land as separate atomic commits. PR 0 — server-level concurrent /change bench harness - crates/omnigraph-server/examples/bench_concurrent_http.rs (new) - .context/bench-results/{baseline-main,after-pr1}/ (gitignored) PR 1a — drop the audit_actor_id field, thread per-call - removed Omnigraph::audit_actor_id and the swap-restore patterns in mutation.rs, merge.rs, loader/mod.rs - actor_id: Option<&str> threaded through MutationStaging::finalize, mutate_with_current_actor, ingest_with_current_actor, branch_merge_impl, branch_merge_on_current_target, commit_prepared_updates*, record_merge_commit, commit_updates_on_branch_with_expected - apply_schema and ensure_indices_for_branch pass None (system-attributed) PR 1b — per-(table_key, branch) write queue + revalidation + sidecar - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager, acquire/acquire_many, sorted+deduped acquisition; 6 unit tests - Arc<WriteQueueManager> field on Omnigraph + db.write_queue() accessor - MutationStaging::finalize split into stage_all (Phase A, no queue) and StagedMutation::commit_all (Phase B, acquire_many + revalidate pins + sidecar + commit_staged); guards held across publisher - delete-only mutations now emit recovery sidecars; revalidation extended to inline_committed tables - branch_merge_on_current_target, apply_schema_with_lock, and ensure_indices_for_branch acquire per-table queues for their touched tables PR 2 Step B (partial) — catalog and schema_source via ArcSwap - catalog: Catalog -> Arc<ArcSwap<Catalog>> - 
schema_source: String -> Arc<ArcSwap<String>> - public accessors return Arc<Catalog> / Arc<String>; readers bind locally where the borrow has to outlive an expression - new pub(crate) store_catalog / store_schema_source helpers replace the field assignments in apply_schema and reload_schema_if_source_changed - 117 tests across lifecycle/end_to_end/branching/runs pass; engine lib + workspace compile clean Coordinator wrap (Mutex) and the &mut self -> &self engine API conversion follow in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:22:38 +02:00
.await
.expect("init graph");
mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap Bundles the working-tree state from the prior session (PR 0 bench harness, PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration) together with the first half of PR 2's interior-mutability foundation (catalog and schema_source wrapped in Arc<ArcSwap<...>>). The two streams intermix in 7 of the same files, so splitting via git add -p was impractical. Subsequent PR 2 steps land as separate atomic commits. PR 0 — server-level concurrent /change bench harness - crates/omnigraph-server/examples/bench_concurrent_http.rs (new) - .context/bench-results/{baseline-main,after-pr1}/ (gitignored) PR 1a — drop the audit_actor_id field, thread per-call - removed Omnigraph::audit_actor_id and the swap-restore patterns in mutation.rs, merge.rs, loader/mod.rs - actor_id: Option<&str> threaded through MutationStaging::finalize, mutate_with_current_actor, ingest_with_current_actor, branch_merge_impl, branch_merge_on_current_target, commit_prepared_updates*, record_merge_commit, commit_updates_on_branch_with_expected - apply_schema and ensure_indices_for_branch pass None (system-attributed) PR 1b — per-(table_key, branch) write queue + revalidation + sidecar - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager, acquire/acquire_many, sorted+deduped acquisition; 6 unit tests - Arc<WriteQueueManager> field on Omnigraph + db.write_queue() accessor - MutationStaging::finalize split into stage_all (Phase A, no queue) and StagedMutation::commit_all (Phase B, acquire_many + revalidate pins + sidecar + commit_staged); guards held across publisher - delete-only mutations now emit recovery sidecars; revalidation extended to inline_committed tables - branch_merge_on_current_target, apply_schema_with_lock, and ensure_indices_for_branch acquire per-table queues for their touched tables PR 2 Step B (partial) — catalog and schema_source via ArcSwap - catalog: Catalog -> Arc<ArcSwap<Catalog>> - 
schema_source: String -> Arc<ArcSwap<String>> - public accessors return Arc<Catalog> / Arc<String>; readers bind locally where the borrow has to outlive an expression - new pub(crate) store_catalog / store_schema_source helpers replace the field assignments in apply_schema and reload_schema_if_source_changed - 117 tests across lifecycle/end_to_end/branching/runs pass; engine lib + workspace compile clean Coordinator wrap (Mutex) and the &mut self -> &self engine API conversion follow in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:22:38 +02:00
let state = AppState::open(graph.to_string_lossy().to_string())
mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap Bundles the working-tree state from the prior session (PR 0 bench harness, PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration) together with the first half of PR 2's interior-mutability foundation (catalog and schema_source wrapped in Arc<ArcSwap<...>>). The two streams intermix in 7 of the same files, so splitting via git add -p was impractical. Subsequent PR 2 steps land as separate atomic commits. PR 0 — server-level concurrent /change bench harness - crates/omnigraph-server/examples/bench_concurrent_http.rs (new) - .context/bench-results/{baseline-main,after-pr1}/ (gitignored) PR 1a — drop the audit_actor_id field, thread per-call - removed Omnigraph::audit_actor_id and the swap-restore patterns in mutation.rs, merge.rs, loader/mod.rs - actor_id: Option<&str> threaded through MutationStaging::finalize, mutate_with_current_actor, ingest_with_current_actor, branch_merge_impl, branch_merge_on_current_target, commit_prepared_updates*, record_merge_commit, commit_updates_on_branch_with_expected - apply_schema and ensure_indices_for_branch pass None (system-attributed) PR 1b — per-(table_key, branch) write queue + revalidation + sidecar - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager, acquire/acquire_many, sorted+deduped acquisition; 6 unit tests - Arc<WriteQueueManager> field on Omnigraph + db.write_queue() accessor - MutationStaging::finalize split into stage_all (Phase A, no queue) and StagedMutation::commit_all (Phase B, acquire_many + revalidate pins + sidecar + commit_staged); guards held across publisher - delete-only mutations now emit recovery sidecars; revalidation extended to inline_committed tables - branch_merge_on_current_target, apply_schema_with_lock, and ensure_indices_for_branch acquire per-table queues for their touched tables PR 2 Step B (partial) — catalog and schema_source via ArcSwap - catalog: Catalog -> Arc<ArcSwap<Catalog>> - 
schema_source: String -> Arc<ArcSwap<String>> - public accessors return Arc<Catalog> / Arc<String>; readers bind locally where the borrow has to outlive an expression - new pub(crate) store_catalog / store_schema_source helpers replace the field assignments in apply_schema and reload_schema_if_source_changed - 117 tests across lifecycle/end_to_end/branching/runs pass; engine lib + workspace compile clean Coordinator wrap (Mutex) and the &mut self -> &self engine API conversion follow in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:22:38 +02:00
.await
.expect("open AppState");
let app = build_app(state);
eprintln!(
"running mode={:?} tables={} actors={} ops_per_actor={}",
args.mode, args.tables, args.actors, args.ops_per_actor
);
let start = Instant::now();
let mut handles = Vec::with_capacity(args.actors);
for actor_idx in 0..args.actors {
let app = app.clone();
let mode = args.mode;
let ops = args.ops_per_actor;
let num_tables = args.tables;
handles.push(tokio::spawn(async move {
drive_actor(app, actor_idx, ops, mode, num_tables).await
}));
}
let mut all_latencies: Vec<Duration> = Vec::with_capacity(args.actors * args.ops_per_actor);
let mut total_errors = 0usize;
for h in handles {
let (lats, errs) = h.await.expect("actor task panicked");
all_latencies.extend(lats);
total_errors += errs;
}
let wall = start.elapsed();
all_latencies.sort();
let n = all_latencies.len();
let pct = |p: f64| -> f64 {
if n == 0 {
return 0.0;
}
let idx = ((n as f64 - 1.0) * p).round() as usize;
all_latencies[idx].as_secs_f64() * 1000.0
};
let max_ms = all_latencies
.last()
.map(|d| d.as_secs_f64() * 1000.0)
.unwrap_or(0.0);
let throughput = if wall.as_secs_f64() > 0.0 {
n as f64 / wall.as_secs_f64()
} else {
0.0
};
let results = BenchResults {
label: args.label.clone(),
mode: args.mode,
tables: args.tables,
actors: args.actors,
ops_per_actor: args.ops_per_actor,
total_ops: n,
error_count: total_errors,
wall_time_ms: wall.as_millis() as u64,
throughput_ops_per_sec: throughput,
p50_ms: pct(0.50),
p95_ms: pct(0.95),
p99_ms: pct(0.99),
p999_ms: pct(0.999),
max_ms,
notes: "MR-686 PR 0 baseline. Drives /change via Tower oneshot.",
};
let json = serde_json::to_string_pretty(&results).unwrap();
println!("{json}");
if let Some(path) = args.output.as_ref() {
if let Some(parent) = path.parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent).expect("mkdir output parent");
}
std::fs::write(path, &json).expect("write output");
eprintln!("wrote {}", path.display());
}
if total_errors > 0 {
eprintln!("WARN: {total_errors} requests failed");
std::process::exit(1);
}
}