mr-686: bundle PR 0/1a/1b foundation + PR 2 catalog/schema_source ArcSwap

Bundles the working-tree state from the prior session (PR 0 bench harness,
PR 1a audit_actor_id removal, PR 1b WriteQueueManager + writer integration)
together with the first half of PR 2's interior-mutability foundation
(catalog and schema_source wrapped in Arc<ArcSwap<...>>). The two streams
intermix in 7 of the same files, so splitting via git add -p was
impractical. Subsequent PR 2 steps land as separate atomic commits.

PR 0 — server-level concurrent /change bench harness
  - crates/omnigraph-server/examples/bench_concurrent_http.rs (new)
  - .context/bench-results/{baseline-main,after-pr1}/ (gitignored)

PR 1a — drop the audit_actor_id field, thread per-call
  - removed Omnigraph::audit_actor_id and the swap-restore patterns in
    mutation.rs, merge.rs, loader/mod.rs
  - actor_id: Option<&str> threaded through MutationStaging::finalize,
    mutate_with_current_actor, ingest_with_current_actor,
    branch_merge_impl, branch_merge_on_current_target,
    commit_prepared_updates*, record_merge_commit,
    commit_updates_on_branch_with_expected
  - apply_schema and ensure_indices_for_branch pass None (system-attributed)

PR 1b — per-(table_key, branch) write queue + revalidation + sidecar
  - new crates/omnigraph/src/db/write_queue.rs with WriteQueueManager,
    acquire/acquire_many, sorted+deduped acquisition; 6 unit tests
  - Arc<WriteQueueManager> field on Omnigraph + db.write_queue() accessor
  - MutationStaging::finalize split into stage_all (Phase A, no queue)
    and StagedMutation::commit_all (Phase B, acquire_many + revalidate
    pins + sidecar + commit_staged); guards held across publisher
  - delete-only mutations now emit recovery sidecars; revalidation
    extended to inline_committed tables
  - branch_merge_on_current_target, apply_schema_with_lock, and
    ensure_indices_for_branch acquire per-table queues for their
    touched tables

PR 2 Step B (partial) — catalog and schema_source via ArcSwap
  - catalog: Catalog -> Arc<ArcSwap<Catalog>>
  - schema_source: String -> Arc<ArcSwap<String>>
  - public accessors return Arc<Catalog> / Arc<String>; readers bind
    locally where the borrow has to outlive an expression
  - new pub(crate) store_catalog / store_schema_source helpers replace
    the field assignments in apply_schema and reload_schema_if_source_changed
  - 117 tests across lifecycle/end_to_end/branching/runs pass; engine
    lib + workspace compile clean

Coordinator wrap (Mutex) and the &mut self -> &self engine API
conversion follow in subsequent commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-07 16:22:38 +02:00
parent cd780e2d37
commit fcb47620d3
No known key found for this signature in database
15 changed files with 1041 additions and 183 deletions

View file

@ -236,5 +236,6 @@ Rules:
4. **Re-verify before recommending.** If you cite a flag, env var, endpoint, or constant to the user or in code, grep for it in source first. Memory and docs go stale; the code is authoritative.
5. **Keep AGENTS.md a map, not an encyclopedia.** New deep content goes into `docs/`. Add an entry to "Where to find each topic" instead of pasting prose into this file. The "Always-on rules" section is the exception — it's for invariants that should always be in scope.
6. **Re-read on schema/query/IR changes.** Edits to `schema.pest`, `query.pest`, `ir/lower.rs`, `query/typecheck.rs`, or `query/lint.rs` should trigger a re-read of [docs/schema-language.md](docs/schema-language.md), [docs/query-language.md](docs/query-language.md), and [docs/execution.md](docs/execution.md) to confirm they still describe reality.
7. **Always make smaller commits.** Each commit does one thing, compiles, and passes tests; mechanical refactors land separately from the behavior changes they enable.
CI check: `scripts/check-agents-md.sh` verifies that every `docs/*.md` link in this file resolves and that every doc in the canonical set is linked. Run it locally before opening a PR if you've moved or renamed docs.

View file

@ -1425,7 +1425,7 @@ async fn execute_query_lint(
let uri = resolve_local_uri(config, cli_uri, cli_target, "query lint")?;
let db = Omnigraph::open(&uri).await?;
Ok(lint_query_file(
db.catalog(),
&db.catalog(),
&query_source,
query_path,
QueryLintSchemaSource::repo(uri),

View file

@ -0,0 +1,267 @@
//! Server-level concurrent HTTP benchmark for MR-686 (PR 0 baseline).
//!
//! Drives concurrent `/change` requests against an in-process Omnigraph HTTP
//! server. Measures the global `Arc<RwLock<Omnigraph>>` lock penalty on
//! current `main` so PR 1 + PR 2 can be evaluated against a real baseline.
//!
//! Per the MR-686 plan: this is the load-bearing bench. `Omnigraph::mutate_as`
//! is `&mut self`, so an engine-level concurrent bench either serializes on the
//! borrow checker (measures nothing) or drives multiple handles (measures Lance
//! contention, not the server bottleneck). Driving the HTTP server is the only
//! way to measure the actual `RwLock<Omnigraph>` contention this work removes.
//!
//! Usage:
//! ```sh
//! cargo run --release -p omnigraph-server --example bench_concurrent_http -- \
//! --tables 16 --actors 16 --ops-per-actor 1000 --mode disjoint \
//! --output bench-results/baseline-main/cross-table.json
//! ```
//!
//! Modes:
//! - `disjoint`: each actor writes to a distinct node type (cross-table fanout)
//! - `same-key`: all actors write to the same node type (hot-key contention)
//! - `mixed`: each actor writes to a different table per op (round-robin)
use std::path::PathBuf;
use std::time::{Duration, Instant};
use axum::Router;
use axum::body::{Body, to_bytes};
use axum::http::{Method, Request, StatusCode};
use clap::{Parser, ValueEnum};
use omnigraph::db::Omnigraph;
use omnigraph_server::api::ChangeRequest;
use omnigraph_server::{AppState, build_app};
use serde::Serialize;
use tower::ServiceExt;
#[derive(Parser, Debug)]
#[command(about = "Concurrent HTTP bench for MR-686")]
struct Args {
/// Number of distinct node types in the schema.
#[arg(long, default_value_t = 16)]
tables: usize,
/// Number of concurrent actors driving requests.
#[arg(long, default_value_t = 16)]
actors: usize,
/// Operations per actor.
#[arg(long, default_value_t = 100)]
ops_per_actor: usize,
/// Workload mode.
#[arg(long, value_enum, default_value_t = Mode::Disjoint)]
mode: Mode,
/// Output file for the JSON results. Stdout always gets a copy.
#[arg(long)]
output: Option<PathBuf>,
/// Optional label to record alongside results (e.g. "baseline-main").
#[arg(long, default_value = "")]
label: String,
}
#[derive(Clone, Copy, Debug, ValueEnum, Serialize)]
#[serde(rename_all = "kebab-case")]
enum Mode {
Disjoint,
SameKey,
Mixed,
}
#[derive(Serialize, Debug)]
struct BenchResults {
label: String,
mode: Mode,
tables: usize,
actors: usize,
ops_per_actor: usize,
total_ops: usize,
error_count: usize,
wall_time_ms: u64,
throughput_ops_per_sec: f64,
p50_ms: f64,
p95_ms: f64,
p99_ms: f64,
p999_ms: f64,
max_ms: f64,
notes: &'static str,
}
fn build_schema(num_tables: usize) -> String {
let mut schema = String::new();
for i in 0..num_tables {
schema.push_str(&format!(
"node Item{i} {{\n name: String @key\n value: I32?\n}}\n\n"
));
}
schema
}
fn build_query_source(table_idx: usize) -> String {
format!(
"query insert_item($name: String, $value: I32) {{\n insert Item{table_idx} {{ name: $name, value: $value }}\n}}"
)
}
fn pick_table(actor_idx: usize, op_idx: usize, mode: Mode, num_tables: usize) -> usize {
match mode {
Mode::Disjoint => actor_idx % num_tables,
Mode::SameKey => 0,
Mode::Mixed => (actor_idx.wrapping_mul(7919) ^ op_idx) % num_tables,
}
}
async fn drive_actor(
app: Router,
actor_idx: usize,
ops: usize,
mode: Mode,
num_tables: usize,
) -> (Vec<Duration>, usize) {
let mut latencies = Vec::with_capacity(ops);
let mut errors = 0usize;
for op_idx in 0..ops {
let table_idx = pick_table(actor_idx, op_idx, mode, num_tables);
let request_body = ChangeRequest {
query_source: build_query_source(table_idx),
query_name: Some("insert_item".to_string()),
params: Some(serde_json::json!({
"name": format!("a{actor_idx}_o{op_idx}"),
"value": op_idx as i32,
})),
branch: None,
};
let body = serde_json::to_vec(&request_body).unwrap();
let req = Request::builder()
.method(Method::POST)
.uri("/change")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
let start = Instant::now();
let response = match app.clone().oneshot(req).await {
Ok(r) => r,
Err(e) => {
eprintln!("actor {actor_idx} op {op_idx} transport error: {e:?}");
errors += 1;
continue;
}
};
let elapsed = start.elapsed();
let status = response.status();
if status != StatusCode::OK {
errors += 1;
// Drain body for logging on the first few failures.
if errors <= 3 {
let body = to_bytes(response.into_body(), 64 * 1024).await.unwrap_or_default();
eprintln!(
"actor {actor_idx} op {op_idx} status {status} body {}",
String::from_utf8_lossy(&body)
);
}
}
latencies.push(elapsed);
}
(latencies, errors)
}
#[tokio::main]
async fn main() {
let args = Args::parse();
if args.tables == 0 || args.actors == 0 || args.ops_per_actor == 0 {
eprintln!("--tables, --actors, --ops-per-actor must all be > 0");
std::process::exit(2);
}
let temp = tempfile::tempdir().expect("tempdir");
let repo = temp.path().join("bench.omni");
let schema = build_schema(args.tables);
Omnigraph::init(repo.to_str().unwrap(), &schema)
.await
.expect("init repo");
let state = AppState::open(repo.to_string_lossy().to_string())
.await
.expect("open AppState");
let app = build_app(state);
eprintln!(
"running mode={:?} tables={} actors={} ops_per_actor={}",
args.mode, args.tables, args.actors, args.ops_per_actor
);
let start = Instant::now();
let mut handles = Vec::with_capacity(args.actors);
for actor_idx in 0..args.actors {
let app = app.clone();
let mode = args.mode;
let ops = args.ops_per_actor;
let num_tables = args.tables;
handles.push(tokio::spawn(async move {
drive_actor(app, actor_idx, ops, mode, num_tables).await
}));
}
let mut all_latencies: Vec<Duration> = Vec::with_capacity(args.actors * args.ops_per_actor);
let mut total_errors = 0usize;
for h in handles {
let (lats, errs) = h.await.expect("actor task panicked");
all_latencies.extend(lats);
total_errors += errs;
}
let wall = start.elapsed();
all_latencies.sort();
let n = all_latencies.len();
let pct = |p: f64| -> f64 {
if n == 0 {
return 0.0;
}
let idx = ((n as f64 - 1.0) * p).round() as usize;
all_latencies[idx].as_secs_f64() * 1000.0
};
let max_ms = all_latencies
.last()
.map(|d| d.as_secs_f64() * 1000.0)
.unwrap_or(0.0);
let throughput = if wall.as_secs_f64() > 0.0 {
n as f64 / wall.as_secs_f64()
} else {
0.0
};
let results = BenchResults {
label: args.label.clone(),
mode: args.mode,
tables: args.tables,
actors: args.actors,
ops_per_actor: args.ops_per_actor,
total_ops: n,
error_count: total_errors,
wall_time_ms: wall.as_millis() as u64,
throughput_ops_per_sec: throughput,
p50_ms: pct(0.50),
p95_ms: pct(0.95),
p99_ms: pct(0.99),
p999_ms: pct(0.999),
max_ms,
notes: "MR-686 PR 0 baseline. Drives /change via Tower oneshot.",
};
let json = serde_json::to_string_pretty(&results).unwrap();
println!("{json}");
if let Some(path) = args.output.as_ref() {
if let Some(parent) = path.parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent).expect("mkdir output parent");
}
std::fs::write(path, &json).expect("write output");
eprintln!("wrote {}", path.display());
}
if total_errors > 0 {
eprintln!("WARN: {total_errors} requests failed");
std::process::exit(1);
}
}

View file

@ -5,6 +5,7 @@ mod omnigraph;
mod recovery_audit;
mod run_registry;
mod schema_state;
pub(crate) mod write_queue;
pub use commit_graph::GraphCommit;
pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};

View file

@ -2,6 +2,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::io::Write;
use std::sync::Arc;
use arc_swap::ArcSwap;
use arrow_array::{
Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
@ -76,9 +77,19 @@ pub struct Omnigraph {
coordinator: GraphCoordinator,
table_store: TableStore,
runtime_cache: RuntimeCache,
catalog: Catalog,
schema_source: String,
pub(crate) audit_actor_id: Option<String>,
/// Read-heavy on every query, written only by `apply_schema`. ArcSwap
/// gives atomic pointer swap with zero-cost reads (`load()` returns a
/// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
/// don't contend on a lock to read the catalog.
catalog: Arc<ArcSwap<Catalog>>,
/// Read-heavy on schema introspection paths, written only by
/// `apply_schema`. Same ArcSwap rationale as `catalog`.
schema_source: Arc<ArcSwap<String>>,
/// Per-`(table_key, branch)` writer queues. Reachable from engine
/// internals (mutation finalize, schema_apply, branch_merge,
/// ensure_indices, delete_where) and from future MR-870 recovery
/// reconciler. PR 1b adds the field; callers acquire in commits 4+.
write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
}
/// Whether [`Omnigraph::open`] runs the open-time recovery sweep.
@ -131,9 +142,9 @@ impl Omnigraph {
coordinator,
table_store: TableStore::new(&root),
runtime_cache: RuntimeCache::default(),
catalog,
schema_source: schema_source.to_string(),
audit_actor_id: None,
catalog: Arc::new(ArcSwap::from_pointee(catalog)),
schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
})
}
@ -217,18 +228,35 @@ impl Omnigraph {
coordinator,
table_store: TableStore::new(&root),
runtime_cache: RuntimeCache::default(),
catalog,
schema_source,
audit_actor_id: None,
catalog: Arc::new(ArcSwap::from_pointee(catalog)),
schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
})
}
pub fn catalog(&self) -> &Catalog {
&self.catalog
/// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
/// catalog pointer; callers can hold the returned `Arc` across awaits
/// without blocking concurrent `apply_schema`.
pub fn catalog(&self) -> Arc<Catalog> {
self.catalog.load_full()
}
pub fn schema_source(&self) -> &str {
&self.schema_source
/// Returns an `Arc<String>` snapshot of the schema source.
pub fn schema_source(&self) -> Arc<String> {
self.schema_source.load_full()
}
/// Atomically swap the in-memory catalog. Concurrent readers see
/// either the old or the new pointer; never a torn state. Used by
/// `apply_schema` and `reload_schema_if_source_changed`.
pub(crate) fn store_catalog(&self, catalog: Catalog) {
self.catalog.store(Arc::new(catalog));
}
/// Atomically swap the in-memory schema source. Same rationale as
/// [`store_catalog`](Self::store_catalog).
pub(crate) fn store_schema_source(&self, schema_source: String) {
self.schema_source.store(Arc::new(schema_source));
}
pub fn uri(&self) -> &str {
@ -278,6 +306,17 @@ impl Omnigraph {
self.storage.as_ref()
}
/// Per-`(table_key, branch)` writer queues.
///
/// Engine-internal writers (mutation finalize, schema_apply,
/// branch_merge, ensure_indices, delete_where) and the future MR-870
/// recovery reconciler reach the queue manager via this accessor.
/// Returns an `Arc` clone so callers can hold the manager across
/// `&mut self` engine API boundaries.
pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
Arc::clone(&self.write_queue)
}
/// Engine-level access to the repo's normalized root URI. Used by
/// the recovery sidecar protocol to compute `__recovery/` paths.
pub(crate) fn root_uri(&self) -> &str {
@ -433,7 +472,7 @@ impl Omnigraph {
async fn reload_schema_if_source_changed(&mut self) -> Result<()> {
let schema_path = schema_source_uri(&self.root_uri);
let schema_source = self.storage.read_text(&schema_path).await?;
if schema_source == self.schema_source {
if schema_source == *self.schema_source.load_full() {
return Ok(());
}
let current_source_ir = read_schema_ir_from_source(&schema_source)?;
@ -447,8 +486,8 @@ impl Omnigraph {
.await?;
let mut catalog = build_catalog_from_ir(&accepted_ir)?;
fixup_blob_schemas(&mut catalog);
self.schema_source = schema_source;
self.catalog = catalog;
self.store_schema_source(schema_source);
self.store_catalog(catalog);
Ok(())
}
@ -658,8 +697,8 @@ impl Omnigraph {
/// ```
pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
self.ensure_schema_state_valid().await?;
let node_type = self
.catalog
let catalog = self.catalog();
let node_type = catalog
.node_types
.get(type_name)
.ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
@ -801,10 +840,6 @@ impl Omnigraph {
self.coordinator.branch_create(name).await
}
pub(crate) fn current_audit_actor(&self) -> Option<&str> {
self.audit_actor_id.as_deref()
}
pub async fn branch_create_from(
&mut self,
from: impl Into<ReadTarget>,
@ -976,12 +1011,14 @@ impl Omnigraph {
manifest_version: u64,
parent_commit_id: &str,
merged_parent_commit_id: &str,
actor_id: Option<&str>,
) -> Result<String> {
table_ops::record_merge_commit(
self,
manifest_version,
parent_commit_id,
merged_parent_commit_id,
actor_id,
)
.await
}
@ -991,12 +1028,14 @@ impl Omnigraph {
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
expected_table_versions: &std::collections::HashMap<String, u64>,
actor_id: Option<&str>,
) -> Result<u64> {
table_ops::commit_updates_on_branch_with_expected(
self,
branch,
updates,
expected_table_versions,
actor_id,
)
.await
}

View file

@ -142,7 +142,8 @@ async fn export_table_to_writer<W: Write>(
.open_snapshot_table(snapshot, table_key)
.await?;
let ordering = Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]);
let blob_properties = blob_properties_for_table_key(db.catalog(), table_key)?;
let catalog = db.catalog();
let blob_properties = blob_properties_for_table_key(&catalog, table_key)?;
if blob_properties.is_empty() {
for batch in db.table_store.scan(&ds, None, None, ordering).await? {
@ -207,9 +208,9 @@ fn write_export_rows_from_batch<W: Write>(
blob_values: Option<&HashMap<String, Vec<Option<String>>>>,
writer: &mut W,
) -> Result<()> {
let catalog = db.catalog();
if let Some(type_name) = table_key.strip_prefix("node:") {
let node_type = db
.catalog
let node_type = catalog
.node_types
.get(type_name)
.ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
@ -243,8 +244,7 @@ fn write_export_rows_from_batch<W: Write>(
}
if let Some(edge_name) = table_key.strip_prefix("edge:") {
let edge_type = db
.catalog
let edge_type = catalog
.edge_types
.get(edge_name)
.ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?;

View file

@ -81,7 +81,7 @@ pub async fn optimize_all_tables(db: &mut Omnigraph) -> Result<Vec<TableOptimize
let resolved = db.resolved_branch_target(None).await?;
let snapshot = resolved.snapshot;
let table_tasks: Vec<_> = all_table_keys(&db.catalog)
let table_tasks: Vec<_> = all_table_keys(&db.catalog())
.into_iter()
.filter_map(|table_key| {
let entry = snapshot.entry(&table_key)?;
@ -144,7 +144,7 @@ pub async fn cleanup_all_tables(
let resolved = db.resolved_branch_target(None).await?;
let snapshot = resolved.snapshot;
let table_tasks: Vec<_> = all_table_keys(&db.catalog)
let table_tasks: Vec<_> = all_table_keys(&db.catalog())
.into_iter()
.filter_map(|table_key| {
let entry = snapshot.entry(&table_key)?;

View file

@ -209,6 +209,26 @@ pub(super) async fn apply_schema_with_lock(
});
}
// Acquire per-(table_key, branch) queues for every existing table
// that schema_apply will rewrite or re-index. New tables (added or
// renamed targets) aren't acquired — they have no existing dataset
// to race against. Held across the per-table commit loop and the
// manifest publish via `commit_changes_with_actor` below.
//
// Schema-apply already holds the graph-wide `__schema_apply_lock__`
// sentinel branch, so under PR 1b's intermediate state these
// per-table acquisitions are uncontended. They exist for symmetry
// with future MR-870 recovery, which will need queue acquisition
// before any `Dataset::restore` it issues for SchemaApply sidecars.
let schema_apply_queue_keys: Vec<(String, Option<String>)> = recovery_pins
.iter()
.map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
.collect();
let _schema_apply_queue_guards = db
.write_queue()
.acquire_many(&schema_apply_queue_keys)
.await;
let recovery_handle = if recovery_pins.is_empty()
&& sidecar_registrations.is_empty()
&& sidecar_tombstones.is_empty()
@ -225,7 +245,10 @@ pub(super) async fn apply_schema_with_lock(
let mut sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::SchemaApply,
None,
db.audit_actor_id.clone(),
// `apply_schema` doesn't currently take an actor (no `apply_schema_as`
// public API). The HTTP server's /schema/apply handler can pass actor
// context through a follow-up addition. For now, system-attributed.
None,
recovery_pins,
);
sidecar.additional_registrations = sidecar_registrations;
@ -266,11 +289,12 @@ pub(super) async fn apply_schema_with_lock(
})?;
ensure_snapshot_entry_head_matches(db, source_entry).await?;
let source_ds = snapshot.open(source_table_key).await?;
let current_catalog = db.catalog();
let batch = batch_for_schema_apply_rewrite(
db,
&source_ds,
source_table_key,
&db.catalog,
&current_catalog,
target_table_key,
&desired_catalog,
property_renames.get(target_table_key),
@ -311,11 +335,12 @@ pub(super) async fn apply_schema_with_lock(
})?;
ensure_snapshot_entry_head_matches(db, entry).await?;
let source_ds = snapshot.open(table_key).await?;
let current_catalog = db.catalog();
let batch = batch_for_schema_apply_rewrite(
db,
&source_ds,
table_key,
&db.catalog,
&current_catalog,
table_key,
&desired_catalog,
property_renames.get(table_key),
@ -444,13 +469,13 @@ pub(super) async fn apply_schema_with_lock(
crate::failpoints::maybe_fail("schema_apply.after_staging_write")?;
let actor_id = db.current_audit_actor().map(str::to_string);
// `apply_schema` doesn't currently take an actor; system-attributed.
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = db
.coordinator
.commit_changes_with_actor(&manifest_changes, actor_id.as_deref())
.commit_changes_with_actor(&manifest_changes, None)
.await?;
crate::failpoints::maybe_fail("schema_apply.after_manifest_commit")?;
@ -471,8 +496,8 @@ pub(super) async fn apply_schema_with_lock(
)
.await?;
db.catalog = desired_catalog;
db.schema_source = desired_schema_source.to_string();
db.store_catalog(desired_catalog);
db.store_schema_source(desired_schema_source.to_string());
db.coordinator.refresh().await?;
db.runtime_cache.invalidate_all().await;
if changed_edge_tables {

View file

@ -11,14 +11,16 @@ pub(super) async fn graph_index(db: &Omnigraph) -> Result<Arc<crate::graph_index
.to_string(),
))
.await?;
db.runtime_cache.graph_index(&resolved, &db.catalog).await
let catalog = db.catalog();
db.runtime_cache.graph_index(&resolved, &catalog).await
}
pub(super) async fn graph_index_for_resolved(
db: &Omnigraph,
resolved: &ResolvedTarget,
) -> Result<Arc<crate::graph_index::GraphIndex>> {
db.runtime_cache.graph_index(resolved, &db.catalog).await
let catalog = db.catalog();
db.runtime_cache.graph_index(resolved, &catalog).await
}
pub(super) async fn ensure_indices(db: &mut Omnigraph) -> Result<()> {
@ -58,8 +60,14 @@ pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test(
};
let mut expected = std::collections::HashMap::new();
expected.insert(table_key.to_string(), entry.table_version);
commit_prepared_updates_on_branch_with_expected(db, branch.as_deref(), &[update], &expected)
.await
commit_prepared_updates_on_branch_with_expected(
db,
branch.as_deref(),
&[update],
&expected,
None,
)
.await
}
pub(super) async fn ensure_indices_for_branch(
@ -72,6 +80,7 @@ pub(super) async fn ensure_indices_for_branch(
let snapshot = resolved.snapshot;
let mut updates = Vec::new();
let active_branch = resolved.branch;
let catalog = db.catalog();
// Recovery sidecar: protect the per-table commit_staged loop in
// build_indices_on_dataset (one commit per index built). Only pins
@ -83,7 +92,7 @@ pub(super) async fn ensure_indices_for_branch(
// committed work on sibling tables. Steady-state runs (everything
// already indexed) skip the sidecar entirely.
let mut recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = Vec::new();
for type_name in db.catalog.node_types.keys() {
for type_name in catalog.node_types.keys() {
let table_key = format!("node:{}", type_name);
let Some(entry) = snapshot.entry(&table_key) else {
continue;
@ -122,7 +131,7 @@ pub(super) async fn ensure_indices_for_branch(
});
}
}
for edge_name in db.catalog.edge_types.keys() {
for edge_name in catalog.edge_types.keys() {
let table_key = format!("edge:{}", edge_name);
let Some(entry) = snapshot.entry(&table_key) else {
continue;
@ -147,13 +156,28 @@ pub(super) async fn ensure_indices_for_branch(
});
}
}
// Acquire per-(table_key, active_branch) queues for every table
// that needs index work. Held across the per-table commit loop and
// the manifest publish at the end of this function. Sorted-order
// acquisition prevents lock-order inversion against concurrent
// multi-table writers (mutation finalize, branch_merge, future
// MR-870 recovery). Under PR 1b's intermediate state (global server
// RwLock still in place), this acquisition is uncontended.
let queue_keys: Vec<(String, Option<String>)> = recovery_pins
.iter()
.map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
.collect();
let _queue_guards = db.write_queue().acquire_many(&queue_keys).await;
let recovery_handle = if recovery_pins.is_empty() {
None
} else {
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::EnsureIndices,
active_branch.clone(),
db.audit_actor_id.clone(),
// `ensure_indices` doesn't currently take an actor; system-attributed.
// Future: add `ensure_indices_as` to thread actor context.
None,
recovery_pins,
);
Some(
@ -162,7 +186,7 @@ pub(super) async fn ensure_indices_for_branch(
)
};
for type_name in db.catalog.node_types.keys() {
for type_name in catalog.node_types.keys() {
let table_key = format!("node:{}", type_name);
let Some(entry) = snapshot.entry(&table_key) else {
continue;
@ -209,7 +233,7 @@ pub(super) async fn ensure_indices_for_branch(
}
}
for edge_name in db.catalog.edge_types.keys() {
for edge_name in catalog.edge_types.keys() {
let table_key = format!("edge:{}", edge_name);
let Some(entry) = snapshot.entry(&table_key) else {
continue;
@ -264,7 +288,7 @@ pub(super) async fn ensure_indices_for_branch(
crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?;
if !updates.is_empty() {
commit_prepared_updates_on_branch(db, branch, &updates).await?;
commit_prepared_updates_on_branch(db, branch, &updates, None).await?;
}
// Recovery sidecar lifecycle: delete after the manifest publish (or
@ -321,7 +345,8 @@ async fn needs_index_work_node(
if !db.table_store.has_btree_index(&ds, "id").await? {
return Ok(true);
}
let Some(node_type) = db.catalog.node_types.get(type_name) else {
let catalog = db.catalog();
let Some(node_type) = catalog.node_types.get(type_name) else {
return Ok(false);
};
for index_cols in &node_type.indices {
@ -505,7 +530,8 @@ pub(super) async fn build_indices_on_dataset(
table_key: &str,
ds: &mut Dataset,
) -> Result<()> {
build_indices_on_dataset_for_catalog(db, &db.catalog, table_key, ds).await
let catalog = db.catalog();
build_indices_on_dataset_for_catalog(db, &catalog, table_key, ds).await
}
pub(super) async fn build_indices_on_dataset_for_catalog(
@ -704,14 +730,14 @@ async fn prepare_updates_for_commit(
async fn commit_prepared_updates(
db: &mut Omnigraph,
updates: &[crate::db::SubTableUpdate],
actor_id: Option<&str>,
) -> Result<u64> {
let actor_id = db.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = db
.coordinator
.commit_updates_with_actor(updates, actor_id.as_deref())
.commit_updates_with_actor(updates, actor_id)
.await?;
Ok(manifest_version)
}
@ -720,18 +746,14 @@ async fn commit_prepared_updates_with_expected(
db: &mut Omnigraph,
updates: &[crate::db::SubTableUpdate],
expected_table_versions: &std::collections::HashMap<String, u64>,
actor_id: Option<&str>,
) -> Result<u64> {
let actor_id = db.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = db
.coordinator
.commit_updates_with_actor_with_expected(
updates,
expected_table_versions,
actor_id.as_deref(),
)
.commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id)
.await?;
Ok(manifest_version)
}
@ -740,11 +762,12 @@ pub(super) async fn commit_prepared_updates_on_branch(
db: &mut Omnigraph,
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
actor_id: Option<&str>,
) -> Result<u64> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
let requested_branch = branch.map(str::to_string);
if requested_branch == current_branch {
return commit_prepared_updates(db, updates).await;
return commit_prepared_updates(db, updates, actor_id).await;
}
let mut coordinator = match requested_branch.as_deref() {
@ -753,12 +776,11 @@ pub(super) async fn commit_prepared_updates_on_branch(
}
None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?,
};
let actor_id = db.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = coordinator
.commit_updates_with_actor(updates, actor_id.as_deref())
.commit_updates_with_actor(updates, actor_id)
.await?;
Ok(manifest_version)
}
@ -768,11 +790,18 @@ pub(super) async fn commit_prepared_updates_on_branch_with_expected(
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
expected_table_versions: &std::collections::HashMap<String, u64>,
actor_id: Option<&str>,
) -> Result<u64> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
let requested_branch = branch.map(str::to_string);
if requested_branch == current_branch {
return commit_prepared_updates_with_expected(db, updates, expected_table_versions).await;
return commit_prepared_updates_with_expected(
db,
updates,
expected_table_versions,
actor_id,
)
.await;
}
let mut coordinator = match requested_branch.as_deref() {
@ -781,16 +810,11 @@ pub(super) async fn commit_prepared_updates_on_branch_with_expected(
}
None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?,
};
let actor_id = db.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = coordinator
.commit_updates_with_actor_with_expected(
updates,
expected_table_versions,
actor_id.as_deref(),
)
.commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id)
.await?;
Ok(manifest_version)
}
@ -805,7 +829,7 @@ pub(super) async fn commit_updates(
db.ensure_schema_apply_not_locked("write commit").await?;
let current_branch = db.coordinator.current_branch().map(str::to_string);
let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?;
commit_prepared_updates(db, &prepared).await
commit_prepared_updates(db, &prepared, None).await
}
pub(super) async fn commit_manifest_updates(
@ -820,14 +844,14 @@ pub(super) async fn record_merge_commit(
manifest_version: u64,
parent_commit_id: &str,
merged_parent_commit_id: &str,
actor_id: Option<&str>,
) -> Result<String> {
let actor_id = db.current_audit_actor().map(str::to_string);
db.coordinator
.record_merge_commit(
manifest_version,
parent_commit_id,
merged_parent_commit_id,
actor_id.as_deref(),
actor_id,
)
.await
.map(|snapshot_id| snapshot_id.as_str().to_string())
@ -841,11 +865,18 @@ pub(super) async fn commit_updates_on_branch_with_expected(
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
expected_table_versions: &std::collections::HashMap<String, u64>,
actor_id: Option<&str>,
) -> Result<u64> {
db.ensure_schema_apply_not_locked("write commit").await?;
let prepared = prepare_updates_for_commit(db, branch, updates).await?;
commit_prepared_updates_on_branch_with_expected(db, branch, &prepared, expected_table_versions)
.await
commit_prepared_updates_on_branch_with_expected(
db,
branch,
&prepared,
expected_table_versions,
actor_id,
)
.await
}
pub(super) async fn ensure_commit_graph_initialized(db: &mut Omnigraph) -> Result<()> {

View file

@ -0,0 +1,231 @@
//! Per-`(table_key, branch)` writer queues — MR-686 scaffolding.
//!
//! Today every server-layer write serializes on the global
//! `Arc<RwLock<Omnigraph>>` in `AppState`. MR-686 replaces that with
//! per-`(table_key, branch_ref)` queues so disjoint-key writes proceed
//! concurrently. This module owns the queue data structure; callers in
//! `MutationStaging::commit_all`, `branch_merge`, `schema_apply`,
//! `ensure_indices`, `delete_where`, and the future MR-870 recovery
//! reconciler acquire guards before any per-table Lance commit.
//!
//! ## Why exclusive `tokio::sync::Mutex<()>` per key
//!
//! Lance's `Dataset::restore` "wins" against concurrent Append/Update/
//! Delete/CreateIndex/Merge per `check_restore_txn`, silently orphaning
//! the concurrent writer's commit. The queue's *only* application-layer
//! job is to serialize Restore against every other writer on the same
//! `(table_key, branch_ref)`. Lance OCC handles the rest of the conflict
//! matrix (Append vs Append fully compatible, Update vs Update rebases or
//! retries, etc.) but cannot make Restore symmetric — that's an upstream
//! design choice. Until Lance fixes Restore (or BatchCommitTables
//! changes the protocol), every writer takes the same exclusive lock.
//!
//! `RwLock` (shared for normal writes, exclusive for Restore) is the
//! natural follow-up but adds a writer-classification surface that's
//! easy to get wrong; misclassifying any writer reintroduces the
//! orphaning hazard. We start with `Mutex` and revisit based on
//! production telemetry.
//!
//! ## Sorted-order acquisition
//!
//! `acquire_many` accepts a slice of keys and acquires them in
//! lexicographic order. Multi-table writers (mutation finalize,
//! branch_merge, future recovery reconciler) MUST go through
//! `acquire_many` so all callers agree on acquisition order — this is
//! how lock-order inversion deadlock is prevented.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard};
/// Queue key: `(table_key, branch_ref)`. `branch_ref = None` means main.
///
/// Branch is part of the key because the same Lance dataset can be
/// pinned at different versions on different branches; concurrent
/// writes to the same `table_key` on disjoint branches must NOT
/// serialize at the queue.
pub(crate) type TableQueueKey = (String, Option<String>);
/// Per-`(table_key, branch)` writer queue manager.
///
/// Lives on `Omnigraph` as `Arc<WriteQueueManager>` so HTTP handlers,
/// engine internals, the CLI binary, and future background reconcilers
/// (MR-870 recovery, MR-848 index) all reach it via the engine handle.
#[derive(Default)]
pub(crate) struct WriteQueueManager {
/// Held only briefly per `acquire` call: clone out the per-key Arc,
/// release the std mutex, then await the per-key tokio Mutex.
queues: Mutex<HashMap<TableQueueKey, Arc<AsyncMutex<()>>>>,
}
impl WriteQueueManager {
pub(crate) fn new() -> Self {
Self::default()
}
/// Get-or-create the per-key queue and clone its Arc.
fn slot(&self, key: &TableQueueKey) -> Arc<AsyncMutex<()>> {
let mut map = self.queues.lock().expect("write queue map poisoned");
if let Some(existing) = map.get(key) {
return Arc::clone(existing);
}
let fresh = Arc::new(AsyncMutex::new(()));
map.insert(key.clone(), Arc::clone(&fresh));
fresh
}
/// Acquire exclusive access to the queue for one `(table_key, branch)`.
///
/// Blocks until the lock is available. Drop the returned guard to
/// release; the lock outlives the `WriteQueueManager` borrow.
pub(crate) async fn acquire(&self, key: &TableQueueKey) -> OwnedMutexGuard<()> {
self.slot(key).lock_owned().await
}
/// Acquire exclusive access to many `(table_key, branch)` keys
/// atomically, in lex-sorted order. Used by multi-table writers
/// (mutation finalize, branch_merge, recovery) so all callers
/// agree on acquisition order — prevents lock-order inversion.
///
/// Empty input returns an empty Vec without touching the map.
/// Duplicates in `keys` are deduped before acquisition (the same
/// key acquired twice would deadlock against itself).
pub(crate) async fn acquire_many(
&self,
keys: &[TableQueueKey],
) -> Vec<OwnedMutexGuard<()>> {
if keys.is_empty() {
return Vec::new();
}
let mut sorted: Vec<TableQueueKey> = keys.to_vec();
sorted.sort();
sorted.dedup();
let mut guards = Vec::with_capacity(sorted.len());
for key in &sorted {
guards.push(self.acquire(key).await);
}
guards
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, Instant};
use tokio::time::timeout;
fn key(table: &str, branch: Option<&str>) -> TableQueueKey {
(table.to_string(), branch.map(str::to_string))
}
#[tokio::test]
async fn acquire_many_empty_returns_empty() {
let qm = WriteQueueManager::new();
let guards = qm.acquire_many(&[]).await;
assert!(guards.is_empty());
}
#[tokio::test]
async fn acquire_many_dedupes_repeated_keys() {
// Same key passed twice would deadlock if not deduped.
let qm = WriteQueueManager::new();
let k = key("t1", None);
let guards = timeout(
Duration::from_secs(2),
qm.acquire_many(&[k.clone(), k.clone(), k]),
)
.await
.expect("acquire_many with duplicates deadlocked");
assert_eq!(guards.len(), 1);
}
#[tokio::test]
async fn acquire_many_sorts_keys_deterministically() {
// Two callers passing keys in different orders must acquire in
// the same internal order. We test this indirectly: caller A
// passes [a, c] and caller B passes [c, a]; if they both
// acquire in sorted order the second caller blocks on `a` first,
// not `c` — same as A — so no deadlock under any interleaving.
// Direct sort observation: call acquire_many with a reversed
// input and verify it doesn't deadlock against a held guard on
// the sorted-first key.
let qm = Arc::new(WriteQueueManager::new());
let a = key("a", None);
let z = key("z", None);
// Hold `a` exclusively.
let _held = qm.acquire(&a).await;
// acquire_many([z, a]) — must sort to [a, z] internally and
// block on `a`. With a 200ms timeout we should NOT see it
// complete (it's blocked on `a`).
let qm2 = Arc::clone(&qm);
let z_clone = z.clone();
let a_clone = a.clone();
let result = timeout(Duration::from_millis(200), async move {
qm2.acquire_many(&[z_clone, a_clone]).await
})
.await;
assert!(result.is_err(), "acquire_many should block on `a`, the lex-first key");
}
#[tokio::test]
async fn same_key_acquire_serializes() {
let qm = Arc::new(WriteQueueManager::new());
let k = key("t1", None);
let first = qm.acquire(&k).await;
// Second acquire on same key should NOT complete within 200ms.
let qm2 = Arc::clone(&qm);
let k2 = k.clone();
let blocked = timeout(Duration::from_millis(200), async move {
qm2.acquire(&k2).await
})
.await;
assert!(blocked.is_err(), "second acquire on same key must block");
// Drop the first guard, then second acquire should succeed.
drop(first);
let _second = timeout(Duration::from_secs(2), qm.acquire(&k))
.await
.expect("second acquire after release should not block");
}
#[tokio::test]
async fn disjoint_keys_acquire_concurrently() {
let qm = Arc::new(WriteQueueManager::new());
let a = key("a", None);
let b = key("b", None);
// Hold `a` indefinitely.
let _held_a = qm.acquire(&a).await;
// Acquire `b` on a different task. Should complete promptly
// because `b` is disjoint from `a`.
let qm2 = Arc::clone(&qm);
let start = Instant::now();
let _held_b = timeout(Duration::from_secs(2), qm2.acquire(&b))
.await
.expect("disjoint key acquire must not block on unrelated held key");
assert!(
start.elapsed() < Duration::from_millis(500),
"disjoint acquire took {:?}, should be near-instant",
start.elapsed()
);
}
#[tokio::test]
async fn disjoint_branches_on_same_table_do_not_serialize() {
// (table, main) and (table, feature) are different keys.
let qm = Arc::new(WriteQueueManager::new());
let main_k = key("t1", None);
let feature_k = key("t1", Some("feature"));
let _held_main = qm.acquire(&main_k).await;
let _held_feature = timeout(Duration::from_secs(2), qm.acquire(&feature_k))
.await
.expect("same-table-different-branch should not serialize");
}
}

View file

@ -1018,17 +1018,14 @@ impl Omnigraph {
actor_id: Option<&str>,
) -> Result<MergeOutcome> {
self.ensure_schema_apply_idle("branch_merge").await?;
let previous_actor = self.audit_actor_id.clone();
self.audit_actor_id = actor_id.map(str::to_string);
let result = self.branch_merge_impl(source, target).await;
self.audit_actor_id = previous_actor;
result
self.branch_merge_impl(source, target, actor_id).await
}
async fn branch_merge_impl(
&mut self,
source: &str,
target: &str,
actor_id: Option<&str>,
) -> Result<MergeOutcome> {
if is_internal_run_branch(source) || is_internal_run_branch(target) {
return Err(OmniError::manifest(format!(
@ -1090,6 +1087,7 @@ impl Omnigraph {
&target_head_commit_id,
&source_head_commit_id,
is_fast_forward,
actor_id,
)
.await;
self.restore_coordinator(previous);
@ -1108,6 +1106,7 @@ impl Omnigraph {
target_head_commit_id: &str,
source_head_commit_id: &str,
is_fast_forward: bool,
actor_id: Option<&str>,
) -> Result<MergeOutcome> {
self.ensure_commit_graph_initialized().await?;
let target_snapshot = self.snapshot();
@ -1146,7 +1145,7 @@ impl Omnigraph {
if let Some(staged) = stage_streaming_table_merge(
table_key,
self.catalog(),
&self.catalog(),
base_snapshot,
source_snapshot,
&target_snapshot,
@ -1193,6 +1192,29 @@ impl Omnigraph {
// requires pre-computing source deltas during candidate
// classification (a structural change to `CandidateTableState`)
// and is left as follow-up work.
// Acquire per-(table_key, target_branch) queues for every table
// touched by the merge plan. Sorted-order acquisition prevents
// lock-order inversion against concurrent multi-table writers.
// The active branch (set by the caller's `swap_coordinator_for_branch`)
// is the merge target; queue keys are scoped to it because a
// branch_merge writes only to the target branch.
//
// Held across the per-table publish loop and the manifest
// commit + record_merge_commit calls below. Under PR 1b's
// intermediate state (global server RwLock still in place),
// this acquisition is uncontended.
let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
.iter()
.filter(|table_key| {
matches!(
candidates.get(*table_key),
Some(CandidateTableState::RewriteMerged(_)) | Some(CandidateTableState::AdoptSourceState)
)
})
.map(|table_key| (table_key.clone(), self.active_branch().map(str::to_string)))
.collect();
let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;
let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
.iter()
.filter_map(|table_key| {
@ -1238,7 +1260,7 @@ impl Omnigraph {
let mut sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::BranchMerge,
target_branch,
self.audit_actor_id.clone(),
actor_id.map(str::to_string),
recovery_pins,
);
// Carry the source branch's HEAD commit id so the recovery
@ -1267,7 +1289,7 @@ impl Omnigraph {
CandidateTableState::AdoptSourceState => {
publish_adopted_source_state(
self,
self.catalog(),
&self.catalog(),
base_snapshot,
source_snapshot,
&target_snapshot,
@ -1315,6 +1337,7 @@ impl Omnigraph {
manifest_version,
target_head_commit_id,
source_head_commit_id,
actor_id,
)
.await?;

View file

@ -345,8 +345,8 @@ async fn validate_edge_insert_endpoints(
edge_name: &str,
assignments: &HashMap<String, Literal>,
) -> Result<()> {
let edge_type = db
.catalog()
let catalog = db.catalog();
let edge_type = catalog
.edge_types
.get(edge_name)
.ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?;
@ -688,13 +688,8 @@ impl Omnigraph {
params: &ParamMap,
actor_id: Option<&str>,
) -> Result<MutationResult> {
let previous_actor = self.audit_actor_id.clone();
self.audit_actor_id = actor_id.map(str::to_string);
let result = self
.mutate_with_current_actor(branch, query_source, query_name, params)
.await;
self.audit_actor_id = previous_actor;
result
self.mutate_with_current_actor(branch, query_source, query_name, params, actor_id)
.await
}
async fn mutate_with_current_actor(
@ -703,6 +698,7 @@ impl Omnigraph {
query_source: &str,
query_name: &str,
params: &ParamMap,
actor_id: Option<&str>,
) -> Result<MutationResult> {
self.ensure_schema_state_valid().await?;
let requested = Self::normalize_branch_name(branch)?;
@ -737,11 +733,19 @@ impl Omnigraph {
Err(e) => Err(e),
Ok(total) if staging.is_empty() => Ok(total),
Ok(total) => {
let (updates, expected_versions, sidecar_handle) = staging
.finalize(
let staged = staging.stage_all(self, requested.as_deref()).await?;
// `_queue_guards` holds per-(table_key, branch) write
// queues acquired inside `commit_all`. Held across the
// manifest publish below so no concurrent writer can
// interleave between our commit_staged and our publish
// (which would correctly fail our CAS but leave Lance
// HEAD advanced — the residual class MR-870 recovers).
let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
.commit_all(
self,
requested.as_deref(),
crate::db::manifest::SidecarKind::Mutation,
actor_id,
)
.await?;
// Failpoint that wedges the documented finalize→publisher
@ -759,6 +763,7 @@ impl Omnigraph {
requested.as_deref(),
&updates,
&expected_versions,
actor_id,
)
.await?;
// Phase C succeeded — sidecar can be deleted. If this
@ -804,7 +809,7 @@ impl Omnigraph {
let query_decl = omnigraph_compiler::find_named_query(query_source, query_name)
.map_err(|e| OmniError::manifest(e.to_string()))?;
let checked = typecheck_query_decl(self.catalog(), &query_decl)?;
let checked = typecheck_query_decl(&self.catalog(), &query_decl)?;
match checked {
CheckedQuery::Mutation(_) => {}
CheckedQuery::Read(_) => {

View file

@ -13,11 +13,12 @@ impl Omnigraph {
) -> Result<QueryResult> {
self.ensure_schema_state_valid().await?;
let resolved = self.resolved_target(target).await?;
let catalog = self.catalog();
let query_decl = omnigraph_compiler::find_named_query(query_source, query_name)
.map_err(|e| OmniError::manifest(e.to_string()))?;
let type_ctx = typecheck_query(self.catalog(), &query_decl)?;
let ir = lower_query(self.catalog(), &query_decl, &type_ctx)?;
let type_ctx = typecheck_query(&catalog, &query_decl)?;
let ir = lower_query(&catalog, &query_decl, &type_ctx)?;
let needs_graph = ir
.pipeline
@ -34,7 +35,7 @@ impl Omnigraph {
params,
&resolved.snapshot,
graph_index.as_deref(),
self.catalog(),
&catalog,
)
.await
}
@ -52,19 +53,19 @@ impl Omnigraph {
) -> Result<QueryResult> {
self.ensure_schema_state_valid().await?;
let snapshot = self.snapshot_at_version(version).await?;
let catalog = self.catalog();
let query_decl = omnigraph_compiler::find_named_query(query_source, query_name)
.map_err(|e| OmniError::manifest(e.to_string()))?;
let type_ctx = typecheck_query(self.catalog(), &query_decl)?;
let ir = lower_query(self.catalog(), &query_decl, &type_ctx)?;
let type_ctx = typecheck_query(&catalog, &query_decl)?;
let ir = lower_query(&catalog, &query_decl, &type_ctx)?;
let needs_graph = ir
.pipeline
.iter()
.any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. }));
let graph_index = if needs_graph {
let edge_types = self
.catalog()
let edge_types = catalog
.edge_types
.iter()
.map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone())))
@ -79,7 +80,7 @@ impl Omnigraph {
params,
&snapshot,
graph_index.as_deref(),
self.catalog(),
&catalog,
)
.await
}

View file

@ -210,24 +210,21 @@ impl MutationStaging {
}
/// End-of-query: for each pending table, concat batches and commit via
/// `stage_append` or `stage_merge_insert` followed by `commit_staged`.
/// Merge with inline-committed entries. Return `(updates,
/// expected_versions)` for `commit_updates_on_branch_with_expected`.
/// **Phase A** of the two-phase commit: stage uncommitted fragments
/// for every table in `pending`. No Lance HEAD movement, no sidecar,
/// no manifest publish. Returns a [`StagedMutation`] carrying the
/// staged transactions so a future MR-686 queue acquisition step can
/// run between staging (slow S3 PUTs, no queue) and commit (fast,
/// under per-`(table_key, branch)` queue).
///
/// Sequential per-table — no cross-table dependency, but a parallel
/// version is a perf optimization for multi-table writes (loader with
/// many node + edge types). v1 ships sequential; the fan-out can land
/// in a follow-up.
pub(crate) async fn finalize(
/// Sequential per-table for now — parallelizing across independent
/// Lance datasets is a perf follow-up; same loop structure as the
/// pre-split `finalize`.
pub(crate) async fn stage_all(
self,
db: &crate::db::Omnigraph,
branch: Option<&str>,
sidecar_kind: SidecarKind,
) -> Result<(
Vec<SubTableUpdate>,
HashMap<String, u64>,
Option<RecoverySidecarHandle>,
)> {
_branch: Option<&str>,
) -> Result<StagedMutation> {
let MutationStaging {
expected_versions,
paths,
@ -235,63 +232,17 @@ impl MutationStaging {
inline_committed,
} = self;
let mut updates: Vec<SubTableUpdate> =
inline_committed.into_values().collect();
// Sidecar protocol: build the per-table pin list BEFORE any Lance
// commit_staged runs, then write the sidecar so a crash between
// Phase B (this loop's commit_staged calls) and Phase C (the
// manifest publish in the caller) is recoverable on next open.
// Skipped when `pending` is empty (delete-only mutation; the D₂
// parse-time rule keeps deletes out of this code path so this
// branch is reached only for the inline-committed-only case).
let pins: Vec<SidecarTablePin> = pending
.iter()
.map(|(table_key, _)| {
let path = paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing path for table '{}'",
table_key,
))
})?;
let expected = *expected_versions.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing expected version for table '{}'",
table_key,
))
})?;
Ok::<SidecarTablePin, OmniError>(SidecarTablePin {
table_key: table_key.clone(),
table_path: path.full_path.clone(),
expected_version: expected,
post_commit_pin: expected + 1,
table_branch: path.table_branch.clone(),
})
})
.collect::<Result<Vec<_>>>()?;
let sidecar_handle = if pins.is_empty() {
None
} else {
let sidecar = new_sidecar(
sidecar_kind,
branch.map(|s| s.to_string()),
db.audit_actor_id.clone(),
pins,
);
Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?)
};
let mut staged_entries: Vec<StagedTableEntry> = Vec::with_capacity(pending.len());
for (table_key, table) in pending {
let path = paths.get(&table_key).ok_or_else(|| {
let path = paths.get(&table_key).cloned().ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing path for table '{}'",
"MutationStaging::stage_all: missing path for table '{}'",
table_key
))
})?;
let expected = *expected_versions.get(&table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing expected version for table '{}'",
"MutationStaging::stage_all: missing expected version for table '{}'",
table_key
))
})?;
@ -335,8 +286,8 @@ impl MutationStaging {
}
};
// Commit via Lance's two-phase write: stage produces
// uncommitted fragments + transaction; commit advances HEAD.
// Stage produces uncommitted fragments + transaction. No
// Lance HEAD advance until `commit_all` runs `commit_staged`.
let staged = match table.mode {
PendingMode::Append => {
db.table_store().stage_append(&ds, combined, &[]).await?
@ -353,16 +304,286 @@ impl MutationStaging {
.await?
}
};
staged_entries.push(StagedTableEntry {
table_key,
path,
expected_version: expected,
dataset: ds,
staged_write: staged,
});
}
Ok(StagedMutation {
inline_committed,
staged: staged_entries,
expected_versions,
paths,
})
}
}
/// Output of [`MutationStaging::stage_all`]. Carries the staged Lance
/// transactions (Phase A complete; uncommitted fragments written) plus
/// the per-table metadata needed to write the recovery sidecar, run
/// `commit_staged` (Phase B), and produce the publisher's input.
///
/// Splitting `stage_all` and `commit_all` is the structural prerequisite
/// for MR-686: a future commit can drop queue acquisition + manifest-pin
/// revalidation between Phase A and Phase B without touching staging
/// logic.
pub(crate) struct StagedMutation {
/// Updates from delete-touching ops (D₂ parse-time rule keeps
/// pending and inline_committed disjoint per table). Tables here
/// have already advanced Lance HEAD via inline `delete_where`;
/// `commit_all` builds sidecar pins for these too so the
/// commit→publish residual is recoverable for delete-only paths
/// (third-agent Finding 3).
inline_committed: HashMap<String, SubTableUpdate>,
/// One entry per table that had pending batches successfully staged.
staged: Vec<StagedTableEntry>,
/// Pre-write manifest version per table — the publisher's CAS fence.
expected_versions: HashMap<String, u64>,
/// Per-table identifiers from `MutationStaging::paths`. Carried
/// through so `commit_all` can build sidecar pins for both staged
/// and inline-committed tables.
paths: HashMap<String, StagedTablePath>,
}
/// Per-table state captured during `stage_all` and consumed by
/// `commit_all`. Holds the opened `Dataset` so `commit_staged` doesn't
/// re-open, and the `StagedWrite` whose `transaction` `commit_staged`
/// will execute.
struct StagedTableEntry {
table_key: String,
path: StagedTablePath,
expected_version: u64,
dataset: lance::Dataset,
staged_write: crate::table_store::StagedWrite,
}
impl StagedMutation {
/// **Phase B** of the two-phase commit: acquire per-`(table_key,
/// branch)` queues, revalidate manifest pins, write the recovery
/// sidecar, run `commit_staged` per table to advance Lance HEAD, and
/// return the publisher's input plus the queue guards.
///
/// **Caller must hold the returned `_guards` Vec across the
/// subsequent manifest publish.** Releasing guards before publish
/// would let another writer interleave their commit_staged between
/// ours and our publish — which would correctly fail our CAS but
/// leave Lance HEAD advanced (the residual class MR-870 recovers
/// from). Holding the guards across publish keeps the residual
/// unreachable for op-execution failures on the happy path.
///
/// Revalidation: between `stage_all` and `commit_all`, another
/// writer (in the same process or another process sharing the
/// repo) may have committed to one of our touched tables, advancing
/// the manifest pin past our `expected_version`. We revalidate
/// under the queue and fail-fast with `manifest_conflict` before
/// any `commit_staged` so the orphaned uncommitted fragments stay
/// unreferenced (cleaned by `cleanup_old_versions`'s age sweep)
/// rather than being committed and creating a Lance-HEAD-ahead
/// residual.
pub(crate) async fn commit_all(
self,
db: &crate::db::Omnigraph,
branch: Option<&str>,
sidecar_kind: SidecarKind,
actor_id: Option<&str>,
) -> Result<(
Vec<SubTableUpdate>,
HashMap<String, u64>,
Option<RecoverySidecarHandle>,
Vec<tokio::sync::OwnedMutexGuard<()>>,
)> {
let StagedMutation {
inline_committed,
staged,
expected_versions,
paths,
} = self;
// Acquire per-(table_key, branch) queues for every touched
// table — both staged and inline-committed. Sorted by
// `acquire_many` internally so all multi-table writers
// (mutation, branch_merge, schema_apply, future MR-870
// recovery) agree on acquisition order — prevents lock-order
// inversion deadlock.
//
// For inline-committed tables (delete-only mutations), Lance
// HEAD has already advanced inside `delete_where` before
// `commit_all` runs. Holding the queue here doesn't prevent
// that interleaving (commit 6 will move queue acquisition into
// `delete_where`'s call site); it does prevent another writer
// from interleaving between our delete and our publish, which
// would otherwise leave a Lance-HEAD-ahead residual the
// delete-only sidecar (added below) would have to recover.
let mut queue_keys: Vec<(String, Option<String>)> = Vec::with_capacity(
staged.len() + inline_committed.len(),
);
for entry in &staged {
queue_keys.push((entry.table_key.clone(), entry.path.table_branch.clone()));
}
for table_key in inline_committed.keys() {
let path = paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing path for inline-committed table '{}'",
table_key
))
})?;
queue_keys.push((table_key.clone(), path.table_branch.clone()));
}
let guards = db.write_queue().acquire_many(&queue_keys).await;
// Revalidate manifest pins. Read fresh per-branch snapshot —
// in-memory `db.snapshot()` may be stale if another writer
// committed since our stage. If any pin moved past our
// expected_version, fail-fast before commit_staged moves
// Lance HEAD.
//
// Both staged and inline-committed tables are revalidated.
// Inline-committed tables (delete-only path) had their Lance
// HEAD advanced before this point, but the *manifest pin*
// shouldn't have moved if no other writer interleaved. If it
// has, return manifest_conflict — the sidecar emitted below
// captures (expected, post) so the next open's recovery sweep
// can resolve the Lance-HEAD-vs-manifest divergence.
//
// Note: under PR 1b's intermediate state (global server RwLock
// in place), this revalidation is a no-op because no concurrent
// writer can run. Becomes load-bearing once PR 2 removes the
// global lock — see `.context/pr-1b-plan.md` Risk 3.
if !staged.is_empty() || !inline_committed.is_empty() {
let snapshot = db.snapshot_for_branch(branch).await?;
for entry in &staged {
let current = snapshot.entry(&entry.table_key).map(|e| e.table_version);
match current {
Some(v) if v == entry.expected_version => {}
Some(other) => {
return Err(OmniError::manifest_conflict(format!(
"table '{}' pin moved from {} to {} between stage and commit",
entry.table_key, entry.expected_version, other,
)));
}
None => {
return Err(OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
entry.table_key,
)));
}
}
}
for table_key in inline_committed.keys() {
let expected = expected_versions.get(table_key).copied().ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
table_key
))
})?;
let current = snapshot.entry(table_key).map(|e| e.table_version);
match current {
Some(v) if v == expected => {}
Some(other) => {
return Err(OmniError::manifest_conflict(format!(
"table '{}' pin moved from {} to {} between inline-commit and publish",
table_key, expected, other,
)));
}
None => {
return Err(OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
table_key,
)));
}
}
}
}
// Sidecar protocol: build the per-table pin list and write the
// sidecar BEFORE any Lance commit_staged runs, so a crash
// between Phase B (this loop) and Phase C (the caller's manifest
// publish) is recoverable on next open.
//
// Pins cover BOTH staged tables (Lance HEAD will advance below
// when `commit_staged` runs) AND inline-committed tables
// (Lance HEAD already advanced inside `delete_where` — we still
// need a sidecar so that an upcoming publish failure is
// recoverable on next open). This closes the third-agent
// Finding 3 hazard: delete-only mutations would otherwise skip
// the sidecar, leaving any commit→publish residual unreachable
// by recovery.
let mut pins: Vec<SidecarTablePin> = Vec::with_capacity(
staged.len() + inline_committed.len(),
);
for entry in &staged {
pins.push(SidecarTablePin {
table_key: entry.table_key.clone(),
table_path: entry.path.full_path.clone(),
expected_version: entry.expected_version,
post_commit_pin: entry.expected_version + 1,
table_branch: entry.path.table_branch.clone(),
});
}
for (table_key, update) in &inline_committed {
let path = paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing path for inline-committed table '{}'",
table_key
))
})?;
let expected = *expected_versions.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
table_key
))
})?;
pins.push(SidecarTablePin {
table_key: table_key.clone(),
table_path: path.full_path.clone(),
expected_version: expected,
// For inline-committed tables, the post-commit pin is
// the actual post-delete version recorded by
// `record_inline`, NOT `expected + 1` — `delete_where`
// can advance HEAD by more than one version (e.g.,
// when Lance internally compacts deletion vectors).
post_commit_pin: update.table_version,
table_branch: path.table_branch.clone(),
});
}
let mut updates: Vec<SubTableUpdate> = inline_committed.into_values().collect();
let sidecar_handle = if pins.is_empty() {
None
} else {
let sidecar = new_sidecar(
sidecar_kind,
branch.map(|s| s.to_string()),
actor_id.map(str::to_string),
pins,
);
Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?)
};
for entry in staged {
let StagedTableEntry {
table_key,
path,
expected_version: _,
dataset,
staged_write,
} = entry;
let new_ds = db
.table_store()
.commit_staged(Arc::new(ds), staged.transaction)
.commit_staged(Arc::new(dataset), staged_write.transaction)
.await?;
let state = db
.table_store()
.table_state(&path.full_path, &new_ds)
.await?;
updates.push(SubTableUpdate {
table_key: table_key.clone(),
table_key,
table_version: state.version,
table_branch: path.table_branch.clone(),
row_count: state.row_count,
@ -370,7 +591,7 @@ impl MutationStaging {
});
}
Ok((updates, expected_versions, sidecar_handle))
Ok((updates, expected_versions, sidecar_handle, guards))
}
}

View file

@ -90,13 +90,8 @@ impl Omnigraph {
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<IngestResult> {
let previous_actor = self.audit_actor_id.clone();
self.audit_actor_id = actor_id.map(str::to_string);
let result = self
.ingest_with_current_actor(branch, from, data, mode)
.await;
self.audit_actor_id = previous_actor;
result
self.ingest_with_current_actor(branch, from, data, mode, actor_id)
.await
}
pub async fn ingest_file(
@ -127,6 +122,7 @@ impl Omnigraph {
from: Option<&str>,
data: &str,
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<IngestResult> {
self.ensure_schema_state_valid().await?;
let target_branch =
@ -143,7 +139,7 @@ impl Omnigraph {
.await?;
}
let result = self.load(&target_branch, data, mode).await?;
let result = self.load_as(&target_branch, data, mode, actor_id).await?;
Ok(IngestResult {
branch: target_branch,
base_branch,
@ -154,6 +150,16 @@ impl Omnigraph {
}
pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
self.load_as(branch, data, mode, None).await
}
pub async fn load_as(
&mut self,
branch: &str,
data: &str,
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<LoadResult> {
self.ensure_schema_state_valid().await?;
// Reject internal `__run__*` / system-prefixed branches at the
// public write boundary. Direct-publish paths assert this
@ -169,7 +175,7 @@ impl Omnigraph {
// Direct-to-target writes: no Run state machine, no `__run__` staging
// branch. Cross-table OCC is enforced by the publisher's
// `expected_table_versions` CAS inside `load_jsonl_reader`.
self.load_direct_on_branch(requested.as_deref(), data, mode)
self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
.await
}
@ -188,9 +194,10 @@ impl Omnigraph {
branch: Option<&str>,
data: &str,
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<LoadResult> {
let reader = BufReader::new(Cursor::new(data.as_bytes()));
load_jsonl_reader(self, branch, reader, mode).await
load_jsonl_reader(self, branch, reader, mode, actor_id).await
}
}
@ -232,6 +239,7 @@ async fn load_jsonl_reader<R: BufRead>(
branch: Option<&str>,
reader: R,
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<LoadResult> {
let catalog = db.catalog().clone();
@ -537,15 +545,19 @@ async fn load_jsonl_reader<R: BufRead>(
// Phase 4: Atomic manifest commit with publisher-level OCC.
if use_staging {
let (updates, expected_versions, sidecar_handle) = staging
.finalize(db, branch, crate::db::manifest::SidecarKind::Load)
let staged = staging.stage_all(db, branch).await?;
// `_queue_guards` holds per-(table_key, branch) write queues
// across the manifest publish below — see exec/mutation.rs for
// the rationale (interleaving prevention).
let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
.commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
.await?;
// Same finalize → publisher residual as mutations: per-table
// staged commits have advanced Lance HEAD, but the manifest
// publish has not run yet. Reuse the mutation failpoint name so
// one failpoint pins the shared `MutationStaging` boundary.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
.await?;
// The recovery sidecar protects the per-table commit_staged →
// manifest publish window. Phase C succeeded — clean up
@ -574,6 +586,7 @@ async fn load_jsonl_reader<R: BufRead>(
branch,
&overwrite_updates,
&overwrite_expected,
actor_id,
)
.await?;
}