From fcb47620d301bc6e82519dfe4965cd12ab772b94 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Thu, 7 May 2026 16:22:38 +0200 Subject: [PATCH] mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bundles the working-tree state from the prior session (PR 0 bench harness, PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration) together with the first half of PR 2's interior-mutability foundation (catalog and schema_source wrapped in Arc>). The two streams intermix in 7 of the same files, so splitting via git add -p was impractical. Subsequent PR 2 steps land as separate atomic commits. PR 0 — server-level concurrent /change bench harness - crates/omnigraph-server/examples/bench_concurrent_http.rs (new) - .context/bench-results/{baseline-main,after-pr1}/ (gitignored) PR 1a — drop the audit_actor_id field, thread per-call - removed Omnigraph::audit_actor_id and the swap-restore patterns in mutation.rs, merge.rs, loader/mod.rs - actor_id: Option<&str> threaded through MutationStaging::finalize, mutate_with_current_actor, ingest_with_current_actor, branch_merge_impl, branch_merge_on_current_target, commit_prepared_updates*, record_merge_commit, commit_updates_on_branch_with_expected - apply_schema and ensure_indices_for_branch pass None (system-attributed) PR 1b — per-(table_key, branch) write queue + revalidation + sidecar - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager, acquire/acquire_many, sorted+deduped acquisition; 6 unit tests - Arc field on Omnigraph + db.write_queue() accessor - MutationStaging::finalize split into stage_all (Phase A, no queue) and StagedMutation::commit_all (Phase B, acquire_many + revalidate pins + sidecar + commit_staged); guards held across publisher - delete-only mutations now emit recovery sidecars; revalidation extended to inline_committed tables - branch_merge_on_current_target, apply_schema_with_lock, and ensure_indices_for_branch acquire per-table queues for their touched tables PR 2 Step B (partial) — catalog and schema_source via ArcSwap - catalog: Catalog -> Arc> - schema_source: String -> Arc> - public accessors return Arc / Arc; readers bind locally where the borrow has to outlive an expression - new pub(crate) store_catalog / store_schema_source helpers replace the field assignments in apply_schema and reload_schema_if_source_changed - 117 tests across lifecycle/end_to_end/branching/runs pass; engine lib + workspace compile clean Coordinator wrap (Mutex) and the &mut self -> &self engine API conversion follow in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) --- AGENTS.md | 1 + crates/omnigraph-cli/src/main.rs | 2 +- .../examples/bench_concurrent_http.rs | 267 +++++++++++++ crates/omnigraph/src/db/mod.rs | 1 + crates/omnigraph/src/db/omnigraph.rs | 83 ++-- crates/omnigraph/src/db/omnigraph/export.rs | 10 +- crates/omnigraph/src/db/omnigraph/optimize.rs | 4 +- .../src/db/omnigraph/schema_apply.rs | 39 +- .../omnigraph/src/db/omnigraph/table_ops.rs | 101 +++-- crates/omnigraph/src/db/write_queue.rs | 231 +++++++++++ crates/omnigraph/src/exec/merge.rs | 39 +- crates/omnigraph/src/exec/mutation.rs | 29 +- crates/omnigraph/src/exec/query.rs | 17 +- crates/omnigraph/src/exec/staging.rs | 361 ++++++++++++++---- crates/omnigraph/src/loader/mod.rs | 39 +- 15 files changed, 1041 insertions(+), 183 deletions(-) create mode 100644 crates/omnigraph-server/examples/bench_concurrent_http.rs create mode 100644 crates/omnigraph/src/db/write_queue.rs diff --git a/AGENTS.md b/AGENTS.md index a98b974..370cfd8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -236,5 +236,6 @@ Rules: 4. **Re-verify before recommending.** If you cite a flag, env var, endpoint, or constant to the user or in code, grep for it in source first. Memory and docs go stale; the code is authoritative. 5. **Keep AGENTS.md a map, not an encyclopedia.** New deep content goes into `docs/`. Add an entry to "Where to find each topic" instead of pasting prose into this file. The "Always-on rules" section is the exception — it's for invariants that should always be in scope. 6. **Re-read on schema/query/IR changes.** Edits to `schema.pest`, `query.pest`, `ir/lower.rs`, `query/typecheck.rs`, or `query/lint.rs` should trigger a re-read of [docs/schema-language.md](docs/schema-language.md), [docs/query-language.md](docs/query-language.md), and [docs/execution.md](docs/execution.md) to confirm they still describe reality. +7. **Always make smaller commits.** Each commit does one thing, compiles, and passes tests; mechanical refactors land separately from the behavior changes they enable. CI check: `scripts/check-agents-md.sh` verifies that every `docs/*.md` link in this file resolves and that every doc in the canonical set is linked. Run it locally before opening a PR if you've moved or renamed docs. diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index f58fb1b..4fe89e0 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -1425,7 +1425,7 @@ async fn execute_query_lint( let uri = resolve_local_uri(config, cli_uri, cli_target, "query lint")?; let db = Omnigraph::open(&uri).await?; Ok(lint_query_file( - db.catalog(), + &db.catalog(), &query_source, query_path, QueryLintSchemaSource::repo(uri), diff --git a/crates/omnigraph-server/examples/bench_concurrent_http.rs b/crates/omnigraph-server/examples/bench_concurrent_http.rs new file mode 100644 index 0000000..11505e7 --- /dev/null +++ b/crates/omnigraph-server/examples/bench_concurrent_http.rs @@ -0,0 +1,267 @@ +//! Server-level concurrent HTTP benchmark for MR-686 (PR 0 baseline). +//! +//! Drives concurrent `/change` requests against an in-process Omnigraph HTTP +//! server. Measures the global `Arc>` lock penalty on +//! current `main` so PR 1 + PR 2 can be evaluated against a real baseline. +//! +//! Per the MR-686 plan: this is the load-bearing bench. `Omnigraph::mutate_as` +//! is `&mut self`, so an engine-level concurrent bench either serializes on the +//! borrow checker (measures nothing) or drives multiple handles (measures Lance +//! contention, not the server bottleneck). Driving the HTTP server is the only +//! way to measure the actual `RwLock` contention this work removes. +//! +//! Usage: +//! ```sh +//! cargo run --release -p omnigraph-server --example bench_concurrent_http -- \ +//! --tables 16 --actors 16 --ops-per-actor 1000 --mode disjoint \ +//! --output bench-results/baseline-main/cross-table.json +//! ``` +//! +//! Modes: +//! - `disjoint`: each actor writes to a distinct node type (cross-table fanout) +//! - `same-key`: all actors write to the same node type (hot-key contention) +//! - `mixed`: each actor writes to a different table per op (round-robin) + +use std::path::PathBuf; +use std::time::{Duration, Instant}; + +use axum::Router; +use axum::body::{Body, to_bytes}; +use axum::http::{Method, Request, StatusCode}; +use clap::{Parser, ValueEnum}; +use omnigraph::db::Omnigraph; +use omnigraph_server::api::ChangeRequest; +use omnigraph_server::{AppState, build_app}; +use serde::Serialize; +use tower::ServiceExt; + +#[derive(Parser, Debug)] +#[command(about = "Concurrent HTTP bench for MR-686")] +struct Args { + /// Number of distinct node types in the schema. + #[arg(long, default_value_t = 16)] + tables: usize, + /// Number of concurrent actors driving requests. + #[arg(long, default_value_t = 16)] + actors: usize, + /// Operations per actor. + #[arg(long, default_value_t = 100)] + ops_per_actor: usize, + /// Workload mode. + #[arg(long, value_enum, default_value_t = Mode::Disjoint)] + mode: Mode, + /// Output file for the JSON results. Stdout always gets a copy. + #[arg(long)] + output: Option, + /// Optional label to record alongside results (e.g. "baseline-main"). + #[arg(long, default_value = "")] + label: String, +} + +#[derive(Clone, Copy, Debug, ValueEnum, Serialize)] +#[serde(rename_all = "kebab-case")] +enum Mode { + Disjoint, + SameKey, + Mixed, +} + +#[derive(Serialize, Debug)] +struct BenchResults { + label: String, + mode: Mode, + tables: usize, + actors: usize, + ops_per_actor: usize, + total_ops: usize, + error_count: usize, + wall_time_ms: u64, + throughput_ops_per_sec: f64, + p50_ms: f64, + p95_ms: f64, + p99_ms: f64, + p999_ms: f64, + max_ms: f64, + notes: &'static str, +} + +fn build_schema(num_tables: usize) -> String { + let mut schema = String::new(); + for i in 0..num_tables { + schema.push_str(&format!( + "node Item{i} {{\n name: String @key\n value: I32?\n}}\n\n" + )); + } + schema +} + +fn build_query_source(table_idx: usize) -> String { + format!( + "query insert_item($name: String, $value: I32) {{\n insert Item{table_idx} {{ name: $name, value: $value }}\n}}" + ) +} + +fn pick_table(actor_idx: usize, op_idx: usize, mode: Mode, num_tables: usize) -> usize { + match mode { + Mode::Disjoint => actor_idx % num_tables, + Mode::SameKey => 0, + Mode::Mixed => (actor_idx.wrapping_mul(7919) ^ op_idx) % num_tables, + } +} + +async fn drive_actor( + app: Router, + actor_idx: usize, + ops: usize, + mode: Mode, + num_tables: usize, +) -> (Vec, usize) { + let mut latencies = Vec::with_capacity(ops); + let mut errors = 0usize; + for op_idx in 0..ops { + let table_idx = pick_table(actor_idx, op_idx, mode, num_tables); + let request_body = ChangeRequest { + query_source: build_query_source(table_idx), + query_name: Some("insert_item".to_string()), + params: Some(serde_json::json!({ + "name": format!("a{actor_idx}_o{op_idx}"), + "value": op_idx as i32, + })), + branch: None, + }; + let body = serde_json::to_vec(&request_body).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/change") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + + let start = Instant::now(); + let response = match app.clone().oneshot(req).await { + Ok(r) => r, + Err(e) => { + eprintln!("actor {actor_idx} op {op_idx} transport error: {e:?}"); + errors += 1; + continue; + } + }; + let elapsed = start.elapsed(); + let status = response.status(); + if status != StatusCode::OK { + errors += 1; + // Drain body for logging on the first few failures. + if errors <= 3 { + let body = to_bytes(response.into_body(), 64 * 1024).await.unwrap_or_default(); + eprintln!( + "actor {actor_idx} op {op_idx} status {status} body {}", + String::from_utf8_lossy(&body) + ); + } + } + latencies.push(elapsed); + } + (latencies, errors) +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + if args.tables == 0 || args.actors == 0 || args.ops_per_actor == 0 { + eprintln!("--tables, --actors, --ops-per-actor must all be > 0"); + std::process::exit(2); + } + + let temp = tempfile::tempdir().expect("tempdir"); + let repo = temp.path().join("bench.omni"); + let schema = build_schema(args.tables); + Omnigraph::init(repo.to_str().unwrap(), &schema) + .await + .expect("init repo"); + + let state = AppState::open(repo.to_string_lossy().to_string()) + .await + .expect("open AppState"); + let app = build_app(state); + + eprintln!( + "running mode={:?} tables={} actors={} ops_per_actor={}", + args.mode, args.tables, args.actors, args.ops_per_actor + ); + + let start = Instant::now(); + let mut handles = Vec::with_capacity(args.actors); + for actor_idx in 0..args.actors { + let app = app.clone(); + let mode = args.mode; + let ops = args.ops_per_actor; + let num_tables = args.tables; + handles.push(tokio::spawn(async move { + drive_actor(app, actor_idx, ops, mode, num_tables).await + })); + } + + let mut all_latencies: Vec = Vec::with_capacity(args.actors * args.ops_per_actor); + let mut total_errors = 0usize; + for h in handles { + let (lats, errs) = h.await.expect("actor task panicked"); + all_latencies.extend(lats); + total_errors += errs; + } + let wall = start.elapsed(); + + all_latencies.sort(); + let n = all_latencies.len(); + let pct = |p: f64| -> f64 { + if n == 0 { + return 0.0; + } + let idx = ((n as f64 - 1.0) * p).round() as usize; + all_latencies[idx].as_secs_f64() * 1000.0 + }; + let max_ms = all_latencies + .last() + .map(|d| d.as_secs_f64() * 1000.0) + .unwrap_or(0.0); + let throughput = if wall.as_secs_f64() > 0.0 { + n as f64 / wall.as_secs_f64() + } else { + 0.0 + }; + + let results = BenchResults { + label: args.label.clone(), + mode: args.mode, + tables: args.tables, + actors: args.actors, + ops_per_actor: args.ops_per_actor, + total_ops: n, + error_count: total_errors, + wall_time_ms: wall.as_millis() as u64, + throughput_ops_per_sec: throughput, + p50_ms: pct(0.50), + p95_ms: pct(0.95), + p99_ms: pct(0.99), + p999_ms: pct(0.999), + max_ms, + notes: "MR-686 PR 0 baseline. Drives /change via Tower oneshot.", + }; + + let json = serde_json::to_string_pretty(&results).unwrap(); + println!("{json}"); + + if let Some(path) = args.output.as_ref() { + if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + { + std::fs::create_dir_all(parent).expect("mkdir output parent"); + } + std::fs::write(path, &json).expect("write output"); + eprintln!("wrote {}", path.display()); + } + + if total_errors > 0 { + eprintln!("WARN: {total_errors} requests failed"); + std::process::exit(1); + } +} diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index 7a335fd..4f292d3 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -5,6 +5,7 @@ mod omnigraph; mod recovery_audit; mod run_registry; mod schema_state; +pub(crate) mod write_queue; pub use commit_graph::GraphCommit; pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId}; diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index e54b6eb..b21bea9 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::io::Write; use std::sync::Arc; +use arc_swap::ArcSwap; use arrow_array::{ Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, @@ -76,9 +77,19 @@ pub struct Omnigraph { coordinator: GraphCoordinator, table_store: TableStore, runtime_cache: RuntimeCache, - catalog: Catalog, - schema_source: String, - pub(crate) audit_actor_id: Option, + /// Read-heavy on every query, written only by `apply_schema`. ArcSwap + /// gives atomic pointer swap with zero-cost reads (`load()` returns a + /// `Guard>`), so concurrent queries on different actors + /// don't contend on a lock to read the catalog. + catalog: Arc>, + /// Read-heavy on schema introspection paths, written only by + /// `apply_schema`. Same ArcSwap rationale as `catalog`. + schema_source: Arc>, + /// Per-`(table_key, branch)` writer queues. Reachable from engine + /// internals (mutation finalize, schema_apply, branch_merge, + /// ensure_indices, delete_where) and from future MR-870 recovery + /// reconciler. PR 1b adds the field; callers acquire in commits 4+. + write_queue: Arc, } /// Whether [`Omnigraph::open`] runs the open-time recovery sweep. @@ -131,9 +142,9 @@ impl Omnigraph { coordinator, table_store: TableStore::new(&root), runtime_cache: RuntimeCache::default(), - catalog, - schema_source: schema_source.to_string(), - audit_actor_id: None, + catalog: Arc::new(ArcSwap::from_pointee(catalog)), + schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())), + write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), }) } @@ -217,18 +228,35 @@ impl Omnigraph { coordinator, table_store: TableStore::new(&root), runtime_cache: RuntimeCache::default(), - catalog, - schema_source, - audit_actor_id: None, + catalog: Arc::new(ArcSwap::from_pointee(catalog)), + schema_source: Arc::new(ArcSwap::from_pointee(schema_source)), + write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), }) } - pub fn catalog(&self) -> &Catalog { - &self.catalog + /// Returns an `Arc` snapshot. Cheap clone of the current + /// catalog pointer; callers can hold the returned `Arc` across awaits + /// without blocking concurrent `apply_schema`. + pub fn catalog(&self) -> Arc { + self.catalog.load_full() } - pub fn schema_source(&self) -> &str { - &self.schema_source + /// Returns an `Arc` snapshot of the schema source. + pub fn schema_source(&self) -> Arc { + self.schema_source.load_full() + } + + /// Atomically swap the in-memory catalog. Concurrent readers see + /// either the old or the new pointer; never a torn state. Used by + /// `apply_schema` and `reload_schema_if_source_changed`. + pub(crate) fn store_catalog(&self, catalog: Catalog) { + self.catalog.store(Arc::new(catalog)); + } + + /// Atomically swap the in-memory schema source. Same rationale as + /// [`store_catalog`](Self::store_catalog). + pub(crate) fn store_schema_source(&self, schema_source: String) { + self.schema_source.store(Arc::new(schema_source)); } pub fn uri(&self) -> &str { @@ -278,6 +306,17 @@ impl Omnigraph { self.storage.as_ref() } + /// Per-`(table_key, branch)` writer queues. + /// + /// Engine-internal writers (mutation finalize, schema_apply, + /// branch_merge, ensure_indices, delete_where) and the future MR-870 + /// recovery reconciler reach the queue manager via this accessor. + /// Returns an `Arc` clone so callers can hold the manager across + /// `&mut self` engine API boundaries. + pub(crate) fn write_queue(&self) -> Arc { + Arc::clone(&self.write_queue) + } + /// Engine-level access to the repo's normalized root URI. Used by /// the recovery sidecar protocol to compute `__recovery/` paths. pub(crate) fn root_uri(&self) -> &str { @@ -433,7 +472,7 @@ impl Omnigraph { async fn reload_schema_if_source_changed(&mut self) -> Result<()> { let schema_path = schema_source_uri(&self.root_uri); let schema_source = self.storage.read_text(&schema_path).await?; - if schema_source == self.schema_source { + if schema_source == *self.schema_source.load_full() { return Ok(()); } let current_source_ir = read_schema_ir_from_source(&schema_source)?; @@ -447,8 +486,8 @@ impl Omnigraph { .await?; let mut catalog = build_catalog_from_ir(&accepted_ir)?; fixup_blob_schemas(&mut catalog); - self.schema_source = schema_source; - self.catalog = catalog; + self.store_schema_source(schema_source); + self.store_catalog(catalog); Ok(()) } @@ -658,8 +697,8 @@ impl Omnigraph { /// ``` pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result { self.ensure_schema_state_valid().await?; - let node_type = self - .catalog + let catalog = self.catalog(); + let node_type = catalog .node_types .get(type_name) .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?; @@ -801,10 +840,6 @@ impl Omnigraph { self.coordinator.branch_create(name).await } - pub(crate) fn current_audit_actor(&self) -> Option<&str> { - self.audit_actor_id.as_deref() - } - pub async fn branch_create_from( &mut self, from: impl Into, @@ -976,12 +1011,14 @@ impl Omnigraph { manifest_version: u64, parent_commit_id: &str, merged_parent_commit_id: &str, + actor_id: Option<&str>, ) -> Result { table_ops::record_merge_commit( self, manifest_version, parent_commit_id, merged_parent_commit_id, + actor_id, ) .await } @@ -991,12 +1028,14 @@ impl Omnigraph { branch: Option<&str>, updates: &[crate::db::SubTableUpdate], expected_table_versions: &std::collections::HashMap, + actor_id: Option<&str>, ) -> Result { table_ops::commit_updates_on_branch_with_expected( self, branch, updates, expected_table_versions, + actor_id, ) .await } diff --git a/crates/omnigraph/src/db/omnigraph/export.rs b/crates/omnigraph/src/db/omnigraph/export.rs index 7269278..ad5560e 100644 --- a/crates/omnigraph/src/db/omnigraph/export.rs +++ b/crates/omnigraph/src/db/omnigraph/export.rs @@ -142,7 +142,8 @@ async fn export_table_to_writer( .open_snapshot_table(snapshot, table_key) .await?; let ordering = Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]); - let blob_properties = blob_properties_for_table_key(db.catalog(), table_key)?; + let catalog = db.catalog(); + let blob_properties = blob_properties_for_table_key(&catalog, table_key)?; if blob_properties.is_empty() { for batch in db.table_store.scan(&ds, None, None, ordering).await? { @@ -207,9 +208,9 @@ fn write_export_rows_from_batch( blob_values: Option<&HashMap>>>, writer: &mut W, ) -> Result<()> { + let catalog = db.catalog(); if let Some(type_name) = table_key.strip_prefix("node:") { - let node_type = db - .catalog + let node_type = catalog .node_types .get(type_name) .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?; @@ -243,8 +244,7 @@ fn write_export_rows_from_batch( } if let Some(edge_name) = table_key.strip_prefix("edge:") { - let edge_type = db - .catalog + let edge_type = catalog .edge_types .get(edge_name) .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?; diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 21050c1..d70803e 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -81,7 +81,7 @@ pub async fn optimize_all_tables(db: &mut Omnigraph) -> Result = all_table_keys(&db.catalog) + let table_tasks: Vec<_> = all_table_keys(&db.catalog()) .into_iter() .filter_map(|table_key| { let entry = snapshot.entry(&table_key)?; @@ -144,7 +144,7 @@ pub async fn cleanup_all_tables( let resolved = db.resolved_branch_target(None).await?; let snapshot = resolved.snapshot; - let table_tasks: Vec<_> = all_table_keys(&db.catalog) + let table_tasks: Vec<_> = all_table_keys(&db.catalog()) .into_iter() .filter_map(|table_key| { let entry = snapshot.entry(&table_key)?; diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index e35258c..ad6aadc 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -209,6 +209,26 @@ pub(super) async fn apply_schema_with_lock( }); } + // Acquire per-(table_key, branch) queues for every existing table + // that schema_apply will rewrite or re-index. New tables (added or + // renamed targets) aren't acquired — they have no existing dataset + // to race against. Held across the per-table commit loop and the + // manifest publish via `commit_changes_with_actor` below. + // + // Schema-apply already holds the graph-wide `__schema_apply_lock__` + // sentinel branch, so under PR 1b's intermediate state these + // per-table acquisitions are uncontended. They exist for symmetry + // with future MR-870 recovery, which will need queue acquisition + // before any `Dataset::restore` it issues for SchemaApply sidecars. + let schema_apply_queue_keys: Vec<(String, Option)> = recovery_pins + .iter() + .map(|pin| (pin.table_key.clone(), pin.table_branch.clone())) + .collect(); + let _schema_apply_queue_guards = db + .write_queue() + .acquire_many(&schema_apply_queue_keys) + .await; + let recovery_handle = if recovery_pins.is_empty() && sidecar_registrations.is_empty() && sidecar_tombstones.is_empty() @@ -225,7 +245,10 @@ pub(super) async fn apply_schema_with_lock( let mut sidecar = crate::db::manifest::new_sidecar( crate::db::manifest::SidecarKind::SchemaApply, None, - db.audit_actor_id.clone(), + // `apply_schema` doesn't currently take an actor (no `apply_schema_as` + // public API). The HTTP server's /schema/apply handler can pass actor + // context through a follow-up addition. For now, system-attributed. + None, recovery_pins, ); sidecar.additional_registrations = sidecar_registrations; @@ -266,11 +289,12 @@ pub(super) async fn apply_schema_with_lock( })?; ensure_snapshot_entry_head_matches(db, source_entry).await?; let source_ds = snapshot.open(source_table_key).await?; + let current_catalog = db.catalog(); let batch = batch_for_schema_apply_rewrite( db, &source_ds, source_table_key, - &db.catalog, + ¤t_catalog, target_table_key, &desired_catalog, property_renames.get(target_table_key), @@ -311,11 +335,12 @@ pub(super) async fn apply_schema_with_lock( })?; ensure_snapshot_entry_head_matches(db, entry).await?; let source_ds = snapshot.open(table_key).await?; + let current_catalog = db.catalog(); let batch = batch_for_schema_apply_rewrite( db, &source_ds, table_key, - &db.catalog, + ¤t_catalog, table_key, &desired_catalog, property_renames.get(table_key), @@ -444,13 +469,13 @@ pub(super) async fn apply_schema_with_lock( crate::failpoints::maybe_fail("schema_apply.after_staging_write")?; - let actor_id = db.current_audit_actor().map(str::to_string); + // `apply_schema` doesn't currently take an actor; system-attributed. let PublishedSnapshot { manifest_version, _snapshot_id: _, } = db .coordinator - .commit_changes_with_actor(&manifest_changes, actor_id.as_deref()) + .commit_changes_with_actor(&manifest_changes, None) .await?; crate::failpoints::maybe_fail("schema_apply.after_manifest_commit")?; @@ -471,8 +496,8 @@ pub(super) async fn apply_schema_with_lock( ) .await?; - db.catalog = desired_catalog; - db.schema_source = desired_schema_source.to_string(); + db.store_catalog(desired_catalog); + db.store_schema_source(desired_schema_source.to_string()); db.coordinator.refresh().await?; db.runtime_cache.invalidate_all().await; if changed_edge_tables { diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 1e48d03..57549d1 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -11,14 +11,16 @@ pub(super) async fn graph_index(db: &Omnigraph) -> Result Result> { - db.runtime_cache.graph_index(resolved, &db.catalog).await + let catalog = db.catalog(); + db.runtime_cache.graph_index(resolved, &catalog).await } pub(super) async fn ensure_indices(db: &mut Omnigraph) -> Result<()> { @@ -58,8 +60,14 @@ pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test( }; let mut expected = std::collections::HashMap::new(); expected.insert(table_key.to_string(), entry.table_version); - commit_prepared_updates_on_branch_with_expected(db, branch.as_deref(), &[update], &expected) - .await + commit_prepared_updates_on_branch_with_expected( + db, + branch.as_deref(), + &[update], + &expected, + None, + ) + .await } pub(super) async fn ensure_indices_for_branch( @@ -72,6 +80,7 @@ pub(super) async fn ensure_indices_for_branch( let snapshot = resolved.snapshot; let mut updates = Vec::new(); let active_branch = resolved.branch; + let catalog = db.catalog(); // Recovery sidecar: protect the per-table commit_staged loop in // build_indices_on_dataset (one commit per index built). Only pins @@ -83,7 +92,7 @@ pub(super) async fn ensure_indices_for_branch( // committed work on sibling tables. Steady-state runs (everything // already indexed) skip the sidecar entirely. let mut recovery_pins: Vec = Vec::new(); - for type_name in db.catalog.node_types.keys() { + for type_name in catalog.node_types.keys() { let table_key = format!("node:{}", type_name); let Some(entry) = snapshot.entry(&table_key) else { continue; @@ -122,7 +131,7 @@ pub(super) async fn ensure_indices_for_branch( }); } } - for edge_name in db.catalog.edge_types.keys() { + for edge_name in catalog.edge_types.keys() { let table_key = format!("edge:{}", edge_name); let Some(entry) = snapshot.entry(&table_key) else { continue; @@ -147,13 +156,28 @@ pub(super) async fn ensure_indices_for_branch( }); } } + // Acquire per-(table_key, active_branch) queues for every table + // that needs index work. Held across the per-table commit loop and + // the manifest publish at the end of this function. Sorted-order + // acquisition prevents lock-order inversion against concurrent + // multi-table writers (mutation finalize, branch_merge, future + // MR-870 recovery). Under PR 1b's intermediate state (global server + // RwLock still in place), this acquisition is uncontended. + let queue_keys: Vec<(String, Option)> = recovery_pins + .iter() + .map(|pin| (pin.table_key.clone(), pin.table_branch.clone())) + .collect(); + let _queue_guards = db.write_queue().acquire_many(&queue_keys).await; + let recovery_handle = if recovery_pins.is_empty() { None } else { let sidecar = crate::db::manifest::new_sidecar( crate::db::manifest::SidecarKind::EnsureIndices, active_branch.clone(), - db.audit_actor_id.clone(), + // `ensure_indices` doesn't currently take an actor; system-attributed. + // Future: add `ensure_indices_as` to thread actor context. + None, recovery_pins, ); Some( @@ -162,7 +186,7 @@ pub(super) async fn ensure_indices_for_branch( ) }; - for type_name in db.catalog.node_types.keys() { + for type_name in catalog.node_types.keys() { let table_key = format!("node:{}", type_name); let Some(entry) = snapshot.entry(&table_key) else { continue; @@ -209,7 +233,7 @@ pub(super) async fn ensure_indices_for_branch( } } - for edge_name in db.catalog.edge_types.keys() { + for edge_name in catalog.edge_types.keys() { let table_key = format!("edge:{}", edge_name); let Some(entry) = snapshot.entry(&table_key) else { continue; @@ -264,7 +288,7 @@ pub(super) async fn ensure_indices_for_branch( crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?; if !updates.is_empty() { - commit_prepared_updates_on_branch(db, branch, &updates).await?; + commit_prepared_updates_on_branch(db, branch, &updates, None).await?; } // Recovery sidecar lifecycle: delete after the manifest publish (or @@ -321,7 +345,8 @@ async fn needs_index_work_node( if !db.table_store.has_btree_index(&ds, "id").await? { return Ok(true); } - let Some(node_type) = db.catalog.node_types.get(type_name) else { + let catalog = db.catalog(); + let Some(node_type) = catalog.node_types.get(type_name) else { return Ok(false); }; for index_cols in &node_type.indices { @@ -505,7 +530,8 @@ pub(super) async fn build_indices_on_dataset( table_key: &str, ds: &mut Dataset, ) -> Result<()> { - build_indices_on_dataset_for_catalog(db, &db.catalog, table_key, ds).await + let catalog = db.catalog(); + build_indices_on_dataset_for_catalog(db, &catalog, table_key, ds).await } pub(super) async fn build_indices_on_dataset_for_catalog( @@ -704,14 +730,14 @@ async fn prepare_updates_for_commit( async fn commit_prepared_updates( db: &mut Omnigraph, updates: &[crate::db::SubTableUpdate], + actor_id: Option<&str>, ) -> Result { - let actor_id = db.current_audit_actor().map(str::to_string); let PublishedSnapshot { manifest_version, _snapshot_id: _, } = db .coordinator - .commit_updates_with_actor(updates, actor_id.as_deref()) + .commit_updates_with_actor(updates, actor_id) .await?; Ok(manifest_version) } @@ -720,18 +746,14 @@ async fn commit_prepared_updates_with_expected( db: &mut Omnigraph, updates: &[crate::db::SubTableUpdate], expected_table_versions: &std::collections::HashMap, + actor_id: Option<&str>, ) -> Result { - let actor_id = db.current_audit_actor().map(str::to_string); let PublishedSnapshot { manifest_version, _snapshot_id: _, } = db .coordinator - .commit_updates_with_actor_with_expected( - updates, - expected_table_versions, - actor_id.as_deref(), - ) + .commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id) .await?; Ok(manifest_version) } @@ -740,11 +762,12 @@ pub(super) async fn commit_prepared_updates_on_branch( db: &mut Omnigraph, branch: Option<&str>, updates: &[crate::db::SubTableUpdate], + actor_id: Option<&str>, ) -> Result { let current_branch = db.coordinator.current_branch().map(str::to_string); let requested_branch = branch.map(str::to_string); if requested_branch == current_branch { - return commit_prepared_updates(db, updates).await; + return commit_prepared_updates(db, updates, actor_id).await; } let mut coordinator = match requested_branch.as_deref() { @@ -753,12 +776,11 @@ pub(super) async fn commit_prepared_updates_on_branch( } None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?, }; - let actor_id = db.current_audit_actor().map(str::to_string); let PublishedSnapshot { manifest_version, _snapshot_id: _, } = coordinator - .commit_updates_with_actor(updates, actor_id.as_deref()) + .commit_updates_with_actor(updates, actor_id) .await?; Ok(manifest_version) } @@ -768,11 +790,18 @@ pub(super) async fn commit_prepared_updates_on_branch_with_expected( branch: Option<&str>, updates: &[crate::db::SubTableUpdate], expected_table_versions: &std::collections::HashMap, + actor_id: Option<&str>, ) -> Result { let current_branch = db.coordinator.current_branch().map(str::to_string); let requested_branch = branch.map(str::to_string); if requested_branch == current_branch { - return commit_prepared_updates_with_expected(db, updates, expected_table_versions).await; + return commit_prepared_updates_with_expected( + db, + updates, + expected_table_versions, + actor_id, + ) + .await; } let mut coordinator = match requested_branch.as_deref() { @@ -781,16 +810,11 @@ pub(super) async fn commit_prepared_updates_on_branch_with_expected( } None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?, }; - let actor_id = db.current_audit_actor().map(str::to_string); let PublishedSnapshot { manifest_version, _snapshot_id: _, } = coordinator - .commit_updates_with_actor_with_expected( - updates, - expected_table_versions, - actor_id.as_deref(), - ) + .commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id) .await?; Ok(manifest_version) } @@ -805,7 +829,7 @@ pub(super) async fn commit_updates( db.ensure_schema_apply_not_locked("write commit").await?; let current_branch = db.coordinator.current_branch().map(str::to_string); let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?; - commit_prepared_updates(db, &prepared).await + commit_prepared_updates(db, &prepared, None).await } pub(super) async fn commit_manifest_updates( @@ -820,14 +844,14 @@ pub(super) async fn record_merge_commit( manifest_version: u64, parent_commit_id: &str, merged_parent_commit_id: &str, + actor_id: Option<&str>, ) -> Result { - let actor_id = db.current_audit_actor().map(str::to_string); db.coordinator .record_merge_commit( manifest_version, parent_commit_id, merged_parent_commit_id, - actor_id.as_deref(), + actor_id, ) .await .map(|snapshot_id| snapshot_id.as_str().to_string()) @@ -841,11 +865,18 @@ pub(super) async fn commit_updates_on_branch_with_expected( branch: Option<&str>, updates: &[crate::db::SubTableUpdate], expected_table_versions: &std::collections::HashMap, + actor_id: Option<&str>, ) -> Result { db.ensure_schema_apply_not_locked("write commit").await?; let prepared = prepare_updates_for_commit(db, branch, updates).await?; - commit_prepared_updates_on_branch_with_expected(db, branch, &prepared, expected_table_versions) - .await + commit_prepared_updates_on_branch_with_expected( + db, + branch, + &prepared, + expected_table_versions, + actor_id, + ) + .await } pub(super) async fn ensure_commit_graph_initialized(db: &mut Omnigraph) -> Result<()> { diff --git a/crates/omnigraph/src/db/write_queue.rs b/crates/omnigraph/src/db/write_queue.rs new file mode 100644 index 0000000..bb03022 --- /dev/null +++ b/crates/omnigraph/src/db/write_queue.rs @@ -0,0 +1,231 @@ +//! Per-`(table_key, branch)` writer queues — MR-686 scaffolding. +//! +//! Today every server-layer write serializes on the global +//! `Arc>` in `AppState`. MR-686 replaces that with +//! per-`(table_key, branch_ref)` queues so disjoint-key writes proceed +//! concurrently. This module owns the queue data structure; callers in +//! `MutationStaging::commit_all`, `branch_merge`, `schema_apply`, +//! `ensure_indices`, `delete_where`, and the future MR-870 recovery +//! reconciler acquire guards before any per-table Lance commit. +//! +//! ## Why exclusive `tokio::sync::Mutex<()>` per key +//! +//! Lance's `Dataset::restore` "wins" against concurrent Append/Update/ +//! Delete/CreateIndex/Merge per `check_restore_txn`, silently orphaning +//! the concurrent writer's commit. The queue's *only* application-layer +//! job is to serialize Restore against every other writer on the same +//! `(table_key, branch_ref)`. Lance OCC handles the rest of the conflict +//! matrix (Append vs Append fully compatible, Update vs Update rebases or +//! retries, etc.) but cannot make Restore symmetric — that's an upstream +//! design choice. Until Lance fixes Restore (or BatchCommitTables +//! changes the protocol), every writer takes the same exclusive lock. +//! +//! `RwLock` (shared for normal writes, exclusive for Restore) is the +//! natural follow-up but adds a writer-classification surface that's +//! easy to get wrong; misclassifying any writer reintroduces the +//! orphaning hazard. We start with `Mutex` and revisit based on +//! production telemetry. +//! +//! ## Sorted-order acquisition +//! +//! `acquire_many` accepts a slice of keys and acquires them in +//! lexicographic order. Multi-table writers (mutation finalize, +//! branch_merge, future recovery reconciler) MUST go through +//! `acquire_many` so all callers agree on acquisition order — this is +//! how lock-order inversion deadlock is prevented. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard}; + +/// Queue key: `(table_key, branch_ref)`. `branch_ref = None` means main. +/// +/// Branch is part of the key because the same Lance dataset can be +/// pinned at different versions on different branches; concurrent +/// writes to the same `table_key` on disjoint branches must NOT +/// serialize at the queue. +pub(crate) type TableQueueKey = (String, Option); + +/// Per-`(table_key, branch)` writer queue manager. +/// +/// Lives on `Omnigraph` as `Arc` so HTTP handlers, +/// engine internals, the CLI binary, and future background reconcilers +/// (MR-870 recovery, MR-848 index) all reach it via the engine handle. +#[derive(Default)] +pub(crate) struct WriteQueueManager { + /// Held only briefly per `acquire` call: clone out the per-key Arc, + /// release the std mutex, then await the per-key tokio Mutex. + queues: Mutex>>>, +} + +impl WriteQueueManager { + pub(crate) fn new() -> Self { + Self::default() + } + + /// Get-or-create the per-key queue and clone its Arc. + fn slot(&self, key: &TableQueueKey) -> Arc> { + let mut map = self.queues.lock().expect("write queue map poisoned"); + if let Some(existing) = map.get(key) { + return Arc::clone(existing); + } + let fresh = Arc::new(AsyncMutex::new(())); + map.insert(key.clone(), Arc::clone(&fresh)); + fresh + } + + /// Acquire exclusive access to the queue for one `(table_key, branch)`. + /// + /// Blocks until the lock is available. Drop the returned guard to + /// release; the lock outlives the `WriteQueueManager` borrow. + pub(crate) async fn acquire(&self, key: &TableQueueKey) -> OwnedMutexGuard<()> { + self.slot(key).lock_owned().await + } + + /// Acquire exclusive access to many `(table_key, branch)` keys + /// atomically, in lex-sorted order. Used by multi-table writers + /// (mutation finalize, branch_merge, recovery) so all callers + /// agree on acquisition order — prevents lock-order inversion. + /// + /// Empty input returns an empty Vec without touching the map. + /// Duplicates in `keys` are deduped before acquisition (the same + /// key acquired twice would deadlock against itself). + pub(crate) async fn acquire_many( + &self, + keys: &[TableQueueKey], + ) -> Vec> { + if keys.is_empty() { + return Vec::new(); + } + let mut sorted: Vec = keys.to_vec(); + sorted.sort(); + sorted.dedup(); + let mut guards = Vec::with_capacity(sorted.len()); + for key in &sorted { + guards.push(self.acquire(key).await); + } + guards + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::{Duration, Instant}; + use tokio::time::timeout; + + fn key(table: &str, branch: Option<&str>) -> TableQueueKey { + (table.to_string(), branch.map(str::to_string)) + } + + #[tokio::test] + async fn acquire_many_empty_returns_empty() { + let qm = WriteQueueManager::new(); + let guards = qm.acquire_many(&[]).await; + assert!(guards.is_empty()); + } + + #[tokio::test] + async fn acquire_many_dedupes_repeated_keys() { + // Same key passed twice would deadlock if not deduped. + let qm = WriteQueueManager::new(); + let k = key("t1", None); + let guards = timeout( + Duration::from_secs(2), + qm.acquire_many(&[k.clone(), k.clone(), k]), + ) + .await + .expect("acquire_many with duplicates deadlocked"); + assert_eq!(guards.len(), 1); + } + + #[tokio::test] + async fn acquire_many_sorts_keys_deterministically() { + // Two callers passing keys in different orders must acquire in + // the same internal order. We test this indirectly: caller A + // passes [a, c] and caller B passes [c, a]; if they both + // acquire in sorted order the second caller blocks on `a` first, + // not `c` — same as A — so no deadlock under any interleaving. + // Direct sort observation: call acquire_many with a reversed + // input and verify it doesn't deadlock against a held guard on + // the sorted-first key. + let qm = Arc::new(WriteQueueManager::new()); + let a = key("a", None); + let z = key("z", None); + + // Hold `a` exclusively. + let _held = qm.acquire(&a).await; + + // acquire_many([z, a]) — must sort to [a, z] internally and + // block on `a`. With a 200ms timeout we should NOT see it + // complete (it's blocked on `a`). + let qm2 = Arc::clone(&qm); + let z_clone = z.clone(); + let a_clone = a.clone(); + let result = timeout(Duration::from_millis(200), async move { + qm2.acquire_many(&[z_clone, a_clone]).await + }) + .await; + assert!(result.is_err(), "acquire_many should block on `a`, the lex-first key"); + } + + #[tokio::test] + async fn same_key_acquire_serializes() { + let qm = Arc::new(WriteQueueManager::new()); + let k = key("t1", None); + + let first = qm.acquire(&k).await; + + // Second acquire on same key should NOT complete within 200ms. + let qm2 = Arc::clone(&qm); + let k2 = k.clone(); + let blocked = timeout(Duration::from_millis(200), async move { + qm2.acquire(&k2).await + }) + .await; + assert!(blocked.is_err(), "second acquire on same key must block"); + + // Drop the first guard, then second acquire should succeed. + drop(first); + let _second = timeout(Duration::from_secs(2), qm.acquire(&k)) + .await + .expect("second acquire after release should not block"); + } + + #[tokio::test] + async fn disjoint_keys_acquire_concurrently() { + let qm = Arc::new(WriteQueueManager::new()); + let a = key("a", None); + let b = key("b", None); + + // Hold `a` indefinitely. + let _held_a = qm.acquire(&a).await; + + // Acquire `b` on a different task. Should complete promptly + // because `b` is disjoint from `a`. + let qm2 = Arc::clone(&qm); + let start = Instant::now(); + let _held_b = timeout(Duration::from_secs(2), qm2.acquire(&b)) + .await + .expect("disjoint key acquire must not block on unrelated held key"); + assert!( + start.elapsed() < Duration::from_millis(500), + "disjoint acquire took {:?}, should be near-instant", + start.elapsed() + ); + } + + #[tokio::test] + async fn disjoint_branches_on_same_table_do_not_serialize() { + // (table, main) and (table, feature) are different keys. + let qm = Arc::new(WriteQueueManager::new()); + let main_k = key("t1", None); + let feature_k = key("t1", Some("feature")); + + let _held_main = qm.acquire(&main_k).await; + let _held_feature = timeout(Duration::from_secs(2), qm.acquire(&feature_k)) + .await + .expect("same-table-different-branch should not serialize"); + } +} diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index b466663..f2284f0 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1018,17 +1018,14 @@ impl Omnigraph { actor_id: Option<&str>, ) -> Result { self.ensure_schema_apply_idle("branch_merge").await?; - let previous_actor = self.audit_actor_id.clone(); - self.audit_actor_id = actor_id.map(str::to_string); - let result = self.branch_merge_impl(source, target).await; - self.audit_actor_id = previous_actor; - result + self.branch_merge_impl(source, target, actor_id).await } async fn branch_merge_impl( &mut self, source: &str, target: &str, + actor_id: Option<&str>, ) -> Result { if is_internal_run_branch(source) || is_internal_run_branch(target) { return Err(OmniError::manifest(format!( @@ -1090,6 +1087,7 @@ impl Omnigraph { &target_head_commit_id, &source_head_commit_id, is_fast_forward, + actor_id, ) .await; self.restore_coordinator(previous); @@ -1108,6 +1106,7 @@ impl Omnigraph { target_head_commit_id: &str, source_head_commit_id: &str, is_fast_forward: bool, + actor_id: Option<&str>, ) -> Result { self.ensure_commit_graph_initialized().await?; let target_snapshot = self.snapshot(); @@ -1146,7 +1145,7 @@ impl Omnigraph { if let Some(staged) = stage_streaming_table_merge( table_key, - self.catalog(), + &self.catalog(), base_snapshot, source_snapshot, &target_snapshot, @@ -1193,6 +1192,29 @@ impl Omnigraph { // requires pre-computing source deltas during candidate // classification (a structural change to `CandidateTableState`) // and is left as follow-up work. + // Acquire per-(table_key, target_branch) queues for every table + // touched by the merge plan. Sorted-order acquisition prevents + // lock-order inversion against concurrent multi-table writers. + // The active branch (set by the caller's `swap_coordinator_for_branch`) + // is the merge target; queue keys are scoped to it because a + // branch_merge writes only to the target branch. + // + // Held across the per-table publish loop and the manifest + // commit + record_merge_commit calls below. Under PR 1b's + // intermediate state (global server RwLock still in place), + // this acquisition is uncontended. + let merge_queue_keys: Vec<(String, Option)> = ordered_table_keys + .iter() + .filter(|table_key| { + matches!( + candidates.get(*table_key), + Some(CandidateTableState::RewriteMerged(_)) | Some(CandidateTableState::AdoptSourceState) + ) + }) + .map(|table_key| (table_key.clone(), self.active_branch().map(str::to_string))) + .collect(); + let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await; + let recovery_pins: Vec = ordered_table_keys .iter() .filter_map(|table_key| { @@ -1238,7 +1260,7 @@ impl Omnigraph { let mut sidecar = crate::db::manifest::new_sidecar( crate::db::manifest::SidecarKind::BranchMerge, target_branch, - self.audit_actor_id.clone(), + actor_id.map(str::to_string), recovery_pins, ); // Carry the source branch's HEAD commit id so the recovery @@ -1267,7 +1289,7 @@ impl Omnigraph { CandidateTableState::AdoptSourceState => { publish_adopted_source_state( self, - self.catalog(), + &self.catalog(), base_snapshot, source_snapshot, &target_snapshot, @@ -1315,6 +1337,7 @@ impl Omnigraph { manifest_version, target_head_commit_id, source_head_commit_id, + actor_id, ) .await?; diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index c6d2737..121467a 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -345,8 +345,8 @@ async fn validate_edge_insert_endpoints( edge_name: &str, assignments: &HashMap, ) -> Result<()> { - let edge_type = db - .catalog() + let catalog = db.catalog(); + let edge_type = catalog .edge_types .get(edge_name) .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?; @@ -688,13 +688,8 @@ impl Omnigraph { params: &ParamMap, actor_id: Option<&str>, ) -> Result { - let previous_actor = self.audit_actor_id.clone(); - self.audit_actor_id = actor_id.map(str::to_string); - let result = self - .mutate_with_current_actor(branch, query_source, query_name, params) - .await; - self.audit_actor_id = previous_actor; - result + self.mutate_with_current_actor(branch, query_source, query_name, params, actor_id) + .await } async fn mutate_with_current_actor( @@ -703,6 +698,7 @@ impl Omnigraph { query_source: &str, query_name: &str, params: &ParamMap, + actor_id: Option<&str>, ) -> Result { self.ensure_schema_state_valid().await?; let requested = Self::normalize_branch_name(branch)?; @@ -737,11 +733,19 @@ impl Omnigraph { Err(e) => Err(e), Ok(total) if staging.is_empty() => Ok(total), Ok(total) => { - let (updates, expected_versions, sidecar_handle) = staging - .finalize( + let staged = staging.stage_all(self, requested.as_deref()).await?; + // `_queue_guards` holds per-(table_key, branch) write + // queues acquired inside `commit_all`. Held across the + // manifest publish below so no concurrent writer can + // interleave between our commit_staged and our publish + // (which would correctly fail our CAS but leave Lance + // HEAD advanced — the residual class MR-870 recovers). + let (updates, expected_versions, sidecar_handle, _queue_guards) = staged + .commit_all( self, requested.as_deref(), crate::db::manifest::SidecarKind::Mutation, + actor_id, ) .await?; // Failpoint that wedges the documented finalize→publisher @@ -759,6 +763,7 @@ impl Omnigraph { requested.as_deref(), &updates, &expected_versions, + actor_id, ) .await?; // Phase C succeeded — sidecar can be deleted. If this @@ -804,7 +809,7 @@ impl Omnigraph { let query_decl = omnigraph_compiler::find_named_query(query_source, query_name) .map_err(|e| OmniError::manifest(e.to_string()))?; - let checked = typecheck_query_decl(self.catalog(), &query_decl)?; + let checked = typecheck_query_decl(&self.catalog(), &query_decl)?; match checked { CheckedQuery::Mutation(_) => {} CheckedQuery::Read(_) => { diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 30bd7ad..88865d8 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -13,11 +13,12 @@ impl Omnigraph { ) -> Result { self.ensure_schema_state_valid().await?; let resolved = self.resolved_target(target).await?; + let catalog = self.catalog(); let query_decl = omnigraph_compiler::find_named_query(query_source, query_name) .map_err(|e| OmniError::manifest(e.to_string()))?; - let type_ctx = typecheck_query(self.catalog(), &query_decl)?; - let ir = lower_query(self.catalog(), &query_decl, &type_ctx)?; + let type_ctx = typecheck_query(&catalog, &query_decl)?; + let ir = lower_query(&catalog, &query_decl, &type_ctx)?; let needs_graph = ir .pipeline @@ -34,7 +35,7 @@ impl Omnigraph { params, &resolved.snapshot, graph_index.as_deref(), - self.catalog(), + &catalog, ) .await } @@ -52,19 +53,19 @@ impl Omnigraph { ) -> Result { self.ensure_schema_state_valid().await?; let snapshot = self.snapshot_at_version(version).await?; + let catalog = self.catalog(); let query_decl = omnigraph_compiler::find_named_query(query_source, query_name) .map_err(|e| OmniError::manifest(e.to_string()))?; - let type_ctx = typecheck_query(self.catalog(), &query_decl)?; - let ir = lower_query(self.catalog(), &query_decl, &type_ctx)?; + let type_ctx = typecheck_query(&catalog, &query_decl)?; + let ir = lower_query(&catalog, &query_decl, &type_ctx)?; let needs_graph = ir .pipeline .iter() .any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. })); let graph_index = if needs_graph { - let edge_types = self - .catalog() + let edge_types = catalog .edge_types .iter() .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone()))) @@ -79,7 +80,7 @@ impl Omnigraph { params, &snapshot, graph_index.as_deref(), - self.catalog(), + &catalog, ) .await } diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 47433be..fd43ea0 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -210,24 +210,21 @@ impl MutationStaging { } /// End-of-query: for each pending table, concat batches and commit via - /// `stage_append` or `stage_merge_insert` followed by `commit_staged`. - /// Merge with inline-committed entries. Return `(updates, - /// expected_versions)` for `commit_updates_on_branch_with_expected`. + /// **Phase A** of the two-phase commit: stage uncommitted fragments + /// for every table in `pending`. No Lance HEAD movement, no sidecar, + /// no manifest publish. Returns a [`StagedMutation`] carrying the + /// staged transactions so a future MR-686 queue acquisition step can + /// run between staging (slow S3 PUTs, no queue) and commit (fast, + /// under per-`(table_key, branch)` queue). /// - /// Sequential per-table — no cross-table dependency, but a parallel - /// version is a perf optimization for multi-table writes (loader with - /// many node + edge types). v1 ships sequential; the fan-out can land - /// in a follow-up. - pub(crate) async fn finalize( + /// Sequential per-table for now — parallelizing across independent + /// Lance datasets is a perf follow-up; same loop structure as the + /// pre-split `finalize`. + pub(crate) async fn stage_all( self, db: &crate::db::Omnigraph, - branch: Option<&str>, - sidecar_kind: SidecarKind, - ) -> Result<( - Vec, - HashMap, - Option, - )> { + _branch: Option<&str>, + ) -> Result { let MutationStaging { expected_versions, paths, @@ -235,63 +232,17 @@ impl MutationStaging { inline_committed, } = self; - let mut updates: Vec = - inline_committed.into_values().collect(); - - // Sidecar protocol: build the per-table pin list BEFORE any Lance - // commit_staged runs, then write the sidecar so a crash between - // Phase B (this loop's commit_staged calls) and Phase C (the - // manifest publish in the caller) is recoverable on next open. - // Skipped when `pending` is empty (delete-only mutation; the D₂ - // parse-time rule keeps deletes out of this code path so this - // branch is reached only for the inline-committed-only case). - let pins: Vec = pending - .iter() - .map(|(table_key, _)| { - let path = paths.get(table_key).ok_or_else(|| { - OmniError::manifest_internal(format!( - "MutationStaging::finalize: missing path for table '{}'", - table_key, - )) - })?; - let expected = *expected_versions.get(table_key).ok_or_else(|| { - OmniError::manifest_internal(format!( - "MutationStaging::finalize: missing expected version for table '{}'", - table_key, - )) - })?; - Ok::(SidecarTablePin { - table_key: table_key.clone(), - table_path: path.full_path.clone(), - expected_version: expected, - post_commit_pin: expected + 1, - table_branch: path.table_branch.clone(), - }) - }) - .collect::>>()?; - - let sidecar_handle = if pins.is_empty() { - None - } else { - let sidecar = new_sidecar( - sidecar_kind, - branch.map(|s| s.to_string()), - db.audit_actor_id.clone(), - pins, - ); - Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?) - }; - + let mut staged_entries: Vec = Vec::with_capacity(pending.len()); for (table_key, table) in pending { - let path = paths.get(&table_key).ok_or_else(|| { + let path = paths.get(&table_key).cloned().ok_or_else(|| { OmniError::manifest_internal(format!( - "MutationStaging::finalize: missing path for table '{}'", + "MutationStaging::stage_all: missing path for table '{}'", table_key )) })?; let expected = *expected_versions.get(&table_key).ok_or_else(|| { OmniError::manifest_internal(format!( - "MutationStaging::finalize: missing expected version for table '{}'", + "MutationStaging::stage_all: missing expected version for table '{}'", table_key )) })?; @@ -335,8 +286,8 @@ impl MutationStaging { } }; - // Commit via Lance's two-phase write: stage produces - // uncommitted fragments + transaction; commit advances HEAD. + // Stage produces uncommitted fragments + transaction. No + // Lance HEAD advance until `commit_all` runs `commit_staged`. let staged = match table.mode { PendingMode::Append => { db.table_store().stage_append(&ds, combined, &[]).await? @@ -353,16 +304,286 @@ impl MutationStaging { .await? } }; + staged_entries.push(StagedTableEntry { + table_key, + path, + expected_version: expected, + dataset: ds, + staged_write: staged, + }); + } + + Ok(StagedMutation { + inline_committed, + staged: staged_entries, + expected_versions, + paths, + }) + } +} + +/// Output of [`MutationStaging::stage_all`]. Carries the staged Lance +/// transactions (Phase A complete; uncommitted fragments written) plus +/// the per-table metadata needed to write the recovery sidecar, run +/// `commit_staged` (Phase B), and produce the publisher's input. +/// +/// Splitting `stage_all` and `commit_all` is the structural prerequisite +/// for MR-686: a future commit can drop queue acquisition + manifest-pin +/// revalidation between Phase A and Phase B without touching staging +/// logic. +pub(crate) struct StagedMutation { + /// Updates from delete-touching ops (D₂ parse-time rule keeps + /// pending and inline_committed disjoint per table). Tables here + /// have already advanced Lance HEAD via inline `delete_where`; + /// `commit_all` builds sidecar pins for these too so the + /// commit→publish residual is recoverable for delete-only paths + /// (third-agent Finding 3). + inline_committed: HashMap, + /// One entry per table that had pending batches successfully staged. + staged: Vec, + /// Pre-write manifest version per table — the publisher's CAS fence. + expected_versions: HashMap, + /// Per-table identifiers from `MutationStaging::paths`. Carried + /// through so `commit_all` can build sidecar pins for both staged + /// and inline-committed tables. + paths: HashMap, +} + +/// Per-table state captured during `stage_all` and consumed by +/// `commit_all`. Holds the opened `Dataset` so `commit_staged` doesn't +/// re-open, and the `StagedWrite` whose `transaction` `commit_staged` +/// will execute. +struct StagedTableEntry { + table_key: String, + path: StagedTablePath, + expected_version: u64, + dataset: lance::Dataset, + staged_write: crate::table_store::StagedWrite, +} + +impl StagedMutation { + /// **Phase B** of the two-phase commit: acquire per-`(table_key, + /// branch)` queues, revalidate manifest pins, write the recovery + /// sidecar, run `commit_staged` per table to advance Lance HEAD, and + /// return the publisher's input plus the queue guards. + /// + /// **Caller must hold the returned `_guards` Vec across the + /// subsequent manifest publish.** Releasing guards before publish + /// would let another writer interleave their commit_staged between + /// ours and our publish — which would correctly fail our CAS but + /// leave Lance HEAD advanced (the residual class MR-870 recovers + /// from). Holding the guards across publish keeps the residual + /// unreachable for op-execution failures on the happy path. + /// + /// Revalidation: between `stage_all` and `commit_all`, another + /// writer (in the same process or another process sharing the + /// repo) may have committed to one of our touched tables, advancing + /// the manifest pin past our `expected_version`. We revalidate + /// under the queue and fail-fast with `manifest_conflict` before + /// any `commit_staged` so the orphaned uncommitted fragments stay + /// unreferenced (cleaned by `cleanup_old_versions`'s age sweep) + /// rather than being committed and creating a Lance-HEAD-ahead + /// residual. + pub(crate) async fn commit_all( + self, + db: &crate::db::Omnigraph, + branch: Option<&str>, + sidecar_kind: SidecarKind, + actor_id: Option<&str>, + ) -> Result<( + Vec, + HashMap, + Option, + Vec>, + )> { + let StagedMutation { + inline_committed, + staged, + expected_versions, + paths, + } = self; + + // Acquire per-(table_key, branch) queues for every touched + // table — both staged and inline-committed. Sorted by + // `acquire_many` internally so all multi-table writers + // (mutation, branch_merge, schema_apply, future MR-870 + // recovery) agree on acquisition order — prevents lock-order + // inversion deadlock. + // + // For inline-committed tables (delete-only mutations), Lance + // HEAD has already advanced inside `delete_where` before + // `commit_all` runs. Holding the queue here doesn't prevent + // that interleaving (commit 6 will move queue acquisition into + // `delete_where`'s call site); it does prevent another writer + // from interleaving between our delete and our publish, which + // would otherwise leave a Lance-HEAD-ahead residual the + // delete-only sidecar (added below) would have to recover. + let mut queue_keys: Vec<(String, Option)> = Vec::with_capacity( + staged.len() + inline_committed.len(), + ); + for entry in &staged { + queue_keys.push((entry.table_key.clone(), entry.path.table_branch.clone())); + } + for table_key in inline_committed.keys() { + let path = paths.get(table_key).ok_or_else(|| { + OmniError::manifest_internal(format!( + "StagedMutation::commit_all: missing path for inline-committed table '{}'", + table_key + )) + })?; + queue_keys.push((table_key.clone(), path.table_branch.clone())); + } + let guards = db.write_queue().acquire_many(&queue_keys).await; + + // Revalidate manifest pins. Read fresh per-branch snapshot — + // in-memory `db.snapshot()` may be stale if another writer + // committed since our stage. If any pin moved past our + // expected_version, fail-fast before commit_staged moves + // Lance HEAD. + // + // Both staged and inline-committed tables are revalidated. + // Inline-committed tables (delete-only path) had their Lance + // HEAD advanced before this point, but the *manifest pin* + // shouldn't have moved if no other writer interleaved. If it + // has, return manifest_conflict — the sidecar emitted below + // captures (expected, post) so the next open's recovery sweep + // can resolve the Lance-HEAD-vs-manifest divergence. + // + // Note: under PR 1b's intermediate state (global server RwLock + // in place), this revalidation is a no-op because no concurrent + // writer can run. Becomes load-bearing once PR 2 removes the + // global lock — see `.context/pr-1b-plan.md` Risk 3. + if !staged.is_empty() || !inline_committed.is_empty() { + let snapshot = db.snapshot_for_branch(branch).await?; + for entry in &staged { + let current = snapshot.entry(&entry.table_key).map(|e| e.table_version); + match current { + Some(v) if v == entry.expected_version => {} + Some(other) => { + return Err(OmniError::manifest_conflict(format!( + "table '{}' pin moved from {} to {} between stage and commit", + entry.table_key, entry.expected_version, other, + ))); + } + None => { + return Err(OmniError::manifest_conflict(format!( + "table '{}' missing from manifest at commit time", + entry.table_key, + ))); + } + } + } + for table_key in inline_committed.keys() { + let expected = expected_versions.get(table_key).copied().ok_or_else(|| { + OmniError::manifest_internal(format!( + "StagedMutation::commit_all: missing expected version for inline-committed table '{}'", + table_key + )) + })?; + let current = snapshot.entry(table_key).map(|e| e.table_version); + match current { + Some(v) if v == expected => {} + Some(other) => { + return Err(OmniError::manifest_conflict(format!( + "table '{}' pin moved from {} to {} between inline-commit and publish", + table_key, expected, other, + ))); + } + None => { + return Err(OmniError::manifest_conflict(format!( + "table '{}' missing from manifest at commit time", + table_key, + ))); + } + } + } + } + + // Sidecar protocol: build the per-table pin list and write the + // sidecar BEFORE any Lance commit_staged runs, so a crash + // between Phase B (this loop) and Phase C (the caller's manifest + // publish) is recoverable on next open. + // + // Pins cover BOTH staged tables (Lance HEAD will advance below + // when `commit_staged` runs) AND inline-committed tables + // (Lance HEAD already advanced inside `delete_where` — we still + // need a sidecar so that an upcoming publish failure is + // recoverable on next open). This closes the third-agent + // Finding 3 hazard: delete-only mutations would otherwise skip + // the sidecar, leaving any commit→publish residual unreachable + // by recovery. + let mut pins: Vec = Vec::with_capacity( + staged.len() + inline_committed.len(), + ); + for entry in &staged { + pins.push(SidecarTablePin { + table_key: entry.table_key.clone(), + table_path: entry.path.full_path.clone(), + expected_version: entry.expected_version, + post_commit_pin: entry.expected_version + 1, + table_branch: entry.path.table_branch.clone(), + }); + } + for (table_key, update) in &inline_committed { + let path = paths.get(table_key).ok_or_else(|| { + OmniError::manifest_internal(format!( + "StagedMutation::commit_all: missing path for inline-committed table '{}'", + table_key + )) + })?; + let expected = *expected_versions.get(table_key).ok_or_else(|| { + OmniError::manifest_internal(format!( + "StagedMutation::commit_all: missing expected version for inline-committed table '{}'", + table_key + )) + })?; + pins.push(SidecarTablePin { + table_key: table_key.clone(), + table_path: path.full_path.clone(), + expected_version: expected, + // For inline-committed tables, the post-commit pin is + // the actual post-delete version recorded by + // `record_inline`, NOT `expected + 1` — `delete_where` + // can advance HEAD by more than one version (e.g., + // when Lance internally compacts deletion vectors). + post_commit_pin: update.table_version, + table_branch: path.table_branch.clone(), + }); + } + + let mut updates: Vec = inline_committed.into_values().collect(); + + let sidecar_handle = if pins.is_empty() { + None + } else { + let sidecar = new_sidecar( + sidecar_kind, + branch.map(|s| s.to_string()), + actor_id.map(str::to_string), + pins, + ); + Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?) + }; + + for entry in staged { + let StagedTableEntry { + table_key, + path, + expected_version: _, + dataset, + staged_write, + } = entry; + let new_ds = db .table_store() - .commit_staged(Arc::new(ds), staged.transaction) + .commit_staged(Arc::new(dataset), staged_write.transaction) .await?; let state = db .table_store() .table_state(&path.full_path, &new_ds) .await?; updates.push(SubTableUpdate { - table_key: table_key.clone(), + table_key, table_version: state.version, table_branch: path.table_branch.clone(), row_count: state.row_count, @@ -370,7 +591,7 @@ impl MutationStaging { }); } - Ok((updates, expected_versions, sidecar_handle)) + Ok((updates, expected_versions, sidecar_handle, guards)) } } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index f4cf7d1..3e971ca 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -90,13 +90,8 @@ impl Omnigraph { mode: LoadMode, actor_id: Option<&str>, ) -> Result { - let previous_actor = self.audit_actor_id.clone(); - self.audit_actor_id = actor_id.map(str::to_string); - let result = self - .ingest_with_current_actor(branch, from, data, mode) - .await; - self.audit_actor_id = previous_actor; - result + self.ingest_with_current_actor(branch, from, data, mode, actor_id) + .await } pub async fn ingest_file( @@ -127,6 +122,7 @@ impl Omnigraph { from: Option<&str>, data: &str, mode: LoadMode, + actor_id: Option<&str>, ) -> Result { self.ensure_schema_state_valid().await?; let target_branch = @@ -143,7 +139,7 @@ impl Omnigraph { .await?; } - let result = self.load(&target_branch, data, mode).await?; + let result = self.load_as(&target_branch, data, mode, actor_id).await?; Ok(IngestResult { branch: target_branch, base_branch, @@ -154,6 +150,16 @@ impl Omnigraph { } pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result { + self.load_as(branch, data, mode, None).await + } + + pub async fn load_as( + &mut self, + branch: &str, + data: &str, + mode: LoadMode, + actor_id: Option<&str>, + ) -> Result { self.ensure_schema_state_valid().await?; // Reject internal `__run__*` / system-prefixed branches at the // public write boundary. Direct-publish paths assert this @@ -169,7 +175,7 @@ impl Omnigraph { // Direct-to-target writes: no Run state machine, no `__run__` staging // branch. Cross-table OCC is enforced by the publisher's // `expected_table_versions` CAS inside `load_jsonl_reader`. - self.load_direct_on_branch(requested.as_deref(), data, mode) + self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id) .await } @@ -188,9 +194,10 @@ impl Omnigraph { branch: Option<&str>, data: &str, mode: LoadMode, + actor_id: Option<&str>, ) -> Result { let reader = BufReader::new(Cursor::new(data.as_bytes())); - load_jsonl_reader(self, branch, reader, mode).await + load_jsonl_reader(self, branch, reader, mode, actor_id).await } } @@ -232,6 +239,7 @@ async fn load_jsonl_reader( branch: Option<&str>, reader: R, mode: LoadMode, + actor_id: Option<&str>, ) -> Result { let catalog = db.catalog().clone(); @@ -537,15 +545,19 @@ async fn load_jsonl_reader( // Phase 4: Atomic manifest commit with publisher-level OCC. if use_staging { - let (updates, expected_versions, sidecar_handle) = staging - .finalize(db, branch, crate::db::manifest::SidecarKind::Load) + let staged = staging.stage_all(db, branch).await?; + // `_queue_guards` holds per-(table_key, branch) write queues + // across the manifest publish below — see exec/mutation.rs for + // the rationale (interleaving prevention). + let (updates, expected_versions, sidecar_handle, _queue_guards) = staged + .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id) .await?; // Same finalize → publisher residual as mutations: per-table // staged commits have advanced Lance HEAD, but the manifest // publish has not run yet. Reuse the mutation failpoint name so // one failpoint pins the shared `MutationStaging` boundary. crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?; - db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions) + db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id) .await?; // The recovery sidecar protects the per-table commit_staged → // manifest publish window. Phase C succeeded — clean up @@ -574,6 +586,7 @@ async fn load_jsonl_reader( branch, &overwrite_updates, &overwrite_expected, + actor_id, ) .await?; }