test(cli): the embedded/remote parity matrix (RFC-009 Phase 1)

The referee before any unification moves: every forked verb runs once
against the local graph and once against a spawned server on a twin copy
of the same fixture, with the SAME actor (--as locally; bearer-resolved
remotely) and the SAME Cedar bundle on both arms — like-for-like
enforcement is part of the harness (a tokens-only server is default-deny
by design; comparing that against a bare local arm measures
configuration, not the fork). Declared-volatile fields (ids, wall-clock,
transport locations) scrub to placeholders; everything else must match
exactly, and exit codes must match for shared failures.

Headline result: 11 rows green with an EMPTY divergence ledger — the
arms agree on every verb today. The ledger (KNOWN_DIVERGENCES) exists so
any future divergence is pinned or filed, never silently repaired;
repairs are Phase 3's job, gated by this referee staying green.

One engine observation surfaced and filed (#207): inline execution with
a declared-but-unbound param matches ALL rows on both arms, while the
stored-query invoke path hard-errors — a cross-path asymmetry the matrix
pins as agreeing behavior pending a deliberate fix. Documented
exclusions (graphs list, ingest/load-over-/ingest, storage-plane verbs)
map to RFC-009 Phases 4-5.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-12 17:50:46 +03:00
parent e0d80c0062
commit 08c9b03d40
4 changed files with 433 additions and 2 deletions

View file

@ -0,0 +1,255 @@
//! RFC-009 Phase 1 — the embedded/remote parity referee.
//!
//! For every CLI verb with an `is_remote` fork, run the identical
//! invocation against (a) the local graph directly and (b) a spawned
//! server on a twin copy of the same graph, with the SAME actor on both
//! arms (local `--as act-parity`; remote bearer token resolving to
//! `act-parity`). Scrub the declared-volatile allowlist
//! (`support::scrub_volatile` — ids, wall-clock, transport locations);
//! everything else must match exactly.
//!
//! This test PINS behavior; it does not idealize it. Genuine divergences
//! discovered here are recorded in `KNOWN_DIVERGENCES` below (and filed),
//! never silently repaired — repairs are Phase 3's job, gated by this
//! referee staying green through the refactor.
use tempfile::TempDir;
mod support;
use support::*;
/// Divergences between the arms that exist today, pinned as expectations.
/// Removing an entry requires the corresponding behavior change to be a
/// deliberate, release-noted decision (RFC-009 Compatibility).
const KNOWN_DIVERGENCES: &[&str] = &[
// populated by the rows below as they are written
];
/// One matched setup per row: twin graphs + the SAME Cedar bundle on both
/// arms (the local arm via --config top-level policy.file; the server via
/// its config). Returns everything a row needs.
struct Parity {
_temp: TempDir,
local: std::path::PathBuf,
local_cfg: std::path::PathBuf,
server: TestServer,
}
fn parity() -> Parity {
let (temp, local, remote) = twin_graphs();
let (local_cfg, server_cfg) = parity_configs(temp.path(), &local, &remote);
let server = spawn_server_with_config_env(
&server_cfg,
&[(
"OMNIGRAPH_SERVER_BEARER_TOKENS_JSON",
r#"{"act-parity":"parity-tok"}"#,
)],
);
Parity {
_temp: temp,
local,
local_cfg,
server,
}
}
impl Parity {
fn run(&self, args: &[&str]) -> (std::process::Output, std::process::Output) {
run_both_with_config(&self.local, Some(&self.local_cfg), &self.server.base_url, args)
}
}
fn assert_parity(verb: &str, local: &std::process::Output, remote: &std::process::Output) {
assert_eq!(
local.status.code(),
remote.status.code(),
"{verb}: exit codes diverge\nlocal: {local:?}\nremote: {remote:?}"
);
if local.status.success() {
let local_json = scrubbed_json(local);
let remote_json = scrubbed_json(remote);
assert_eq!(
local_json, remote_json,
"{verb}: scrubbed JSON diverges (left=local, right=remote)"
);
}
}
#[test]
fn parity_query() {
let p = parity();
let query = fixture("test.gq");
let (l, r) = p.run(&[
"query",
"--query",
query.to_str().unwrap(),
"--name",
"get_person",
"--params",
r#"{"name":"Alice"}"#,
"--json",
],
);
assert_parity("query", &l, &r);
}
#[test]
fn parity_schema_show() {
let p = parity();
let (l, r) = p.run(&["schema", "show", "--json"]);
assert_parity("schema show", &l, &r);
}
#[test]
fn parity_snapshot() {
let p = parity();
let (l, r) = p.run(&["snapshot", "--json"]);
assert_parity("snapshot", &l, &r);
}
#[test]
fn parity_branch_list() {
let p = parity();
let (l, r) = p.run(&["branch", "list", "--json"]);
assert_parity("branch list", &l, &r);
}
#[test]
fn parity_commit_list() {
let p = parity();
let (l, r) = p.run(&["commit", "list", "--json"]);
assert_parity("commit list", &l, &r);
}
#[test]
fn parity_mutate() {
let p = parity();
let (l, r) = p.run(&[
"mutate",
"-e",
"query add($name: String, $age: I32) { insert Person { name: $name, age: $age } }",
"--params",
r#"{"name":"Parity","age":7}"#,
"--json",
],
);
assert_parity("mutate", &l, &r);
}
#[test]
fn parity_branch_create_delete() {
let p = parity();
let (l, r) = p.run(&["branch", "create", "--from", "main", "parity-branch", "--json"],
);
assert_parity("branch create", &l, &r);
let (l, r) = p.run(&["branch", "delete", "parity-branch", "--json"],
);
assert_parity("branch delete", &l, &r);
}
#[test]
fn parity_branch_merge() {
let p = parity();
let (l, r) = p.run(&["branch", "create", "--from", "main", "feature", "--json"],
);
assert_parity("branch create (merge setup)", &l, &r);
let (l, r) = p.run(&["branch", "merge", "feature", "--into", "main", "--json"],
);
assert_parity("branch merge", &l, &r);
}
#[test]
fn parity_load() {
let p = parity();
let data = p.local.parent().unwrap().join("rows.jsonl");
std::fs::write(
&data,
"{\"type\":\"Person\",\"data\":{\"name\":\"Loaded\",\"age\":1}}\n",
)
.unwrap();
let (l, r) = p.run(&[
"load",
"--mode",
"merge",
"--data",
data.to_str().unwrap(),
"--json",
],
);
assert_parity("load", &l, &r);
}
// ---- error parity: exit codes must match for shared failure cases ----
#[test]
fn parity_errors_share_exit_codes() {
let p = parity();
// unknown branch on merge
let (l, r) = p.run(&["branch", "merge", "no-such-branch", "--into", "main", "--json"],
);
assert_eq!(
(l.status.success(), r.status.success()),
(false, false),
"merge of unknown branch must fail on both arms\nlocal {l:?}\nremote {r:?}"
);
// unknown query name in the source
let query = fixture("test.gq");
let (l, r) = p.run(&[
"query",
"--query",
query.to_str().unwrap(),
"--name",
"no_such_query",
"--json",
],
);
assert_eq!(
(l.status.success(), r.status.success()),
(false, false),
"unknown query name must fail on both arms\nlocal {l:?}\nremote {r:?}"
);
// Discovery (parity HOLDS, behavior surprising): an inline query run
// with a declared-but-unbound param does NOT error on either arm — it
// returns every row (the filter drops), while the stored-query invoke
// path hard-errors 'parameter not provided'. Pinned here as agreeing
// behavior; the cross-path asymmetry is filed separately.
let (l, r) = p.run(&[
"query",
"--query",
query.to_str().unwrap(),
"--name",
"get_person",
"--json",
],
);
assert_eq!(
(l.status.success(), r.status.success()),
(true, true),
"unbound-param inline query currently SUCCEEDS on both arms (matches-all)"
);
}
// ---- documented exclusions (not bugs; the Phase 4 capability table) ----
//
// - `graphs list`: server-only today; becomes Both-capability when the
// embedded arm enumerates the cluster catalog (RFC-009 open Q3, answered).
// - `ingest`: deprecated alias of load; the remote `load` arm itself rides
// the deprecated /ingest route today (RFC-009 Phase 5 flips it to /load —
// this matrix's `parity_load` row is where that flip becomes visible).
// - `init`, `optimize`, `repair`, `cleanup`, `cluster *`: storage-plane by
// design (must work with the server down); Phase 4 declares this.
#[allow(dead_code)]
const EXCLUSIONS_DOCUMENTED: () = ();
#[test]
fn known_divergences_ledger_is_current() {
// The ledger exists so removals are deliberate: an empty list with all
// rows green means the arms agree everywhere the matrix looks.
assert!(
KNOWN_DIVERGENCES.is_empty(),
"divergences are pinned: {KNOWN_DIVERGENCES:?}"
);
}

View file

@ -688,3 +688,170 @@ pub fn queries_test_config(graph_uri: &str, entry: &str, gq_file: &str) -> Strin
graph_uri.replace('\'', "''")
)
}
// ---- RFC-009 Phase 1: parity-matrix harness ----
/// Twin graphs for embedded-vs-remote comparison: the same loaded fixture
/// copied to two roots, so write verbs can run once per arm on identical
/// state. Returns (tempdir-guard, local_graph, remote_graph).
pub fn twin_graphs() -> (TempDir, PathBuf, PathBuf) {
let temp = tempdir().unwrap();
let seed = temp.path().join("seed");
fs::create_dir_all(&seed).unwrap();
let graph = seed.join("server.omni");
init_graph(&graph);
load_fixture(&graph);
let local = temp.path().join("local.omni");
let remote = temp.path().join("remote.omni");
copy_dir(&graph, &local);
copy_dir(&graph, &remote);
(temp, local, remote)
}
pub fn copy_dir(from: &Path, to: &Path) {
fs::create_dir_all(to).unwrap();
for entry in fs::read_dir(from).unwrap() {
let entry = entry.unwrap();
let target = to.join(entry.file_name());
if entry.file_type().unwrap().is_dir() {
copy_dir(&entry.path(), &target);
} else {
fs::copy(entry.path(), &target).unwrap();
}
}
}
/// Scrub declared-volatile fields (RFC-009 Phase 1 allowlist) so the rest
/// of the JSON must match exactly. Key-based, recursive; both arms get the
/// same placeholders. Everything NOT listed here is contract.
pub fn scrub_volatile(value: &mut serde_json::Value) {
const VOLATILE_KEYS: &[&str] = &[
// identity-bearing per-instance values
"commit_id", "id", "parent_id", "merge_parent_id", "snapshot",
// wall-clock
"committed_at", "created_at", "timestamp",
// transport / location
"uri", "path",
];
match value {
serde_json::Value::Object(map) => {
for (key, val) in map.iter_mut() {
if VOLATILE_KEYS.contains(&key.as_str()) && !val.is_null() {
*val = serde_json::Value::String(format!("<volatile:{key}>"));
} else {
scrub_volatile(val);
}
}
}
serde_json::Value::Array(items) => {
for item in items {
scrub_volatile(item);
}
}
_ => {}
}
}
pub const PARITY_ACTOR: &str = "act-parity";
pub const PARITY_TOKEN: &str = "parity-tok";
/// Identical Cedar bundle for BOTH arms — like-for-like enforcement is part
/// of the parity contract (a bare local arm is permissive while a
/// tokens-only server is default-deny; comparing those would measure
/// configuration, not the fork).
pub fn parity_policy_yaml() -> String {
r#"version: 1
groups:
parity: ["act-parity"]
protected_branches: []
rules:
- id: reads
allow:
actors: { group: parity }
actions: [read, export, invoke_query]
- id: read-scope
allow:
actors: { group: parity }
actions: [read, export]
branch_scope: any
- id: writes
allow:
actors: { group: parity }
actions: [change]
branch_scope: any
- id: branching
allow:
actors: { group: parity }
actions: [schema_apply, branch_create, branch_delete, branch_merge]
target_branch_scope: any
"#
.to_string()
}
/// Per-arm config files carrying the same policy. Both arms address the
/// graph by positional URI, so the TOP-LEVEL policy.file applies on each
/// side (single-graph semantics).
pub fn parity_configs(root: &Path, _local_graph: &Path, remote_graph: &Path) -> (PathBuf, PathBuf) {
let policy = root.join("parity.policy.yaml");
fs::write(&policy, parity_policy_yaml()).unwrap();
let local_cfg = root.join("local.omnigraph.yaml");
fs::write(
&local_cfg,
format!("policy:\n file: {}\n", policy.display()),
)
.unwrap();
let server_cfg = root.join("server.omnigraph.yaml");
fs::write(
&server_cfg,
format!(
"server:\n graph: parity\ngraphs:\n parity:\n uri: {}\n policy:\n file: {}\n",
remote_graph.display(),
policy.display()
),
)
.unwrap();
(local_cfg, server_cfg)
}
/// Run one CLI invocation per arm with identical verb args: locally against
/// `local_graph` (--as actor) and remotely against a server URL whose token
/// resolves to the same actor. Returns raw Outputs for exit-code + JSON
/// comparison by the caller.
pub fn run_both(
local_graph: &Path,
server_url: &str,
args: &[&str],
) -> (std::process::Output, std::process::Output) {
run_both_with_config(local_graph, None, server_url, args)
}
pub fn run_both_with_config(
local_graph: &Path,
local_config: Option<&Path>,
server_url: &str,
args: &[&str],
) -> (std::process::Output, std::process::Output) {
let mut local = cli();
local.arg(args[0]).arg(local_graph).args(&args[1..]).arg("--as").arg(PARITY_ACTOR);
if let Some(config) = local_config {
local.arg("--config").arg(config);
}
let local_out = local.output().unwrap();
let mut remote = cli();
remote
.env("OMNIGRAPH_BEARER_TOKEN", PARITY_TOKEN)
.arg(args[0])
.arg(server_url)
.args(&args[1..]);
let remote_out = remote.output().unwrap();
(local_out, remote_out)
}
/// Parse, scrub, and pretty-print for diffable assertion messages.
pub fn scrubbed_json(output: &std::process::Output) -> String {
let mut value: serde_json::Value = serde_json::from_slice(&output.stdout)
.unwrap_or_else(|e| panic!("non-JSON stdout ({e}): {output:?}"));
scrub_volatile(&mut value);
serde_json::to_string_pretty(&value).unwrap()
}