Merge branch 'main' into ragnorc/omnigraph-mcp-crate

Folds in v0.7.1 (release #290 + optimize/write-path/internal-table-compaction
fixes #288/#291/#297) under the MCP branch.

Conflict resolutions (5 files):
- crates/omnigraph-server/Cargo.toml: take main's 0.7.1 path-dep constraints;
  keep our omnigraph-mcp dep (bumped to 0.7.1) + http dep.
- crates/omnigraph-server/src/handlers.rs: keep our server_list_queries
  doc-comment (exposed @mcp(expose) subset, invoke_query-gated) — it supersedes
  main's pre-@mcp(expose) text, since this branch adds the per-query expose flag.
- docs/user/operations/server.md: keep our GET /queries description
  (invoke_query gate + @mcp(expose) exposure) over main's read-gated/list-all text.
- docs/dev/index.md: keep both in-flight RFC rows; renumber this branch's tenancy
  RFC 013 -> 014 (rfc-014-tenancy-cells.md) since main now owns RFC-013
  (rfc-013-write-path-latency.md). Title + index link updated; link-check green.
- openapi.json: regenerated from merged source (OMNIGRAPH_UPDATE_OPENAPI=1) — now
  info.version 0.7.1 with our invoke_query/@mcp schema.

Coherence: omnigraph-mcp bumped 0.7.0 -> 0.7.1 to match the workspace; Cargo.lock
updated. cargo build --workspace green; server/mcp/api-types/compiler suites green
(schema_routes.rs reopen-after-apply flakes under parallel IO on a near-full disk,
passes single-threaded — a pre-existing main test, unchanged by the merge).
This commit is contained in:
Ragnor Comerford 2026-06-23 18:26:45 +02:00
commit adc36adf32
No known key found for this signature in database
44 changed files with 3595 additions and 528 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-api-types"
version = "0.7.0"
version = "0.7.1"
edition = "2024"
description = "Shared HTTP wire DTOs for Omnigraph — request/response types and engine-result → DTO mappings used by both omnigraph-server and omnigraph-cli (RFC-009). Plain serde/utoipa types; no transport or server internals."
license = "MIT"
@ -9,8 +9,8 @@ homepage = "https://github.com/ModernRelay/omnigraph"
documentation = "https://docs.rs/omnigraph-api-types"
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.1" }
serde = { workspace = true }
serde_json = { workspace = true }
utoipa = { workspace = true }

View file

@ -405,8 +405,8 @@ pub struct QueryCatalogEntry {
pub params: Vec<ParamDescriptor>,
}
/// Response for `GET /queries`: the `mcp.expose` subset of a graph's
/// stored-query registry, each with typed parameters.
/// Response for `GET /queries`: every stored query in a graph's
/// registry, each with typed parameters.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct QueriesCatalogOutput {
pub queries: Vec<QueryCatalogEntry>,

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-cli"
version = "0.7.0"
version = "0.7.1"
edition = "2024"
description = "CLI for the Omnigraph graph database."
license = "MIT"
@ -13,12 +13,12 @@ name = "omnigraph"
path = "src/main.rs"
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
omnigraph-api-types = { path = "../omnigraph-api-types", version = "0.7.0" }
omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.7.0" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.0" }
omnigraph-server = { path = "../omnigraph-server", version = "0.7.0" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.1" }
omnigraph-api-types = { path = "../omnigraph-api-types", version = "0.7.1" }
omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.7.1" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.1" }
omnigraph-server = { path = "../omnigraph-server", version = "0.7.1" }
clap = { workspace = true }
color-eyre = { workspace = true }
serde = { workspace = true }

View file

@ -397,8 +397,10 @@ pub(crate) enum ClusterCommand {
#[arg(long)]
json: bool,
},
/// Apply the config-only (query/policy) subset of the plan to the local
/// cluster catalog. Graph/schema changes are deferred to a later stage.
/// Converge the cluster to its config: create graphs, apply schema updates
/// (soft drops), write stored-query/policy catalog resources, and execute
/// approved graph deletes, in one ordered run. Serving picks up the applied
/// revision after an `omnigraph-server --cluster` restart.
Apply {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-cluster"
version = "0.7.0"
version = "0.7.1"
edition = "2024"
description = "Cluster configuration validation, planning, and config-only apply for Omnigraph."
license = "MIT"
@ -14,8 +14,8 @@ documentation = "https://docs.rs/omnigraph-cluster"
failpoints = ["dep:fail", "fail/failpoints", "omnigraph/failpoints"]
[dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.1" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.1" }
fail = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-compiler"
version = "0.7.0"
version = "0.7.1"
edition = "2024"
description = "Schema/query compiler for Omnigraph. Zero Lance dependency."
license = "MIT"

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-mcp"
version = "0.7.0"
version = "0.7.1"
edition = "2024"
description = "MCP (Model Context Protocol) Streamable-HTTP transport and backend seam for Omnigraph. Contains the rmcp dependency and defines the McpBackend trait the server implements; names no omnigraph engine/server type, so the dependency edge is server → mcp."
license = "MIT"

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-policy"
version = "0.7.0"
version = "0.7.1"
edition = "2024"
description = "Policy / authorization layer for Omnigraph — Cedar-backed PolicyEngine, PolicyChecker trait, ResourceScope enum."
license = "MIT"

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-server"
version = "0.7.0"
version = "0.7.1"
edition = "2024"
description = "HTTP server for the Omnigraph graph database."
license = "MIT"
@ -19,14 +19,14 @@ default = []
aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"]
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.0" }
omnigraph-api-types = { path = "../omnigraph-api-types", version = "0.7.0" }
omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.7.0" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.1" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.1" }
omnigraph-api-types = { path = "../omnigraph-api-types", version = "0.7.1" }
omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.7.1" }
# The MCP surface. rmcp is contained to omnigraph-mcp — the server carries NO
# direct rmcp dependency (verify: `cargo tree -p omnigraph-server -e normal | grep rmcp`).
omnigraph-mcp = { path = "../omnigraph-mcp", version = "0.7.0" }
omnigraph-mcp = { path = "../omnigraph-mcp", version = "0.7.1" }
axum = { workspace = true }
http = "1"
clap = { workspace = true }

View file

@ -1060,7 +1060,7 @@ pub(crate) async fn server_invoke_query(
tag = "queries",
operation_id = "list_queries",
responses(
(status = 200, description = "Stored-query catalog (the mcp.expose subset, with typed params)", body = QueriesCatalogOutput),
(status = 200, description = "Stored-query catalog (every stored query, with typed params)", body = QueriesCatalogOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-engine"
version = "0.7.0"
version = "0.7.1"
edition = "2024"
description = "Runtime engine for the Omnigraph graph database."
license = "MIT"
@ -16,8 +16,8 @@ default = []
failpoints = ["dep:fail", "fail/failpoints"]
[dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.1" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.1" }
lance = { workspace = true }
lance-datafusion = { workspace = true }
datafusion = { workspace = true }
@ -52,7 +52,7 @@ chrono = { workspace = true }
arc-swap = { workspace = true }
[dev-dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.1" }
tokio = { workspace = true }
lance-namespace-impls = { workspace = true }
lance-io = "7.0.0"

View file

@ -396,7 +396,7 @@ pub(crate) fn graph_commits_uri(root_uri: &str) -> String {
format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
}
fn graph_commit_actors_uri(root_uri: &str) -> String {
pub(crate) fn graph_commit_actors_uri(root_uri: &str) -> String {
format!(
"{}/{}",
root_uri.trim_end_matches('/'),

View file

@ -14,6 +14,10 @@ mod layout;
mod metadata;
#[path = "manifest/migrations.rs"]
mod migrations;
// Entirely test-only since RFC-013 step 3a: with both reads (Fix 2) and writes
// bypassing the Lance namespace, nothing in production routes through it; the
// `LanceNamespace` impls are retained only to validate the contract in unit tests.
#[cfg(test)]
#[path = "manifest/namespace.rs"]
mod namespace;
#[path = "manifest/publisher.rs"]
@ -24,11 +28,11 @@ mod recovery;
mod state;
use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_hash};
use layout::{open_manifest_dataset, table_uri_for_path, type_name_hash};
pub(crate) use layout::manifest_uri;
pub(crate) use metadata::TableVersionMetadata;
#[cfg(test)]
use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
pub(crate) use namespace::open_table_head_for_write;
#[cfg(test)]
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};

View file

@ -15,7 +15,7 @@ pub(super) fn type_name_hash(name: &str) -> String {
format!("{:016x}", h)
}
pub(super) fn manifest_uri(root: &str) -> String {
pub(crate) fn manifest_uri(root: &str) -> String {
format!("{}/{}", root.trim_end_matches('/'), MANIFEST_DIR)
}
@ -76,6 +76,7 @@ pub(super) fn table_uri_for_path(root_uri: &str, table_path: &str, branch: Optio
}
}
#[cfg(test)]
pub(super) fn namespace_internal_error(message: impl Into<String>) -> LanceNamespaceError {
LanceNamespaceError::namespace_source(Box::new(std::io::Error::other(message.into())))
}

View file

@ -2,7 +2,9 @@ use std::collections::HashMap;
use lance::Dataset;
use lance_namespace::Error as LanceNamespaceError;
use lance_namespace::models::{CreateTableVersionRequest, TableVersion};
use lance_namespace::models::CreateTableVersionRequest;
#[cfg(test)]
use lance_namespace::models::TableVersion;
use serde::{Deserialize, Serialize};
use crate::error::{OmniError, Result};
@ -142,6 +144,7 @@ impl TableVersionMetadata {
self.to_namespace_version_with_details(version, None, None)
}
#[cfg(test)]
pub(super) fn to_namespace_version_with_details(
&self,
version: u64,

View file

@ -1,3 +1,10 @@
// Both the read namespace (BranchManifestNamespace) and the write namespace
// (StagedTableNamespace) are now test-only contract validation. Reads open
// sub-tables directly by location+version (SubTableEntry::open, Fix 2), and
// writes open the table head directly by URI (TableStore::open_dataset_head,
// RFC-013 step 3a), so nothing in production routes through the Lance namespace
// anymore. These impls are retained only to validate the LanceNamespace
// contract in unit tests.
use std::sync::Arc;
use async_trait::async_trait;
@ -16,30 +23,21 @@ use object_store::{
use crate::error::{OmniError, Result};
use super::layout::{namespace_internal_error, table_uri_for_path};
#[cfg(test)]
use super::layout::{open_manifest_dataset, table_id_to_key};
use super::metadata::TableVersionMetadata;
#[cfg(test)]
use super::metadata::{namespace_version_metadata, parse_namespace_version_request};
#[cfg(test)]
use super::layout::{
namespace_internal_error, open_manifest_dataset, table_id_to_key, table_uri_for_path,
};
use super::metadata::{
TableVersionMetadata, namespace_version_metadata, parse_namespace_version_request,
};
use super::publisher::GraphNamespacePublisher;
// The read namespace (BranchManifestNamespace) is test-only since Fix 2: reads
// open sub-tables directly by location+version (SubTableEntry::open), so nothing
// in production routes a read through the Lance namespace. The writes path uses
// StagedTableNamespace. These items are retained to validate the namespace
// contract in unit tests.
#[cfg(test)]
use super::state::{ManifestState, SubTableEntry, read_manifest_entries, read_manifest_state};
#[cfg(test)]
#[derive(Debug, Clone)]
struct BranchManifestNamespace {
root_uri: String,
branch: Option<String>,
}
#[cfg(test)]
impl BranchManifestNamespace {
fn new(root_uri: &str, branch: Option<&str>) -> Self {
Self {
@ -146,7 +144,6 @@ impl StagedTableNamespace {
}
}
#[cfg(test)]
pub(crate) fn branch_manifest_namespace(
root_uri: &str,
branch: Option<&str>,
@ -165,27 +162,6 @@ pub(crate) fn staged_table_namespace(
))
}
async fn load_table_from_namespace(
namespace: Arc<dyn LanceNamespace>,
table_key: &str,
branch: Option<&str>,
version: Option<u64>,
) -> Result<Dataset> {
let builder = DatasetBuilder::from_namespace(namespace, vec![table_key.to_string()])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let builder = match (branch, version) {
(Some(branch), version) => builder.with_branch(branch, version),
(None, Some(version)) => builder.with_version(version),
(None, None) => builder,
};
builder
.load()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
#[cfg(test)]
#[async_trait]
impl LanceNamespace for BranchManifestNamespace {
fn namespace_id(&self) -> String {
@ -540,18 +516,3 @@ impl LanceNamespace for StagedTableNamespace {
Ok(response)
}
}
pub(crate) async fn open_table_head_for_write(
root_uri: &str,
table_key: &str,
table_path: &str,
branch: Option<&str>,
) -> Result<Dataset> {
load_table_from_namespace(
staged_table_namespace(root_uri, table_key, table_path, branch),
table_key,
branch,
None,
)
.await
}

View file

@ -248,10 +248,8 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
tasks
};
if table_tasks.is_empty() {
return Ok(Vec::new());
}
// NB: do NOT early-return when `table_tasks` is empty (a schema with no
// node/edge types) — the internal system tables below must still be compacted.
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
let stats: Vec<Result<TableOptimizeStats>> = futures::stream::iter(table_tasks.into_iter())
@ -279,7 +277,42 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
}
}
stats.into_iter().collect()
// Compact the internal system tables too (RFC-013 step 2). They are not
// catalog-tracked, so they take a separate, simpler path (`compact_internal_table`):
// compact in place, no manifest publish, no sidecar. Appended after the
// data-table stats so the data-table cache invalidation above is computed from
// data-table stats only; each internal compaction does its own coordinator
// refresh for cache coherence.
let mut all = stats;
// One source of truth for the internal system tables optimize compacts. The
// commit graph is THREE tables, not one: the DAG (`_graph_commits`), the actor
// map (`_graph_commit_actors`, appended by every *authenticated* write — the
// production server/CLI path always carries an actor), and the manifest. Missing
// any leaves an O(history) scan on a live write path. `__manifest` is always
// present (created at init); the two commit-graph tables may be absent (the
// coordinator opens them as `Option`, gated on existence — graphs predating the
// commit graph, and the actor table is itself optional), so guard each with the
// same existence check rather than letting `Dataset::open` error and fail the
// whole optimize.
let root = db.root_uri();
let internal_tables: [(&str, String); 3] = [
("__manifest", crate::db::manifest::manifest_uri(root)),
(
"_graph_commits",
crate::db::commit_graph::graph_commits_uri(root),
),
(
"_graph_commit_actors",
crate::db::commit_graph::graph_commit_actors_uri(root),
),
];
for (table_key, uri) in internal_tables {
if table_key == "__manifest" || db.storage_adapter().exists(&uri).await? {
all.push(compact_internal_table(db, table_key, uri).await);
}
}
all.into_iter().collect()
}
/// Compact one table and publish the compacted version to the `__manifest`.
@ -326,187 +359,306 @@ async fn optimize_one_table(
.acquire_many(&[(table_key.clone(), None)])
.await;
// `compact_files` is a Lance-only maintenance API that needs `&mut Dataset`.
// The `TableStorage` trait deliberately does not surface it (the staged-write
// invariant covers writes; compaction is a separate concern). Unwrap the
// opaque `SnapshotHandle` via `into_dataset()` (`pub(crate)`, gated to the
// maintenance path).
let handle = db
.storage()
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let mut ds = handle.into_dataset();
// Survive a CROSS-PROCESS race (a CLI `optimize` vs the served server): the
// in-process write queue above serializes only same-process writers, so we also
// retry. Two failure modes, two retry levels:
// * Outer loop — a genuine Lance `Rewrite`-vs-`Update/Delete` same-fragment
// conflict (compaction did NOT commit). Reopen at the new HEAD and re-plan,
// exactly as the internal-table path does. (Lance rebases the common disjoint
// case — a concurrent insert/delete on other fragments — for free, so this
// fires only on real overlap.)
// * Inner loop (Phase C) — the manifest advanced under us between our
// compaction and our publish. The compaction IS committed at Lance HEAD, so
// we must NOT reopen (that would trip the HEAD>manifest drift guard on our
// own work); instead re-read the current manifest version and either no-op
// (the manifest already moved past our version — being linear, it descends
// from and includes our compaction) or fast-forward to it. Monotonic, never
// the strict equality CAS that manufactured the bug.
//
// The Phase-A sidecar is written ONCE on the first productive attempt and reused
// across reopen attempts: every Phase-B commit is content-preserving, so a crash
// mid-retry leaves the table readable and recovery either rolls the observed HEAD
// forward (pin still matches the manifest) or safely rolls the compaction back.
let mut sidecar: Option<crate::db::manifest::RecoverySidecarHandle> = None;
// CAS baseline: the table's current manifest version, read under the queue
// (in-memory coordinator snapshot, no storage I/O — stable for this section).
let expected_version = db
.fresh_snapshot_for_branch(None)
.await?
.entry(&table_key)
.map(|e| e.table_version)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
// Tracks whether one of OUR Phase-B ops (auto-cleanup strip / compact / reindex)
// already committed and advanced Lance HEAD past the manifest in a prior attempt.
// Once true, a reopened `lance_head > manifest` is our own sidecar-covered work,
// NOT external drift — so the drift guard and the no-op early-return must not treat
// it as such (that would drop our committed work as uncovered drift).
let mut head_advanced = false;
let lance_head_version = ds.version().version;
if lance_head_version < expected_version {
return Err(OmniError::manifest_internal(format!(
"table '{}' Lance HEAD version {} is behind manifest version {}",
table_key, lance_head_version, expected_version
)));
}
if lance_head_version > expected_version {
tracing::warn!(
target: "omnigraph::optimize",
table = %table_key,
manifest_version = expected_version,
lance_head_version,
"skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \
to classify and publish covered maintenance drift explicitly",
);
return Ok(TableOptimizeStats::skipped_for_drift(
table_key,
expected_version,
lance_head_version,
));
}
// Outer loop: open → plan → Phase B, reopening + re-planning on a retryable
// Lance conflict. Breaks with the committed snapshot once Phase B succeeds.
let mut attempt: u32 = 0;
let (snapshot, metrics, pending_indexes, committed) = loop {
attempt += 1;
// Precise "will it compact?" check — `plan_compaction` also accounts for
// deletion materialization (which can rewrite even a single fragment).
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let will_compact = plan.num_tasks() > 0;
// Even when there is nothing to compact, the table may still have index
// work: rows appended since the index was built (e.g. via `ingest --mode
// merge`) are scanned unindexed until folded in (needs_reindex), OR a
// declared `@index` was never built — schema apply records the intent but
// defers the physical build (iss-848), so optimize is the operator-facing
// reconciler that materializes it (needs_index_create). Any of the three is
// enough to enter the publish path. If NONE, this table is a no-op and must
// NOT be pinned in a sidecar — a zero-commit pin classifies NoMovement on
// recovery and forces an all-or-nothing rollback of sibling tables'
// legitimate work. Uncovered pre-existing manifest/HEAD drift is skipped
// above and goes through explicit repair, so this only runs on a healthy
// table under the per-table queue + sidecar.
let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?;
// needs_index_work_* checks "a declared index is missing AND row_count > 0",
// so empty tables stay no-ops (never pinned). It re-reads the head under the
// queue we already hold, so it is consistent with `ds`.
let needs_index_create = if let Some(type_name) = table_key.strip_prefix("node:") {
super::table_ops::needs_index_work_node(db, type_name, &table_key, &full_path, None).await?
} else {
super::table_ops::needs_index_work_edge(db, &table_key, &full_path, None).await?
};
if !will_compact && !needs_reindex && !needs_index_create {
return Ok(TableOptimizeStats::compacted(
table_key,
&CompactionMetrics::default(),
false,
));
}
// `compact_files` is a Lance-only maintenance API that needs `&mut Dataset`.
// The `TableStorage` trait deliberately does not surface it; unwrap the
// opaque `SnapshotHandle` via `into_dataset()` (gated to the maintenance path).
let mut ds = db
.storage()
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?
.into_dataset();
// Phase A: recovery sidecar BEFORE any HEAD-advancing op (compaction or
// index optimize), so a crash before the manifest publish rolls forward on
// next open.
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::Optimize,
None,
// optimize is system-attributed (no `optimize_as` actor API today).
None,
vec![crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: full_path.clone(),
expected_version,
// Lower bound — compaction commits N≥1 versions (reserve + rewrite);
// the classifier loose-matches SidecarKind::Optimize.
post_commit_pin: expected_version + 1,
// Optimize uses the loose match (drift is derived state), not
// BranchMerge's Phase-B confirmation — left None.
confirmed_version: None,
table_branch: None,
}],
);
let handle =
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?;
// CAS baseline: the table's current manifest version, re-read each attempt
// (a reopen means the manifest may have advanced).
let expected_version = db
.fresh_snapshot_for_branch(None)
.await?
.entry(&table_key)
.map(|e| e.table_version)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
// Phase B: compaction (if any) then incremental index optimize — both
// advance Lance HEAD inside the sidecar window. `compact_files` rewrites
// fragments and drops them from existing index segments' coverage;
// `optimize_indices` folds the rewritten and any previously-unindexed
// fragments back in (Lance's incremental merge, not a full retrain). This
// is the same compact -> optimize_indices sequencing LanceDB's `optimize()`
// uses. `optimize_indices` is an inline-commit residual: lance-6.0.1
// exposes no uncommitted variant, so like `compact_files` it commits
// directly and relies on the sidecar for recovery.
let version_before = ds.version().version;
let metrics: CompactionMetrics = if will_compact {
compact_files(&mut ds, options, None)
let lance_head_version = ds.version().version;
if lance_head_version < expected_version {
return Err(OmniError::manifest_internal(format!(
"table '{}' Lance HEAD version {} is behind manifest version {}",
table_key, lance_head_version, expected_version
)));
}
if !head_advanced && lance_head_version > expected_version {
// Pre-existing EXTERNAL uncovered drift (we have not advanced HEAD yet) —
// go through explicit repair. Once `head_advanced` is set, a reopened
// `lance_head > manifest` is our own prior Phase-B commit (sidecar-covered)
// that the publish below fast-forwards, NOT external drift, so this guard is
// skipped on those retries.
if let Some(h) = sidecar.take() {
let _ = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await;
}
tracing::warn!(
target: "omnigraph::optimize",
table = %table_key,
manifest_version = expected_version,
lance_head_version,
"skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \
to classify and publish covered maintenance drift explicitly",
);
return Ok(TableOptimizeStats::skipped_for_drift(
table_key.clone(),
expected_version,
lance_head_version,
));
}
// Precise "will it compact?" check — `plan_compaction` also accounts for
// deletion materialization (which can rewrite even a single fragment).
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
} else {
CompactionMetrics::default()
.map_err(|e| OmniError::Lance(e.to_string()))?;
let will_compact = plan.num_tasks() > 0;
// Even with nothing to compact, the table may still have index work
// (needs_reindex: rows appended since the index was built; needs_index_create:
// a declared `@index` whose physical build schema apply deferred, iss-848).
// Any of the three enters the publish path. If NONE, this is a no-op and must
// NOT be pinned in a sidecar (a zero-commit pin classifies NoMovement on
// recovery and rolls back siblings).
let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?;
let needs_index_create = if let Some(type_name) = table_key.strip_prefix("node:") {
super::table_ops::needs_index_work_node(db, type_name, &table_key, &full_path, None)
.await?
} else {
super::table_ops::needs_index_work_edge(db, &table_key, &full_path, None).await?
};
if !will_compact && !needs_reindex && !needs_index_create {
if head_advanced {
// Nothing left to compact, but a prior attempt already advanced HEAD
// (e.g. the strip committed, then compaction conflicted, and the reopen
// is now already compacted). Publish that committed work instead of
// dropping it as uncovered drift.
break (
crate::storage_layer::SnapshotHandle::new(ds),
CompactionMetrics::default(),
Vec::new(),
true,
);
}
if let Some(h) = sidecar.take() {
let _ = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await;
}
return Ok(TableOptimizeStats::compacted(
table_key.clone(),
&CompactionMetrics::default(),
false,
));
}
// Phase A: recovery sidecar BEFORE any HEAD-advancing op, written once and
// reused across reopen attempts.
if sidecar.is_none() {
let sc = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::Optimize,
None,
// optimize is system-attributed (no `optimize_as` actor API today).
None,
vec![crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: full_path.clone(),
expected_version,
// Lower bound — compaction commits N≥1 versions (reserve + rewrite);
// the classifier loose-matches SidecarKind::Optimize.
post_commit_pin: expected_version + 1,
confirmed_version: None,
table_branch: None,
}],
);
sidecar = Some(
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sc).await?,
);
}
// Test seam: a concurrent (cross-process) writer can interleave here, before
// any Phase-B commit lands, to exercise the reopen+replan path.
crate::failpoints::maybe_fail("optimize.before_compact")?;
// Phase B: scrub stale auto_cleanup (keeps optimize non-destructive on a
// graph upgraded from a pre-v7 binary whose `compact_files`/`optimize_indices`
// commits would otherwise fire Lance's auto-cleanup GC hook), compact,
// incremental reindex, then materialize declared-but-missing indexes. Each is
// an inline-commit residual covered by the sidecar. A retryable Lance conflict
// here means a concurrent writer preempted an overlapping fragment → reopen at
// the new HEAD and re-plan. Baseline captured BEFORE the scrub so that if the
// scrub is the only commit, `committed` still triggers the Phase-C publish.
let version_before = ds.version().version;
match clear_stale_auto_cleanup_config(&mut ds).await {
// `true` ⇒ the strip committed and advanced HEAD past the manifest.
Ok(stripped) => head_advanced |= stripped,
Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => {
continue;
}
Err(e) => return Err(OmniError::Lance(e.to_string())),
}
let metrics: CompactionMetrics = if will_compact {
match compact_files(&mut ds, options, None).await {
Ok(m) => {
head_advanced = true;
m
}
Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => {
continue;
}
Err(e) => return Err(OmniError::Lance(e.to_string())),
}
} else {
CompactionMetrics::default()
};
// Test seam: inject one retryable reindex conflict AFTER compaction has
// committed (so HEAD is already ahead of the manifest from our own work),
// exercising the own-HEAD (not external) drift classification on the next
// reopened attempt.
if crate::failpoints::maybe_fail("optimize.inject_reindex_conflict").is_err()
&& attempt < COMPACTION_RETRY_BUDGET
{
continue;
}
match ds.optimize_indices(&OptimizeOptions::default()).await {
Ok(()) => {}
Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => {
continue;
}
Err(e) => {
return Err(OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)));
}
}
let catalog = db.catalog();
let mut snapshot = crate::storage_layer::SnapshotHandle::new(ds);
let pending_indexes: Vec<super::PendingIndex> =
super::table_ops::build_indices_on_dataset_for_catalog(
db,
&catalog,
&table_key,
&mut snapshot,
)
.await?;
// optimize_indices / index build may also have committed (folded fragments,
// built a deferred index). Any HEAD advance this attempt counts too.
let version_after = snapshot.dataset().version().version;
head_advanced |= version_after != version_before;
break (snapshot, metrics, pending_indexes, head_advanced);
};
ds.optimize_indices(&OptimizeOptions::default())
.await
.map_err(|e| OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)))?;
// Materialize any declared-but-missing index over the just-compacted layout,
// reusing the build chokepoint (idempotent: skips existing indexes; fault-
// isolates an untrainable vector column into `pending` rather than failing).
// Run it UNCONDITIONALLY now that we are past the no-op gate — not only when
// `needs_index_create`. A table can enter this path for compaction or
// reindex while its sole missing index is an untrainable Vector column
// (which `needs_index_work_*` does not count as buildable work); calling the
// build here is what surfaces that column in `pending_indexes`, so optimize
// can't compact a table yet silently drop the deferred-index signal.
// Idempotent + cheap when there is nothing to build. Vector index creation
// is an inline-commit residual; the Optimize sidecar's loose post_commit_pin
// covers the extra commits.
let catalog = db.catalog();
let mut snapshot = crate::storage_layer::SnapshotHandle::new(ds);
let pending_indexes: Vec<super::PendingIndex> =
super::table_ops::build_indices_on_dataset_for_catalog(
db,
&catalog,
&table_key,
&mut snapshot,
)
.await?;
let version_after = snapshot.dataset().version().version;
let committed = version_after != version_before;
// Pin the per-writer Phase B → Phase C residual for optimize: Lance HEAD has
// advanced but the manifest publish below hasn't run.
// Pin the per-writer Phase B → Phase C residual: Lance HEAD has advanced but the
// manifest publish below hasn't run.
crate::failpoints::maybe_fail("optimize.post_phase_b_pre_manifest_commit")?;
// Phase C: publish the compacted version to the manifest (one CAS commit,
// expected = the version observed under the queue). On failure the sidecar
// is intentionally left for the open-time recovery sweep to roll forward.
// Phase C: monotonic fast-forward publish. The compaction is committed at Lance
// HEAD `N`; publish a manifest pointer that includes it. If a concurrent writer
// already advanced the manifest to ≥ N (it built on our compaction), there is
// nothing to do. Otherwise advance to N; a concurrent advance during this window
// is a retryable manifest conflict — re-read the current version and re-evaluate
// (NOT a reopen: the compaction is already committed).
if committed {
let state = db.storage().table_state(&full_path, &snapshot).await?;
let update = crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
};
let mut expected = std::collections::HashMap::new();
expected.insert(table_key.clone(), expected_version);
db.coordinator
.write()
.await
.commit_updates_with_actor_with_expected(&[update], &expected, None)
.await?;
let mut published = false;
let mut last_conflict: Option<OmniError> = None;
for _ in 0..COMPACTION_RETRY_BUDGET {
let current = current_manifest_version(db, &table_key).await?;
if current >= state.version {
// The manifest already points at a version that includes our
// compaction (Lance versions are linear). Nothing to publish.
published = true;
break;
}
let update = crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata.clone(),
};
let mut expected = std::collections::HashMap::new();
expected.insert(table_key.clone(), current);
match db
.coordinator
.write()
.await
.commit_updates_with_actor_with_expected(&[update], &expected, None)
.await
{
Ok(_) => {
published = true;
break;
}
// A retryable manifest conflict means the manifest moved under us —
// loop and re-read `current` (the top check converges if it now
// already includes our compaction). Record it for the exhaustion path.
Err(e) if is_retryable_manifest_conflict(&e) => last_conflict = Some(e),
// Leave the sidecar for the open-time recovery sweep to roll forward.
Err(e) => return Err(e),
}
}
if !published {
// Budget exhausted under sustained contention. The final conflict may
// itself mean a concurrent writer published a version that already
// includes our (content-preserving) compaction — the postcondition is
// "the manifest reflects our compaction," not "we won the CAS" — so
// re-check before surfacing an error (§6.6).
let current = current_manifest_version(db, &table_key).await?;
if current < state.version {
return Err(last_conflict.unwrap_or_else(|| {
OmniError::manifest_conflict(format!(
"optimize publish of {table_key} exhausted {COMPACTION_RETRY_BUDGET} \
retries against concurrent writers"
))
}));
}
}
}
// Phase D: delete the sidecar (best-effort; recovery resolves a leftover).
if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it"
);
if let Some(h) = sidecar.take() {
if let Err(err) = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = h.operation_id.as_str(),
"optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it"
);
}
}
let mut stat = TableOptimizeStats::compacted(table_key, &metrics, committed);
@ -514,6 +666,196 @@ async fn optimize_one_table(
Ok(stat)
}
/// Bound on the app-level retry of an internal-table compaction against a
/// concurrent live writer (see [`is_retryable_lance_conflict`]).
const COMPACTION_RETRY_BUDGET: u32 = 5;
/// A Lance commit error that means "a concurrent writer preempted us; reload the
/// dataset and rerun." `compact_files` commits via `commit_compaction` ->
/// `apply_commit` *directly* — unlike the merge-insert path it is NOT wrapped in
/// `execute_with_retry`, so a `Rewrite`-vs-`Merge`/`Update`/`Delete` `check_txn`
/// conflict propagates raw instead of being rebased or converted to
/// `TooMuchWriteContention`. Lance's transaction spec prescribes that the
/// *application* reruns these, which is what `compact_internal_table` does — so a
/// maintenance compaction (a physical op) never fails a live write (a logical op),
/// invariant 7. (`TooMuchWriteContention` is included for the exhausted-retry form
/// some commit paths surface.)
fn is_retryable_lance_conflict(err: &lance::Error) -> bool {
matches!(
err,
lance::Error::RetryableCommitConflict { .. }
| lance::Error::CommitConflict { .. }
| lance::Error::TooMuchWriteContention { .. }
)
}
/// A manifest publish conflict that optimize's monotonic Phase-C loop re-evaluates
/// (re-read the current version, then no-op or fast-forward). Both shapes that reach
/// here are `Conflict`-kind and mean "the manifest moved under us; reconsider," never
/// a lost update: the typed `ExpectedVersionMismatch` (a concurrent writer advanced
/// the table) and the publisher's exhausted row-level CAS (`manifest_conflict`).
fn is_retryable_manifest_conflict(err: &OmniError) -> bool {
matches!(
err,
OmniError::Manifest(m) if m.kind == crate::error::ManifestErrorKind::Conflict
)
}
/// The table's current manifest version on `main` (0 if absent), read fresh. Used by
/// optimize's monotonic publish loop to decide no-op (`current >= N`) vs fast-forward.
async fn current_manifest_version(db: &Omnigraph, table_key: &str) -> Result<u64> {
Ok(db
.fresh_snapshot_for_branch(None)
.await?
.entry(table_key)
.map(|e| e.table_version)
.unwrap_or(0))
}
/// Remove any stored `lance.auto_cleanup.*` config from a table so compaction
/// stays **non-destructive by construction**. Used by both the internal-table
/// path ([`compact_internal_table`]) and the data-table path
/// ([`optimize_one_table`]).
///
/// `compact_files` / `optimize_indices` commit with a default `CommitConfig`
/// (`skip_auto_cleanup = false`) and `CompactionOptions` exposes no override, so on
/// a dataset whose stored config has `lance.auto_cleanup.interval` set, the
/// compaction/reindex commit would fire Lance's auto-cleanup hook (version GC) —
/// deletion of old versions, including ones `__manifest` pins for snapshots /
/// time-travel (data tables) or that hold lineage/time-travel state (internal
/// tables). New graphs create tables with `auto_cleanup: None` (`manifest/graph.rs`,
/// `commit_graph.rs`, and the data-table create path) so there is nothing to clear;
/// only pre-`auto_cleanup`-fix *upgraded* graphs carry the config. OmniGraph owns
/// version cleanup explicitly (`cleanup`), so Lance's hook is unwanted regardless —
/// clearing it both makes `optimize` non-destructive and aligns the table with the
/// new-graph posture. The `delete_config_keys` commit itself does not GC: the
/// resulting manifest no longer has the `interval` key, so the post-commit hook is a
/// no-op. Returns whether any config was cleared (it advances Lance HEAD iff so).
/// Recovery coverage differs by caller: the data-table path runs this inside the
/// Optimize sidecar window; the internal-table path needs none (it commits at HEAD
/// and is read at HEAD — the strip is a content-preserving config commit, so a crash
/// leaves the table readable and content-identical, see [`compact_internal_table`]).
async fn clear_stale_auto_cleanup_config(
ds: &mut lance::Dataset,
) -> std::result::Result<bool, lance::Error> {
let keys: Vec<String> = ds
.config()
.keys()
.filter(|k| k.starts_with("lance.auto_cleanup."))
.cloned()
.collect();
if keys.is_empty() {
return Ok(false);
}
// Merge-update with `None` values to delete the keys — the non-deprecated
// replacement for `delete_config_keys` (awaiting the builder merges rather
// than replacing the whole config map).
let entries: Vec<(&str, Option<&str>)> = keys.iter().map(|k| (k.as_str(), None)).collect();
ds.update_config(entries).await?;
Ok(true)
}
/// Compact one INTERNAL system table (`__manifest` / `_graph_commits` /
/// `_graph_commit_actors`) in place.
///
/// Unlike catalog data tables, the internal tables are not tracked in the
/// `__manifest` (they ARE the manifest / the lineage DAG): readers open them at
/// their latest Lance HEAD, so compaction just advances that HEAD and the next
/// reader transparently observes the compacted version. That makes this path much
/// simpler than [`optimize_one_table`] — no manifest publish (nothing to publish
/// to), and no recovery sidecar. The sidecar-free claim does NOT rest on
/// single-commit atomicity: `compact_files` can emit a `ReserveFragments` commit
/// before the final `Rewrite` (and the config strip is a separate commit before
/// both), so this advances HEAD over one or more commits. It needs no sidecar
/// because every one of those commits is content-preserving and the table is read
/// at HEAD — a crash at any point leaves the table readable and content-identical,
/// and the next `optimize` re-plans. Internal tables carry no Lance index (only
/// `object_id`'s unenforced-PK schema metadata), so no `optimize_indices`.
///
/// Concurrency: no application lock, but `compact_files` does NOT auto-retry a
/// semantic conflict — its `Operation::Rewrite` commits through `apply_commit`
/// directly (not the merge-insert `execute_with_retry` path), so a `Rewrite`
/// vs concurrent `Update`/`Merge`/`Delete` `check_txn` conflict propagates raw.
/// We own the retry here (see [`is_retryable_lance_conflict`]): on a retryable
/// conflict, reopen at the new HEAD and rerun. A follow-up coordinator `refresh`
/// makes the warm internal-table handles observe the compacted HEAD
/// deterministically (the version probe would also self-heal on the next read).
async fn compact_internal_table(
db: &Omnigraph,
table_key: &str,
uri: String,
) -> Result<TableOptimizeStats> {
// App-level retry against concurrent live writers. compact_files does NOT
// auto-retry a Rewrite-vs-live-write conflict (see is_retryable_lance_conflict),
// so optimize would otherwise fail spuriously on a live graph. On a retryable
// conflict we re-open at the new HEAD and rerun — the canonical Lance-consumer
// pattern. Each attempt opens fresh because the conflict means the version moved.
for attempt in 0..COMPACTION_RETRY_BUDGET {
let handle = db
.storage()
.open_dataset_head_for_write(table_key, &uri, None)
.await?;
let mut ds = handle.into_dataset();
// Keep optimize non-destructive by construction (see clear_stale_auto_cleanup_config).
// Returns whether it committed a config-strip (which advances Lance HEAD).
let cleared_config = match clear_stale_auto_cleanup_config(&mut ds).await {
Ok(cleared) => cleared,
Err(e) => {
if attempt + 1 < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e)
{
continue;
}
return Err(OmniError::Lance(e.to_string()));
}
};
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
if plan.num_tasks() == 0 {
// No compaction work, but a config-strip still advanced HEAD — refresh
// the warm coordinator handles so they observe it deterministically
// (same cache-coherence step the successful-compaction path takes
// below; otherwise they stay pinned until the next version probe).
if cleared_config {
db.coordinator.write().await.refresh().await?;
}
return Ok(TableOptimizeStats::compacted(
table_key.to_string(),
&CompactionMetrics::default(),
false,
));
}
match compact_files(&mut ds, options, None).await {
Ok(metrics) => {
// Cache coherence: re-open the warm coordinator's internal-table
// handles at the compacted HEAD (they live in `db.coordinator`, not
// the data-table `runtime_cache`).
db.coordinator.write().await.refresh().await?;
return Ok(TableOptimizeStats::compacted(
table_key.to_string(),
&metrics,
true,
));
}
Err(e)
if attempt + 1 < COMPACTION_RETRY_BUDGET
&& is_retryable_lance_conflict(&e) =>
{
continue;
}
Err(e) => return Err(OmniError::Lance(e.to_string())),
}
}
Err(OmniError::manifest_conflict(format!(
"internal-table compaction of {table_key} exhausted {COMPACTION_RETRY_BUDGET} \
retries against concurrent writers"
)))
}
/// Run Lance `cleanup_old_versions` on every node + edge table on `main`,
/// using [`CleanupPolicyOptions`]. The latest manifest is always preserved
/// regardless (Lance invariant).
@ -912,6 +1254,26 @@ mod tests {
use crate::failpoints::ScopedFailPoint;
use crate::loader::{LoadMode, load_jsonl};
/// The internal-table compaction retry classifier: a concurrent live writer
/// preempting our `Rewrite` is retryable (Lance prescribes app-rerun, and
/// compact_files does not auto-retry it); a non-conflict error is not (must not
/// be masked by a blind retry).
#[test]
fn retryable_lance_conflicts_are_classified() {
assert!(is_retryable_lance_conflict(
&lance::Error::retryable_commit_conflict_source(
1,
Box::new(std::io::Error::other("preempted by concurrent write")),
)
));
assert!(is_retryable_lance_conflict(
&lance::Error::too_much_write_contention("contended")
));
assert!(!is_retryable_lance_conflict(&lance::Error::invalid_input(
"not a conflict"
)));
}
fn node_table_uri(root: &str, type_name: &str) -> String {
let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
for &b in type_name.as_bytes() {

View file

@ -24,7 +24,7 @@ use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::sync::Arc;
use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
use crate::db::manifest::TableVersionMetadata;
use crate::db::{Snapshot, SubTableEntry};
use crate::error::{OmniError, Result};
use crate::storage_layer::ForkOutcome;
@ -160,9 +160,15 @@ impl TableStore {
dataset_uri: &str,
branch: Option<&str>,
) -> Result<Dataset> {
let ds = Dataset::open(dataset_uri)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Direct open by URI (O(1) latest-resolution). Routed through the tracked
// opener so a cost test counts it via the per-query `table_wrapper`
// (no-op in production — the task-local is unset, so this is exactly
// `Dataset::open(uri)`).
let ds = crate::instrumentation::open_dataset_tracked(
dataset_uri,
crate::instrumentation::table_wrapper(),
)
.await?;
match branch {
Some(branch) if branch != "main" => ds
.checkout_branch(branch)
@ -178,8 +184,14 @@ impl TableStore {
dataset_uri: &str,
branch: Option<&str>,
) -> Result<Dataset> {
let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
// RFC-013 step 3a: open writes via the direct opener (O(1)) instead of the
// lance-namespace builder, which re-resolved the table's version chain
// O(depth) per write. The namespace is a catalog/discovery layer, not a
// per-open hot-path component (RFC §2.4); the manifest already holds the
// location, and `ensure_expected_version` still validates head == pinned
// for strict ops. `table_key` retained for signature stability.
let _ = table_key;
self.open_dataset_head(dataset_uri, branch).await
}
pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {

View file

@ -3089,6 +3089,7 @@ edge WorksAt: Person -> Company
/// forward on next open so the manifest tracks the Lance HEAD — and the healed
/// table must then accept a schema apply (the original bug's victim).
#[tokio::test]
#[serial(optimize)]
async fn optimize_phase_b_failure_recovered_on_next_open() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
@ -3178,6 +3179,242 @@ async fn optimize_phase_b_failure_recovered_on_next_open() {
.expect("schema apply after optimize recovery must succeed");
}
/// Cross-process race (the prod bug): a served write advances the manifest on the
/// same table while a SEPARATE `optimize` process is paused between its compaction
/// and its manifest publish. The in-process write queue does NOT serialize across
/// processes, so optimize's equality-CAS publish (expected = its pre-compaction
/// version) finds the manifest already advanced. optimize must CONVERGE — the
/// concurrent write built on top of the compacted HEAD, so the compaction is
/// already reflected — not fail with "expected X but current Y". RED before the
/// monotonic-publish fix.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial(optimize)]
async fn optimize_survives_concurrent_insert_advancing_manifest() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
{
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] {
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age)]),
)
.await
.unwrap();
}
}
// Pause optimize BEFORE it compacts, so the concurrent insert lands while
// HEAD == manifest (no in-flight optimize drift for the writer to trip on); the
// insert advances the manifest, then optimize compacts on top and must converge
// its publish over the advanced manifest rather than fail the equality CAS.
let failpoint = ScopedFailPoint::new("optimize.before_compact", "pause");
let uri_opt = uri.clone();
let optimize = tokio::spawn(async move {
let db = Omnigraph::open(&uri_opt).await.unwrap();
db.optimize().await
});
// Wait until optimize reaches the pause (its Optimize sidecar is on disk).
assert!(
wait_for_sidecar(dir.path()).await,
"optimize never reached the pre-compact pause",
);
// Concurrent insert on the SAME table via a SEPARATE handle (= separate
// in-process write queue = a different process) advances the manifest.
{
let db_b = Omnigraph::open(&uri).await.unwrap();
db_b.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "eve")], &[("$age", 34)]),
)
.await
.unwrap();
}
drop(failpoint); // release optimize
let result = tokio::time::timeout(std::time::Duration::from_secs(20), optimize)
.await
.expect("optimize task hung")
.unwrap();
result.expect("optimize must survive a concurrent same-table write (cross-process)");
// No lost write: 4 seed + eve all present; graph remains re-optimizable.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
5,
"concurrent insert must not be lost",
);
db.optimize()
.await
.expect("graph must remain healthy / re-optimizable");
}
/// Cross-process race: a served DELETE commits on the same table while a SEPARATE
/// `optimize` process is parked just before its compaction. Lance rebases the
/// compaction past the delete cleanly (so this surfaces as a manifest-CAS mismatch
/// at publish, not a Lance `Rewrite` conflict — the genuine `Rewrite`-vs-`Rewrite`
/// overlap is the rarer many-fragment/concurrent-compaction case, covered by the
/// shared `is_retryable_lance_conflict` retry the internal-table path already
/// exercises). optimize must converge its publish over the advanced manifest and
/// preserve the delete. RED before the fix.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial(optimize)]
async fn optimize_survives_concurrent_delete_before_compaction() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
{
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] {
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age)]),
)
.await
.unwrap();
}
}
// Pause optimize BEFORE its compaction commits.
let failpoint = ScopedFailPoint::new("optimize.before_compact", "pause");
let uri_opt = uri.clone();
let optimize = tokio::spawn(async move {
let db = Omnigraph::open(&uri_opt).await.unwrap();
db.optimize().await
});
assert!(
wait_for_sidecar(dir.path()).await,
"optimize never reached the pre-compact pause",
);
// Concurrent DELETE of an existing row writes a deletion vector onto the
// fragment optimize is about to compact → optimize's Rewrite overlap-conflicts
// at the Lance level ("Rewrite … preempted by concurrent Delete/Update").
{
let db_b = Omnigraph::open(&uri).await.unwrap();
db_b.mutate(
"main",
MUTATION_QUERIES,
"remove_person",
&mixed_params(&[("$name", "alice")], &[]),
)
.await
.unwrap();
}
drop(failpoint); // release optimize
let result = tokio::time::timeout(std::time::Duration::from_secs(20), optimize)
.await
.expect("optimize task hung")
.unwrap();
result.expect("optimize must reopen+replan past a concurrent overlapping delete");
// No lost write: alice's delete persisted (3 rows); graph remains re-optimizable.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
3,
"the concurrent delete must persist (alice removed)",
);
db.optimize()
.await
.expect("graph must remain healthy / re-optimizable");
}
/// Regression: the outer compaction retry loop must NOT misclassify optimize's OWN
/// committed Phase-B work as external drift. Attempt 1 compacts (HEAD → V+1); if a
/// LATER Phase-B op (reindex) then hits a retryable conflict, the reopened attempt
/// sees Lance HEAD ahead of the manifest — from OUR compaction, not an external
/// writer. The drift guard must skip it (we hold the sidecar) and converge, not
/// delete the sidecar and return `skipped_for_drift` (which would strand uncovered
/// drift). Reproduced by injecting one retryable reindex conflict after the compact.
#[tokio::test]
#[serial(optimize)]
async fn optimize_retry_does_not_misclassify_own_head_drift() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
{
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] {
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age)]),
)
.await
.unwrap();
}
}
// Inject exactly one retryable reindex conflict: attempt 1 compacts (HEAD+1) then
// "conflicts" on reindex → retry; attempt 2 reopens with HEAD ahead of the manifest
// from our own compaction — the misclassification trigger.
let _failpoint = ScopedFailPoint::new("optimize.inject_reindex_conflict", "1*return");
let db = Omnigraph::open(&uri).await.unwrap();
let stats = db
.optimize()
.await
.expect("optimize must converge, not misclassify its own HEAD drift");
let person = stats
.iter()
.find(|s| s.table_key == "node:Person")
.expect("node:Person stat present");
assert!(
person.skipped.is_none(),
"node:Person must converge, not skipped_for_drift: {:?}",
person.skipped,
);
// No uncovered drift stranded: a follow-up optimize is clean and all rows read.
let stats2 = db.optimize().await.unwrap();
let person2 = stats2
.iter()
.find(|s| s.table_key == "node:Person")
.unwrap();
assert!(
person2.skipped.is_none(),
"follow-up optimize must be clean (no stranded drift): {:?}",
person2.skipped,
);
assert_eq!(helpers::count_rows(&db, "node:Person").await, 4);
}
/// Poll until `optimize` has written its recovery sidecar (i.e. reached Phase B
/// and is about to / has compacted), signalling it is parked at its failpoint.
async fn wait_for_sidecar(root: &std::path::Path) -> bool {
let recovery_dir = root.join("__recovery");
for _ in 0..1000 {
if recovery_dir.exists()
&& std::fs::read_dir(&recovery_dir)
.map(|d| d.count() > 0)
.unwrap_or(false)
{
return true;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
false
}
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_phase_b_failure_recovered_on_next_open() {

View file

@ -0,0 +1,377 @@
//! Shared cost-budget test harness (RFC-013) — the single place the IO-counting
//! plumbing lives, so `warm_read_cost.rs`, `write_cost.rs`, and the S3 variant
//! assert in one vocabulary instead of duplicating `probes()` + raw `IOTracker`
//! reads. Three clean abstractions: structured counts, a `measure` primitive, a
//! named flat-assertion, plus store-agnostic backend fixtures.
//!
//! The data-table wrapper is a **path-classifying** counter (`PrefixCounter`), not a
//! plain `IOTracker`: it splits each read into the **opener** term (latest-version
//! resolution — reads of `_versions/`/`.manifest` objects) vs the **scan** term
//! (data-fragment reads, `data/`/`*.lance`). That lets a cost test isolate the
//! opener (RFC-013 step 3a's target, O(1) after the bypass) from the merge-insert/RI
//! scan (O(fragment-count), compaction's domain) even though both ride the same
//! `Dataset` — without controlling the fixture (no compaction needed). `__manifest`
//! and `_graph_commits` keep the plain `IOTracker` (no sub-prefixes worth splitting).
#![allow(dead_code)]
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use futures::stream::BoxStream;
use lance::io::WrappingObjectStore;
use lance_io::utils::tracking_store::IOTracker;
use object_store::path::Path;
use object_store::{
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult,
};
use omnigraph::db::Omnigraph;
use omnigraph::instrumentation::{
MergeWriteProbes, QueryIoProbes, with_merge_write_probes, with_query_io_probes,
};
use omnigraph::loader::{LoadMode, load_jsonl};
use super::{MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, init_and_load, mixed_params};
/// Object-store op counts for one measured operation, by table class — the
/// vocabulary cost tests assert in (vs raw `IOTracker::stats().read_iops`).
#[derive(Debug, Clone, Copy, Default)]
pub struct IoCounts {
/// Per-table DATA opens (node/edge tables). The dominant write-path term.
pub data_reads: u64,
pub data_writes: u64,
/// DATA-table reads attributed to latest-version resolution (`_versions/`,
/// `.manifest`). This is the **opener** term step 3a flattened — isolated from
/// the scan, so it can be gated directly without compacting the fixture.
pub data_opener_reads: u64,
/// DATA-table reads attributed to data fragments (`data/`, `*.lance`) — the
/// merge-insert/RI **scan**, which grows with fragment count (compaction's
/// domain, not the opener).
pub data_scan_reads: u64,
/// `__manifest` registry scans (publish state).
pub manifest_reads: u64,
/// `_graph_commits` lineage scans.
pub commit_graph_reads: u64,
/// Version-probe invocations (the cheap freshness check).
pub version_probes: u64,
}
impl IoCounts {
pub fn total_reads(&self) -> u64 {
self.data_reads + self.manifest_reads + self.commit_graph_reads
}
}
/// Which staged-write primitives an operation invoked (from `MergeWriteProbes`).
#[derive(Debug, Clone, Copy, Default)]
pub struct StagedCounts {
pub stage_append: u64,
pub stage_merge_insert: u64,
pub create_vector_index: u64,
pub scan_staged_combined: u64,
}
// ── Path-classifying data-table read counter ──
/// How a data-table object read is attributed.
enum ReadClass {
/// Latest-version resolution: `_versions/`, `.manifest`, `_latest`.
Opener,
/// Data fragments: `data/`, `*.lance`.
Scan,
/// Anything else (indices, deletion files, …) — counted in the total only.
Other,
}
/// Classify a Lance object path by its role in a write open. Lance's on-object
/// layout is identical on local FS and S3, so this split is backend-independent.
fn classify(path: &Path) -> ReadClass {
let p = path.as_ref();
if p.contains("_versions") || p.ends_with(".manifest") || p.contains("_latest") {
ReadClass::Opener
} else if p.contains("/data/") || p.starts_with("data/") || p.ends_with(".lance") {
ReadClass::Scan
} else {
ReadClass::Other
}
}
#[derive(Debug, Default, Clone, Copy)]
struct PrefixCounts {
reads: u64,
writes: u64,
opener_reads: u64,
scan_reads: u64,
}
/// A `WrappingObjectStore` that counts reads/writes and attributes each read to the
/// opener vs scan term by object-key prefix. Shares its tally via `Arc<Mutex<…>>` so
/// the wrapped store (handed to Lance) and the test read the same counters.
#[derive(Debug, Default, Clone)]
struct PrefixCounter(Arc<Mutex<PrefixCounts>>);
impl PrefixCounter {
fn record_read(&self, location: &Path) {
let mut c = self.0.lock().unwrap();
c.reads += 1;
match classify(location) {
ReadClass::Opener => c.opener_reads += 1,
ReadClass::Scan => c.scan_reads += 1,
ReadClass::Other => {}
}
}
fn record_write(&self) {
self.0.lock().unwrap().writes += 1;
}
fn snapshot(&self) -> PrefixCounts {
*self.0.lock().unwrap()
}
}
impl WrappingObjectStore for PrefixCounter {
fn wrap(&self, _store_prefix: &str, target: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
Arc::new(PrefixCountingStore {
target,
counter: self.clone(),
})
}
}
/// The wrapped `ObjectStore` that records each call into a [`PrefixCounter`].
/// Implements only the required core `ObjectStore` methods (object_store 0.13: the
/// convenience surface — `get`/`put`/`head`/`get_range`/… — lives on
/// `ObjectStoreExt` and is provided by a blanket impl that routes through `get_opts`
/// / `put_opts`, so every read/write is still counted here). Per-read path
/// classification is the only addition over a plain pass-through.
#[derive(Debug)]
struct PrefixCountingStore {
target: Arc<dyn ObjectStore>,
counter: PrefixCounter,
}
impl fmt::Display for PrefixCountingStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "PrefixCountingStore({})", self.target)
}
}
#[async_trait]
impl ObjectStore for PrefixCountingStore {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> OSResult<PutResult> {
self.counter.record_write();
self.target.put_opts(location, payload, opts).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> OSResult<Box<dyn MultipartUpload>> {
self.counter.record_write();
self.target.put_multipart_opts(location, opts).await
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
self.counter.record_read(location);
self.target.get_opts(location, options).await
}
fn delete_stream(
&self,
locations: BoxStream<'static, OSResult<Path>>,
) -> BoxStream<'static, OSResult<Path>> {
self.target.delete_stream(locations)
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
self.counter.record_read(&prefix.cloned().unwrap_or_default());
self.target.list(prefix)
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, OSResult<ObjectMeta>> {
self.counter.record_read(&prefix.cloned().unwrap_or_default());
self.target.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
self.counter.record_read(&prefix.cloned().unwrap_or_default());
self.target.list_with_delimiter(prefix).await
}
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> OSResult<()> {
self.counter.record_write();
self.target.copy_opts(from, to, options).await
}
}
/// The tracker handles backing one measurement; read once into [`IoCounts`].
struct ProbeHandles {
manifest: IOTracker,
commit_graph: IOTracker,
table: PrefixCounter,
probe_count: Arc<AtomicU64>,
}
impl ProbeHandles {
fn install() -> (QueryIoProbes, Self) {
let h = ProbeHandles {
manifest: IOTracker::default(),
commit_graph: IOTracker::default(),
table: PrefixCounter::default(),
probe_count: Arc::new(AtomicU64::new(0)),
};
let probes = QueryIoProbes {
manifest_wrapper: Some(Arc::new(h.manifest.clone()) as Arc<dyn WrappingObjectStore>),
commit_graph_wrapper: Some(
Arc::new(h.commit_graph.clone()) as Arc<dyn WrappingObjectStore>
),
table_wrapper: Some(Arc::new(h.table.clone()) as Arc<dyn WrappingObjectStore>),
probe_count: Arc::clone(&h.probe_count),
};
(probes, h)
}
fn counts(&self) -> IoCounts {
let t = self.table.snapshot();
IoCounts {
data_reads: t.reads,
data_writes: t.writes,
data_opener_reads: t.opener_reads,
data_scan_reads: t.scan_reads,
manifest_reads: self.manifest.stats().read_iops,
commit_graph_reads: self.commit_graph.stats().read_iops,
version_probes: self.probe_count.load(Ordering::Relaxed),
}
}
}
/// Run `op` under object-store IO counting; return its output + the counts.
/// The only place the `QueryIoProbes` task-local + tracker wiring lives.
pub async fn measure<F: Future>(op: F) -> (F::Output, IoCounts) {
let (probes, handles) = ProbeHandles::install();
let out = with_query_io_probes(probes, op).await;
(out, handles.counts())
}
/// Like [`measure`], but also capture which staged-write primitives ran
/// (composes the two task-locals cleanly).
pub async fn measure_with_staged<F: Future>(op: F) -> (F::Output, IoCounts, StagedCounts) {
let (probes, handles) = ProbeHandles::install();
let merge = MergeWriteProbes::default();
let out = with_merge_write_probes(merge.clone(), with_query_io_probes(probes, op)).await;
let staged = StagedCounts {
stage_append: merge.stage_append_calls(),
stage_merge_insert: merge.stage_merge_insert_calls(),
create_vector_index: merge.create_vector_index_calls(),
scan_staged_combined: merge.scan_staged_combined_calls(),
};
(out, handles.counts(), staged)
}
/// Assert a per-depth metric is flat: the deepest sample must not exceed the
/// shallowest by more than `slack`. `select` picks the field; `what` names it in
/// the failure message. The shape every depth-swept cost gate uses.
pub fn assert_flat(
curve: &[(u64, IoCounts)],
select: impl Fn(&IoCounts) -> u64,
slack: u64,
what: &str,
) {
assert!(curve.len() >= 2, "assert_flat needs >= 2 depth points");
let (d_lo, lo) = (curve[0].0, select(&curve[0].1));
let (d_hi, hi) = (curve[curve.len() - 1].0, select(&curve[curve.len() - 1].1));
assert!(
hi <= lo + slack,
"{what} grew with history: depth {d_lo} = {lo} -> depth {d_hi} = {hi} (slack {slack})"
);
}
/// Assert a per-depth metric *does* grow with history by at least `min_delta` — the
/// dual of [`assert_flat`], used to prove a term is genuinely history-dependent (so a
/// flat sibling term isn't flat merely because nothing was measured).
pub fn assert_grows(
curve: &[(u64, IoCounts)],
select: impl Fn(&IoCounts) -> u64,
min_delta: u64,
what: &str,
) {
assert!(curve.len() >= 2, "assert_grows needs >= 2 depth points");
let (d_lo, lo) = (curve[0].0, select(&curve[0].1));
let (d_hi, hi) = (curve[curve.len() - 1].0, select(&curve[curve.len() - 1].1));
assert!(
hi >= lo + min_delta,
"{what} did not grow as expected: depth {d_lo} = {lo} -> depth {d_hi} = {hi} (min delta {min_delta})"
);
}
/// Measure one committing `insert_person` to `main` — the canonical write the cost
/// gates sweep over commit-history depth. Shared by `write_cost.rs` and
/// `write_cost_s3.rs` so the measured write is defined once.
pub async fn measure_insert(db: &mut Omnigraph, tag: &str) -> IoCounts {
let (res, io) = measure(db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", tag)], &[("$age", 30)]),
))
.await;
res.unwrap();
io
}
/// Like [`measure_insert`] but carries an actor, so the write appends to and reads
/// `_graph_commit_actors.lance` — the authenticated (server/CLI) write path. The
/// commit-graph IO wrapper covers both `_graph_commits` and `_graph_commit_actors`,
/// so `IoCounts::commit_graph_reads` includes the actor-table scan on this path.
pub async fn measure_insert_as(db: &mut Omnigraph, tag: &str, actor: &str) -> IoCounts {
let (res, io) = measure(db.mutate_as(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", tag)], &[("$age", 30)]),
Some(actor),
))
.await;
res.unwrap();
io
}
// ── Backend fixtures — one knob, store-agnostic body ──
/// Local tempdir graph (default; deterministic, every-PR).
pub async fn local_graph(dir: &tempfile::TempDir) -> Omnigraph {
init_and_load(dir).await
}
/// Emulated-S3 graph, bucket-gated. Returns `None` **only** when
/// `OMNIGRAPH_S3_TEST_BUCKET` is unset, so the caller logs + skips — the
/// `tests/s3_storage.rs` graceful-skip pattern. Once the bucket *is* configured
/// (the rustfs CI job), any `init`/seed failure is a real failure and panics
/// rather than silently skipping — otherwise a down/misconfigured store would let
/// a bucket-gated gate pass vacuously. `name` disambiguates the prefix.
pub async fn s3_graph(name: &str) -> Option<Omnigraph> {
let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?;
let uri = format!("s3://{bucket}/cost-tests/{name}-{}", std::process::id());
let mut db = Omnigraph::init(&uri, TEST_SCHEMA)
.await
.expect("OMNIGRAPH_S3_TEST_BUCKET is set but S3 graph init failed");
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.expect("OMNIGRAPH_S3_TEST_BUCKET is set but S3 seed load failed");
Some(db)
}

View file

@ -1,5 +1,6 @@
#![allow(dead_code)]
pub mod cost;
pub mod recovery;
use arrow_array::{Array, RecordBatch, StringArray};
@ -181,6 +182,22 @@ pub async fn commit_many(db: &mut Omnigraph, n: usize) {
}
}
/// Like [`commit_many`] but every commit carries an actor, so it grows
/// `_graph_commit_actors.lance` too — the authenticated (server/CLI) write path.
pub async fn commit_many_as(db: &mut Omnigraph, n: usize, actor: &str) {
for i in 0..n {
db.mutate_as(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", &format!("commit_many_as_{i}"))], &[("$age", 30)]),
Some(actor),
)
.await
.unwrap();
}
}
pub async fn snapshot_main(db: &Omnigraph) -> Result<Snapshot> {
db.snapshot_of(ReadTarget::branch("main")).await
}

View file

@ -991,3 +991,80 @@ async fn unenforced_primary_key_is_immutable_once_set() {
revisit migrate_v1_to_v2's field-guard and re-pin docs/dev/lance.md."
);
}
// --- Guard 20: camelCase @index equality routes to the scalar index (#283) ----
//
// The #283 read-pushdown fix builds the filter column with datafusion `ident()`
// (case-preserving) instead of `col()` (SQL identifier normalization, which
// lowercases an unquoted name). The correctness tests in literal_filters.rs /
// writes.rs prove the right rows come back, but a result-only assertion also
// passes on a full-scan fallback — exactly the gap testing.md warns about. This
// guard pins the *plan*: an equality on a camelCase BTREE column must compile to
// a `ScalarIndexQuery` under the fix's expr shape, and must NOT under the old
// `col()` shape (which lowercases `repoName` → a nonexistent `reponame`). A
// regression that breaks camelCase index routing — or a revert to `col()` —
// turns this red instead of silently degrading to a full scan.
#[tokio::test]
async fn camelcase_index_equality_routes_to_scalar_index() {
use datafusion::physical_plan::displayable;
use datafusion::prelude::{col, ident, lit};
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().join("camelcase_index.lance");
let uri = uri.to_str().unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("repoName", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(StringArray::from(vec![
"acme", "globex", "initech", "umbrella",
])),
],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
let params = WriteParams {
mode: WriteMode::Create,
enable_stable_row_ids: true,
data_storage_version: Some(LanceFileVersion::V2_2),
..Default::default()
};
let mut ds = Dataset::write(reader, uri, Some(params)).await.unwrap();
ds.create_index_builder(&["repoName"], IndexType::BTree, &ScalarIndexParams::default())
.replace(true)
.await
.unwrap();
async fn plan_str(ds: &Dataset, filter: datafusion::prelude::Expr) -> lance::Result<String> {
let mut scanner = ds.scan();
scanner.filter_expr(filter);
let plan = scanner.create_plan().await?;
Ok(format!("{}", displayable(plan.as_ref()).indent(true)))
}
// The fix's shape: ident() preserves case → resolves `repoName` → index.
let used = plan_str(&ds, ident("repoName").eq(lit("acme")))
.await
.expect("ident(\"repoName\") must plan against the case-preserved schema");
assert!(
used.contains("ScalarIndexQuery"),
"camelCase @index equality must route to the scalar index (not full scan), got:\n{used}"
);
// The pre-fix shape: col() normalizes `repoName` → `reponame`, which does not
// exist in the case-sensitive schema, so planning fails. This is precisely
// why `col()` could never reach the index and surfaced the #283 runtime error
// — it could not silently full-scan past the index either.
let err = plan_str(&ds, col("repoName").eq(lit("acme"))).await;
assert!(
err.is_err(),
"col() lowercases repoName→reponame against a case-sensitive schema; \
planning must fail rather than resolve, confirming ident() is required \
for camelCase index routing. got plan:\n{err:?}"
);
}

View file

@ -94,13 +94,23 @@ async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() {
let stats = db.optimize().await.unwrap();
// Schema declares 2 nodes + 2 edges = 4 tables. Compaction should run on
// each but find nothing to merge.
assert_eq!(stats.len(), 4);
// Schema declares 2 nodes + 2 edges = 4 data tables, plus the 3 internal
// system tables (`__manifest`, `_graph_commits`, `_graph_commit_actors`) optimize
// also compacts (RFC-013 step 2) = 7. Compaction should run on each but find
// nothing to merge.
assert_eq!(stats.len(), 7);
for s in &stats {
assert_eq!(s.fragments_removed, 0, "{} should not remove", s.table_key);
assert_eq!(s.fragments_added, 0, "{} should not add", s.table_key);
}
// The internal tables are present and reported as no-ops on an empty graph.
for key in ["__manifest", "_graph_commits", "_graph_commit_actors"] {
let s = stats
.iter()
.find(|s| s.table_key == key)
.unwrap_or_else(|| panic!("optimize stats missing internal table {key}"));
assert!(!s.committed, "{key} should be a no-op on an empty graph");
}
}
#[tokio::test]
@ -133,6 +143,224 @@ async fn optimize_after_load_then_again_is_idempotent() {
}
}
/// RFC-013 step 2: `optimize` compacts the internal system tables
/// (`__manifest`, `_graph_commits`), which accumulate one fragment per commit.
/// After compaction they shed fragments, write no recovery sidecar (a single
/// atomic Lance commit — no HEAD-before-publish gap), and the graph stays
/// coherent for subsequent reads + strict writes.
#[tokio::test]
async fn optimize_compacts_internal_tables() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Build version-history depth so the internal tables accumulate fragments.
for i in 0..20 {
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", &format!("p{i}"))], &[("$age", 30)]),
)
.await
.unwrap();
}
let stats = db.optimize().await.unwrap();
for key in ["__manifest", "_graph_commits"] {
let s = stats
.iter()
.find(|s| s.table_key == key)
.unwrap_or_else(|| panic!("optimize stats missing internal table {key}"));
assert!(s.committed, "{key} should compact after 20 commits");
assert!(
s.fragments_removed > 0,
"{key} should shed fragments, removed {}",
s.fragments_removed
);
}
// Internal compaction leaks no recovery sidecar.
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let leftover: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.file_name())
.collect();
assert!(
leftover.is_empty(),
"optimize leaked recovery sidecars: {leftover:?}"
);
}
// Coherent after internal compaction: reads + a strict write still work.
assert!(count_rows(&db, "node:Person").await > 0);
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "after_compact")], &[("$age", 40)]),
)
.await
.unwrap();
}
/// `optimize` must not fail on a graph that has no `_graph_commits.lance` — a valid
/// state the coordinator opens as `commit_graph = None` (graphs predating the commit
/// graph). Without the existence guard, `Dataset::open` on the absent table errors
/// and fails the whole optimize. Regression for the missing-existence-guard.
///
/// Uses an EMPTY graph deliberately: a graph with data would publish during
/// optimize, and a publish records a graph commit that recreates `_graph_commits`
/// before the guard runs — masking the bug. With no data, nothing recreates it, so
/// the table stays absent through the guard.
#[tokio::test]
async fn optimize_tolerates_absent_graph_commits_table() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
// Simulate a graph with no commit-graph dataset.
std::fs::remove_dir_all(dir.path().join("_graph_commits.lance")).unwrap();
// Coordinator tolerates the absence; optimize must succeed (the guard skips the
// absent table rather than letting `Dataset::open` error) and omit its stat.
let db = Omnigraph::open(uri).await.unwrap();
let stats = db.optimize().await.unwrap();
assert!(
stats.iter().any(|s| s.table_key == "__manifest"),
"__manifest must still be compacted"
);
assert!(
!stats.iter().any(|s| s.table_key == "_graph_commits"),
"absent _graph_commits must be skipped, not opened (would error)"
);
}
/// `optimize` must stay NON-DESTRUCTIVE on a pre-`auto_cleanup`-fix upgraded graph:
/// `compact_files` would otherwise fire the dataset's stored `lance.auto_cleanup.*`
/// hook (version GC) during the compaction commit. Internal-table compaction clears
/// that stale config first, so no versions are deleted. Without the clear, the
/// aggressive policy below GCs old versions and the count drops.
#[tokio::test]
async fn optimize_clears_stale_auto_cleanup_and_preserves_versions() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
for i in 0..5 {
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", &format!("v{i}"))], &[("$age", 30)]),
)
.await
.unwrap();
}
let manifest_uri = format!("{}/__manifest", dir.path().to_str().unwrap());
// Simulate an upgraded graph: an aggressive stored auto_cleanup config that, if
// it fired during compaction, would GC old versions.
{
let mut ds = Dataset::open(&manifest_uri).await.unwrap();
ds.update_config([
("lance.auto_cleanup.interval", Some("1")),
("lance.auto_cleanup.older_than", Some("0s")),
])
.await
.unwrap();
}
let versions_before = Dataset::open(&manifest_uri)
.await
.unwrap()
.versions()
.await
.unwrap()
.len();
db.optimize().await.unwrap();
let ds = Dataset::open(&manifest_uri).await.unwrap();
// (a) the stale auto_cleanup config was cleared (non-destructive by construction).
assert!(
!ds.config().keys().any(|k| k.starts_with("lance.auto_cleanup.")),
"optimize must clear stale auto_cleanup config; config = {:?}",
ds.config()
);
// (b) no version GC: every pre-optimize version survives (compaction + the
// config-clear each add versions, so the count only grows).
let versions_after = ds.versions().await.unwrap().len();
assert!(
versions_after >= versions_before,
"optimize must not GC __manifest versions: before={versions_before} after={versions_after}"
);
}
/// The same non-destructive guarantee on a DATA (node/edge) table, not just the
/// internal tables. `optimize_one_table` runs `compact_files` / `optimize_indices`
/// with a default `CommitConfig` (`skip_auto_cleanup = false`); on an upgraded
/// graph whose Person table still carries the pre-v7 `lance.auto_cleanup.*` config,
/// those commits would fire Lance's version-GC hook and prune `__manifest`-pinned
/// data-table versions. The path must strip that config first. Without the strip,
/// the aggressive policy below GCs old versions and the config survives the run.
#[tokio::test]
async fn optimize_clears_stale_auto_cleanup_on_data_tables_too() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path().to_str().unwrap().trim_end_matches('/').to_string();
let mut db = init_and_load(&dir).await;
add_person_fragments(&mut db).await; // multiple fragments → will_compact
// Simulate an upgraded graph: set an aggressive stored auto_cleanup config on
// the Person table. This is an out-of-band Lance commit (an `UpdateConfig` that
// advances HEAD past the manifest), so realign the manifest with a forced repair
// first — otherwise optimize skips the table as uncovered drift and never
// reaches the scrub. (Forced because UpdateConfig is not verified maintenance.)
let (_, _, person_full) = person_manifest_and_head(&db, &root).await;
{
let mut ds = Dataset::open(&person_full).await.unwrap();
ds.update_config([
("lance.auto_cleanup.interval", Some("1")),
("lance.auto_cleanup.older_than", Some("0s")),
])
.await
.unwrap();
}
db.repair(RepairOptions {
confirm: true,
force: true,
})
.await
.unwrap();
let versions_before = Dataset::open(&person_full)
.await
.unwrap()
.versions()
.await
.unwrap()
.len();
let rows_before = count_rows(&db, "node:Person").await;
db.optimize().await.unwrap();
let ds = Dataset::open(&person_full).await.unwrap();
// (a) the stale auto_cleanup config was cleared (non-destructive by construction).
assert!(
!ds.config().keys().any(|k| k.starts_with("lance.auto_cleanup.")),
"optimize must clear stale auto_cleanup config on data tables; config = {:?}",
ds.config()
);
// (b) no version GC: every pre-optimize version survives (compaction + the
// config-clear each add versions, so the count only grows).
let versions_after = ds.versions().await.unwrap().len();
assert!(
versions_after >= versions_before,
"optimize must not GC Person versions: before={versions_before} after={versions_after}"
);
// (c) data is intact — the run rewrote fragments, it did not drop rows.
assert_eq!(count_rows(&db, "node:Person").await, rows_before);
}
// PR3 (Workstream B): an existing scalar index does not cover fragments
// appended after it was built (build_indices is existence-gated), so those
// rows are scanned unindexed. `optimize` must fold them back in via Lance's

View file

@ -1,51 +1,22 @@
//! Cost-budget tests for the warm read path (Fix 1): a warm same-branch read
//! must perform no manifest or commit-graph opens, measured with Lance's
//! `IOTracker` at the object-store boundary (the LanceDB IO-counted-test
//! pattern; see docs/dev/testing.md). Guards invariant 15 (read cost bounded by
//! work, not history) for snapshot resolution, and invariant 6 (a warm reader
//! still observes external commits).
//! must perform no manifest or commit-graph opens, measured via the shared
//! `helpers::cost` harness at the object-store boundary (the LanceDB
//! IO-counted-test pattern; see docs/dev/testing.md). Guards invariant 15 (read
//! cost bounded by work, not history) for snapshot resolution, and invariant 6
//! (a warm reader still observes external commits).
mod helpers;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use arrow_array::{Array, StringArray};
use lance::io::WrappingObjectStore;
use lance_io::utils::tracking_store::IOTracker;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes};
use omnigraph_compiler::result::QueryResult;
use helpers::cost::measure;
use helpers::{
MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params,
mutate_branch, mutate_main, params,
};
/// IO probes plus the tracker handles to read `read_iops` after the query.
/// Returns `(probes, manifest, commit_graph, table, probe_count)` — `table`
/// counts per-table data opens (the cache-miss path), so a cost test can assert
/// N opens on a cold read and 0 on a warm repeat (Fix 3).
fn probes() -> (
QueryIoProbes,
IOTracker,
IOTracker,
IOTracker,
Arc<AtomicU64>,
) {
let manifest = IOTracker::default();
let commit_graph = IOTracker::default();
let table = IOTracker::default();
let probe_count = Arc::new(AtomicU64::new(0));
let probes = QueryIoProbes {
manifest_wrapper: Some(Arc::new(manifest.clone()) as Arc<dyn WrappingObjectStore>),
commit_graph_wrapper: Some(Arc::new(commit_graph.clone()) as Arc<dyn WrappingObjectStore>),
table_wrapper: Some(Arc::new(table.clone()) as Arc<dyn WrappingObjectStore>),
probe_count: Arc::clone(&probe_count),
};
(probes, manifest, commit_graph, table, probe_count)
}
fn first_column_strings(result: &QueryResult) -> Vec<String> {
if result.num_rows() == 0 {
return Vec::new();
@ -75,18 +46,14 @@ async fn warm_same_branch_read_does_no_resolution_opens() {
// Deep history: warm-read resolution cost must be flat in commit count.
commit_many(&mut db, 20).await;
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
out.unwrap();
// A warm same-branch read opens nothing from the internal tables, even at
// commit-history depth. Fix 1 reuses the coordinator (no re-open: 0
@ -95,18 +62,15 @@ async fn warm_same_branch_read_does_no_resolution_opens() {
// per-table __manifest scan is gone too. Pre-fix, each of these is a deep scan
// of an internal table that grows with commit count.
assert_eq!(
manifest.stats().read_iops,
0,
io.manifest_reads, 0,
"warm same-branch read must not scan __manifest (resolution or per-table)"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"warm same-branch read must not open the commit graph (no coordinator re-open)"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
1,
io.version_probes, 1,
"warm same-branch read performs exactly one version probe"
);
}
@ -121,22 +85,17 @@ async fn multi_table_query_does_no_manifest_scans() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let (probes_in, manifest, _commit_graph, _table, _probe) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"age_stats",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"age_stats",
&params(&[]),
))
.await;
out.unwrap();
assert_eq!(
manifest.stats().read_iops,
0,
io.manifest_reads, 0,
"a multi-table read must not scan __manifest once per touched table"
);
}
@ -278,32 +237,25 @@ async fn warm_branch_read_does_no_manifest_scans() {
// Bind the handle's coordinator to the branch so reads of it take the warm path.
db.sync_branch("feature").await.unwrap();
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
out.unwrap();
assert_eq!(
manifest.stats().read_iops,
0,
io.manifest_reads, 0,
"warm branch read must not scan __manifest (branch-owned table opened by location)"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"warm branch read must not open the commit graph"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
1,
io.version_probes, 1,
"warm branch read performs exactly one version probe"
);
}
@ -369,18 +321,14 @@ async fn warm_read_on_recreated_branch_observes_new_incarnation() {
"test setup must exercise branch incarnation reuse at one Lance version"
);
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
let new_feature = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "MainOnly")]),
),
)
.await
.unwrap();
let (new_feature, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "MainOnly")]),
))
.await;
let new_feature = new_feature.unwrap();
assert_eq!(
new_feature.num_rows(),
@ -388,17 +336,15 @@ async fn warm_read_on_recreated_branch_observes_new_incarnation() {
"warm reader must refresh to the recreated branch incarnation"
);
assert!(
manifest.stats().read_iops > 0,
io.manifest_reads > 0,
"recreated branch must re-read the manifest after the incarnation probe"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"same-branch incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
io.version_probes, 2,
"stale same-branch read probes once under the read lock and once under the write lock"
);
}
@ -469,39 +415,33 @@ async fn recreated_branch_owned_table_handle_uses_table_etag() {
"test setup must force table handle identity to differ only by e_tag"
);
let (probes_in, manifest, commit_graph, table, probe_count) = probes();
let new_person = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "NewOnly")]),
),
)
.await
.unwrap();
let (new_person, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "NewOnly")]),
))
.await;
let new_person = new_person.unwrap();
assert_eq!(
new_person.num_rows(),
1,
"warm reader must open the recreated branch-owned table incarnation"
);
assert!(
table.stats().read_iops > 0,
io.data_reads > 0,
"table e_tag must force a held-handle cache miss for the recreated table"
);
assert!(
manifest.stats().read_iops > 0,
io.manifest_reads > 0,
"recreated branch must refresh the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"same-branch table-incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
io.version_probes, 2,
"stale same-branch read probes once under each lock"
);
@ -594,35 +534,29 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() {
"test setup must force graph-index identity to differ only by snapshot incarnation"
);
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
let new_friends = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
&params(&[("$name", "NewWalker")]),
),
)
.await
.unwrap();
let (new_friends, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
&params(&[("$name", "NewWalker")]),
))
.await;
let new_friends = new_friends.unwrap();
assert_eq!(
first_column_strings(&new_friends),
vec!["Bob"],
"traversal must use the recreated branch's topology, not stale cached graph index"
);
assert!(
manifest.stats().read_iops > 0,
io.manifest_reads > 0,
"recreated branch traversal must refresh the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"same-branch traversal incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
io.version_probes, 2,
"stale same-branch read probes once under each lock"
);
@ -673,31 +607,25 @@ async fn stale_read_refreshes_manifest_only() {
.await
.unwrap();
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(reader.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
out.unwrap();
assert!(
manifest.stats().read_iops > 0,
io.manifest_reads > 0,
"stale read must re-read the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"stale refresh must be manifest-only (no commit-graph scan)"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
io.version_probes, 2,
"stale same-branch read probes once under the read lock and once under the write lock"
);
}
@ -721,55 +649,40 @@ async fn repeat_warm_read_reuses_table_handles() {
commit_many(&mut db, 10).await;
// Cold first read: opens the touched table.
let (p1, _m1, _c1, table1, _pr1) = probes();
with_query_io_probes(
p1,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (cold_out, cold) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
cold_out.unwrap();
assert!(
table1.stats().read_iops > 0,
cold.data_reads > 0,
"the cold first read must open the table"
);
// Warm repeat: the held handle is reused, so no open happens through this
// query's table wrapper.
let (p2, manifest2, commit_graph2, table2, probe2) = probes();
with_query_io_probes(
p2,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
// query's table wrapper. A fresh `measure()` isolates the warm repeat's cost.
let (warm_out, warm) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
warm_out.unwrap();
assert_eq!(
table2.stats().read_iops,
0,
warm.data_reads, 0,
"a warm repeat read must reuse the held handle (0 table opens)"
);
assert_eq!(warm.manifest_reads, 0, "warm repeat read: 0 manifest opens");
assert_eq!(
manifest2.stats().read_iops,
0,
"warm repeat read: 0 manifest opens"
);
assert_eq!(
commit_graph2.stats().read_iops,
0,
warm.commit_graph_reads, 0,
"warm repeat read: 0 commit-graph opens"
);
assert_eq!(
probe2.load(Ordering::Relaxed),
1,
warm.version_probes, 1,
"warm repeat read: exactly one version probe"
);
}
@ -807,20 +720,16 @@ async fn write_invalidates_table_cache_for_changed_table() {
.unwrap();
// The next read re-opens Person at the new version (cache miss).
let (p, _m, _c, table, _pr) = probes();
with_query_io_probes(
p,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
out.unwrap();
assert!(
table.stats().read_iops > 0,
io.data_reads > 0,
"a read after a write to the table must re-open it (version-keyed miss)"
);

View file

@ -0,0 +1,171 @@
//! Cost-budget tests for the WRITE path (RFC-013 step 1) — the safety/latency
//! twin of `warm_read_cost.rs`, on the shared `helpers::cost` harness. A
//! committing write's per-table opens and internal-table scans must be bounded
//! and **flat across commit-history depth**, measured at the object-store
//! boundary. Guards invariant 15 (cost bounded by work, not history) on writes.
//!
//! **Backend split (see docs/dev/testing.md / RFC-013).** This file runs on
//! **local FS** and gates the **internal-table** term (`__manifest`/`_graph_commits`
//! fragment scans, ~+18/depth — O(fragments) on any backend, step 2's target).
//!
//! The **data-table opener** term (step 3a's win) is a per-object-store-RPC
//! phenomenon and is NOT gated here: local-FS latest-resolution is cheap whether
//! the open goes through the namespace builder or direct-by-URI, so the
//! namespace→direct switch is invisible on local. Measured: the local data-table
//! read count grows with depth too (~+0.9/depth), but that is a *different* term —
//! the merge-insert/RI scan reading O(depth) **fragments**, unchanged by the
//! opener switch (depth-100 = 92 ops both before and after step 3a, same slope)
//! and reduced only by compaction. The opener term shows up only on a real object
//! store (per-version GETs, ~+12/depth → flat after step 3a), so it is gated in
//! `write_cost_s3.rs` (bucket-gated). Same `measure`/`IoCounts` harness, different
//! backend; each term gated where it actually manifests.
#![recursion_limit = "512"]
mod helpers;
use helpers::cost::{
IoCounts, assert_flat, assert_grows, local_graph, measure_insert, measure_insert_as,
measure_with_staged,
};
use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, mixed_params};
// ── (A) The internal-table LOCK — the acceptance test for step 2 (compaction) ──
//
// `__manifest` / `_graph_commits` / `_graph_commit_actors` scans on a write must be
// O(1) in commit-history depth **on a compacted graph**. Without internal-table
// compaction these scans are O(fragments) and grow forever; step 2 brings all three
// internal tables into `db.optimize()`, so after compaction the per-write scan is
// flat. The test runs the **authenticated (actorful) write path** — every commit
// carries an actor, so it grows `_graph_commit_actors.lance` too (the production
// server/CLI path); the commit-graph IO wrapper covers both that and `_graph_commits`,
// so `commit_graph_reads` includes the actor-table scan. It compacts at each depth
// checkpoint before measuring — pinning the production invariant "a periodically-
// compacted graph's write cost does not grow with version history."
#[tokio::test]
async fn internal_table_scans_are_flat_in_history() {
const ACTOR: &str = "act-cost-gate";
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 100] {
if d > current {
commit_many_as(&mut db, (d - current) as usize, ACTOR).await;
current = d;
}
// Step 2: compaction folds all three internal tables' O(depth) fragments back
// to a small constant, so the following write's scan of them is flat.
db.optimize().await.unwrap();
let io = measure_insert_as(&mut db, &format!("lock_{d}"), ACTOR).await;
current += 1; // the measured write advanced depth by one
eprintln!(
"depth~{d}: data={} __manifest={} _graph_commits+actors={}",
io.data_reads, io.manifest_reads, io.commit_graph_reads
);
curve.push((d, io));
}
assert_flat(&curve, |c| c.manifest_reads, 4, "__manifest scan");
// commit_graph_reads covers BOTH _graph_commits and _graph_commit_actors (shared
// wrapper), so this also gates the actor table on the authenticated path.
assert_flat(&curve, |c| c.commit_graph_reads, 4, "_graph_commits + _graph_commit_actors scan");
}
// The data-table OPENER history-gate (opener flat across depth) lives in
// `write_cost_s3.rs` — its history-dependence is an S3-only phenomenon. But the
// *probe that isolates* the opener (the `PrefixCounter` split) is validated here,
// every-PR, on local FS:
/// Proves the `PrefixCounter` opener/scan split: a committing write's data-table
/// reads divide into a **flat opener** term and a **growing scan** term. This pins
/// (a) the classifier actually attributes reads to the opener bucket (non-zero, so a
/// flat assertion isn't vacuously flat-at-zero), and (b) the local data-table growth
/// is the merge-insert/RI fragment scan, not the opener — which is *why* the S3
/// gate asserts `data_opener_reads`, not total `data_reads`. (On local FS the opener
/// is O(1) regardless of step 3a; the opener's history-dependence is gated on S3.)
#[tokio::test]
async fn data_table_reads_split_into_flat_opener_and_growing_scan() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 100] {
if d > current {
commit_many(&mut db, (d - current) as usize).await;
current = d;
}
let io = measure_insert(&mut db, &format!("split_{d}")).await;
current += 1;
eprintln!(
"depth~{d}: opener={} scan={} data_total={}",
io.data_opener_reads, io.data_scan_reads, io.data_reads
);
curve.push((d, io));
}
assert!(
curve[0].1.data_opener_reads > 0,
"opener reads must be > 0 — the classifier missed version-resolution reads, \
so a flat opener assertion would be vacuous"
);
assert_flat(&curve, |c| c.data_opener_reads, 4, "local data-table opener");
assert_grows(&curve, |c| c.data_scan_reads, 20, "local data-table scan");
}
// ── (B) Green-today regression guards — run on every PR ──
/// A single insert's *data-table* write cost is O(1): the table commit is a small
/// constant number of writes, independent of history.
#[tokio::test]
async fn single_insert_data_write_is_bounded() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 5).await;
let io = measure_insert(&mut db, "w").await;
eprintln!("single insert: data_writes={}", io.data_writes);
assert!(io.data_writes <= 4, "data-table write_iops should be a small constant, got {}", io.data_writes);
}
/// At a fixed shallow depth, the per-write object-store read count is below a
/// documented ceiling. Fails the moment a change *adds* a round-trip on the write
/// path — the "no new round-trip" guard (calibrated: ~50 at depth ~5).
#[tokio::test]
async fn write_op_count_ceiling_at_shallow_depth() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 5).await;
let io = measure_insert(&mut db, "ceil").await;
eprintln!(
"depth~5: data={} __manifest={} _graph_commits={} total_reads={}",
io.data_reads, io.manifest_reads, io.commit_graph_reads, io.total_reads()
);
const CEILING: u64 = 80;
assert!(
io.total_reads() <= CEILING,
"per-write read ops {} exceeded ceiling {CEILING} — a new round-trip was added",
io.total_reads()
);
}
// ── (C) Fitness assert via the staged-write probes ──
/// A keyed `Person` insert routes through `stage_merge_insert` exactly once, does
/// no `stage_append`, and no inline vector-index build. Pins the structural shape.
#[tokio::test]
async fn keyed_insert_routes_through_merge_insert_only() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let (res, _io, staged) = measure_with_staged(db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "fit")], &[("$age", 30)]),
))
.await;
res.unwrap();
assert_eq!(staged.stage_merge_insert, 1, "keyed Person insert stages one merge-insert");
assert_eq!(staged.stage_append, 0, "keyed insert must not stage_append");
assert_eq!(staged.create_vector_index, 0, "no inline vector-index build on a plain insert");
}

View file

@ -0,0 +1,71 @@
//! S3 (object-store) cost-budget gate for the WRITE path — the bucket-gated twin of
//! `write_cost.rs` that proves RFC-013 **step 3a's data-table opener win**. On the
//! shared `helpers::cost` harness (`measure`/`IoCounts`/`assert_flat`/`s3_graph`).
//!
//! The opener term is an **object-store-RPC phenomenon**: latest-version resolution
//! costs per-version GETs/HEADs on S3 (O(depth) before step 3a, when writes routed
//! through the lance-namespace builder), which local FS cannot reproduce (one cheap
//! `read_dir` regardless). After step 3a (direct-by-URI opens), the per-write
//! **data-table read count is FLAT across commit-history depth** — the measured 70%
//! win. This file is the red→green acceptance for that term (it would be RED on the
//! pre-3a `from_namespace` opener); `write_cost.rs` gates the internal-table term on
//! local every-PR.
//!
//! **Isolating the opener (important):** total `data_reads` is not opener-only — the
//! same wrapped `Dataset` backs the merge-insert/RI **scan**, which reads
//! O(fragment-count) and grows with history for a *different* reason (compaction's
//! domain, not the opener; this is the term that made the *local* data-table count
//! grow). The shared harness's `PrefixCounter` attributes each read by object-key
//! prefix, so this gate asserts `data_opener_reads` (reads of `_versions/`/`.manifest`)
//! **directly** — no compaction or fixture massaging needed. After step 3a the opener
//! is O(1) regardless of version-history depth; before it grew ~+12/depth (RFC §2.4
//! [M]). (See `write_cost.rs` for the local test that proves the split itself —
//! opener flat, scan growing.)
//!
//! Skips gracefully without `OMNIGRAPH_S3_TEST_BUCKET` (the `tests/s3_storage.rs`
//! pattern); runs for real in the rustfs CI job (`.github/workflows/ci.yml`).
#![recursion_limit = "512"]
mod helpers;
use helpers::cost::{IoCounts, assert_flat, measure_insert, s3_graph};
use helpers::commit_many;
/// After step 3a the data-table opener term is flat across depth on a real object
/// store (the measured win). RED on the pre-3a namespace-builder opener (O(depth)
/// per-version resolution).
#[tokio::test]
async fn data_table_opener_is_flat_in_history_on_s3() {
let Some(mut db) = s3_graph("write-cost-opener").await else {
eprintln!(
"SKIP data_table_opener_is_flat_in_history_on_s3: OMNIGRAPH_S3_TEST_BUCKET \
unset (or store unreachable) the S3 opener gate needs an object store"
);
return;
};
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 50] {
if d > current {
commit_many(&mut db, (d - current) as usize).await;
current = d;
}
let io = measure_insert(&mut db, &format!("s3_{d}")).await;
current += 1;
eprintln!(
"depth~{d}: opener={} scan={} data_total={} __manifest={} _graph_commits={}",
io.data_opener_reads,
io.data_scan_reads,
io.data_reads,
io.manifest_reads,
io.commit_graph_reads
);
curve.push((d, io));
}
// The opener (latest-version resolution) is O(1) after step 3a (direct-by-URI),
// isolated from the scan by the PrefixCounter. Slack absorbs object-store variance;
// the pre-3a builder grew this ~+12/depth (RFC §2.4 [M]).
assert_flat(&curve, |c| c.data_opener_reads, 8, "S3 data-table opener");
}