write-path cost gate + opener bypass (#288)

* docs(rfc): RFC-013 write-path latency design + index link

* perf(engine): open write-path tables directly, bypassing the namespace builder

Write opens routed through DatasetBuilder::from_namespace, whose describe_table
opened the whole dataset just to return a location and then re-resolved the
latest version — an O(commit-depth) double latest-resolution per table open that
missed Lance's O(1) version-hint fast path. On an object store this dominated
write latency (~70%, RFC-013 section 2.4).

TableStore::open_dataset_head_for_write now delegates to the direct opener
(open_dataset_head: Dataset::open by URI + checkout_branch, routed through the
tracked opener so cost tests can count it; a no-op in production). The manifest
already holds every sub-table's location, so the namespace catalog lookup was
redundant; ensure_expected_version still validates head == pinned for strict ops.
This completes PR #268's open-by-location migration on the write side.

With both reads (PR #268) and now writes bypassing it, nothing in production
routes through the per-table Lance namespace. The dead open chain
(load_table_from_namespace, open_table_head_for_write) is deleted and the
StagedTableNamespace contract apparatus is gated #[cfg(test)], mirroring the
already-test-only read namespace; __manifest commit coordination
(GraphNamespacePublisher) is a separate component and is unaffected.

See docs/dev/rfc-013-write-path-latency.md sections 2.4 and 9 (step 3a).

* test(engine): write-path cost-budget gate on a shared harness

Adds tests/helpers/cost.rs, a store-agnostic cost harness (IoCounts/StagedCounts,
measure/measure_with_staged, assert_flat, local_graph/s3_graph) that the read-side
warm_read_cost.rs, write_cost.rs, and write_cost_s3.rs share, so the IOTracker /
task-local plumbing lives in exactly one place instead of duplicated per test.

write_cost.rs (local, every-PR) gates the internal-table scan term flat in
commit-history depth (a RED #[ignore]'d LOCK, the acceptance for bringing the
internal tables into compaction) plus green guards: a single insert's data writes
are bounded, a per-write read-op ceiling fails the moment a round-trip is added,
and a keyed insert routes through stage_merge_insert once with no stage_append or
vector-index build.

write_cost_s3.rs (bucket-gated, rustfs CI) gates the data-table opener term flat
across depth — the object-store-RPC phenomenon local FS cannot reproduce, and the
red->green proof of the opener bypass. Wired into the rustfs_integration CI job
and its path filter.

Guards the "hot-path cost is bounded by work, not history" invariant on writes.
See docs/dev/rfc-013-write-path-latency.md section 5.1, docs/dev/testing.md.

* docs(rfc): RFC-013 step 3a landed; write-skew coupling; cost-gate test map

- Section 9: mark step 1 (gate + harness) and step 3a (opener bypass) landed;
  record the per-table namespace retirement to test-only and the corrected
  measurement note (the opener win is S3-only; the local data-table growth is the
  merge-insert/RI fragment scan, a compaction term, not the opener).
- Sections 7.1/6/11/5.5/10: correct the cross-table write-skew analysis after a
  prototype proved the scoped expected-set fix is a no-op against the per-object_id
  manifest (disjoint writers never share a row, so Lance never conflicts, the
  publisher never retries, and the expected check is a non-atomic pre-check
  evaluated once against stale state). The fix needs a shared contention row
  (Phase-7 graph_head / a minimal head row / commit-time re-validation), so it is
  coupled to that row, not standalone; that contention is load-bearing for
  correctness, not a drawback. Split the concurrent face (read-set + head) from the
  sequential face (inbound-RI validation on node removal) -- two different fixes.
- testing.md: add write_cost.rs / helpers/cost.rs / write_cost_s3.rs to the test
  map; document the local-vs-S3 backend split; extend the cost-budget checklist
  item to the write/open path and point at the shared harness.

* test(engine): isolate the opener in the S3 cost gate; fail loud on S3 setup errors

Addresses two PR review findings on the bucket-gated write_cost_s3 gate:

- The data-table opener was not isolated: `data_reads` also counts the
  merge-insert/RI scan, which reads O(fragment-count) and so grows with history
  for a different reason (compaction's domain, not the opener) -- the same term
  that made the local data-table count grow. The flat assertion would false-RED
  or misattribute scan growth to the opener on rustfs. Fix: compact (db.optimize)
  before each measurement so the table holds ~1 fragment, bounding the scan and
  leaving the opener's latest-version resolution as the only history-varying term.
  Compaction preserves version history, so the opener still faces a deep
  _versions/ chain -- the thing under test.

- s3_graph used `.ok()?`, so when OMNIGRAPH_S3_TEST_BUCKET was set but the store
  was down/misconfigured, init/seed failures collapsed to None and the gate
  skipped + passed vacuously. Fix: skip only when the bucket env var is absent;
  once it is set, init/seed failures panic (mirrors tests/s3_storage.rs).

* test(engine): isolate the S3 opener with a per-prefix IO probe (correct-by-design)

Replaces the fixture-bounded isolation (compact-before-measure) from the prior
commit with the root fix: a path-classifying ObjectStore wrapper (PrefixCounter)
that attributes each data-table read to the opener term (_versions/.manifest) vs
the scan term (data/*.lance). IoCounts now exposes data_opener_reads /
data_scan_reads, so write_cost_s3 asserts the opener flat *directly* -- no
compaction or fixture massaging, and the assertion measures the opener, not the
conflated total. Closes the "harness conflates two IO terms" class: any cost test
(read or write) can now isolate the opener.

PrefixCounter implements only the object_store 0.13 core ObjectStore methods; the
convenience surface (get/put/head/...) routes through get_opts/put_opts via
ObjectStoreExt's blanket impl, so every read/write is still counted.

Validated locally (every-PR) by write_cost::data_table_reads_split_into_flat_opener_
and_growing_scan: opener stays flat (7 -> 3) while scan grows (11 -> 91) and
opener + scan == data_total exactly -- proving the classifier and confirming the
local data-table growth is the fragment scan, not the opener. warm_read_cost
(12 tests) stays green under the shared-harness change.

* refactor(tests): remove cost-harness duplication and namespace cfg(test) noise

Branch self-review (no behavior change) — pay down three liabilities the
write-path work left:

- warm_read_cost.rs kept its own probes() (three IOTrackers + a QueryIoProbes +
  a probe counter) and read raw .stats().read_iops — duplicating the shared
  helpers::cost harness this branch introduced. Migrated all 12 tests onto
  measure()/IoCounts; deleted the local probes(). (This also makes IoCounts'
  version_probes field used rather than dead.)

- insert_cost was copy-pasted verbatim into write_cost.rs and write_cost_s3.rs.
  Hoisted to helpers::cost::measure_insert so the measured write is defined once.

- The per-table Lance namespace (namespace.rs) became entirely test-only after
  step 3a, but was gated with ~22 per-item #[cfg(test)] attributes. Collapsed to
  a single `#[cfg(test)] mod namespace;` and stripped the per-item attributes;
  merged the import groups the gating had split.

Verified: lib in-source 162 passed; write_cost 4 + warm_read_cost 12 passed;
forbidden_apis passed.
This commit is contained in:
Ragnor Comerford 2026-06-20 13:31:15 +02:00 committed by GitHub
parent b38b36e48f
commit f6d2cc03e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1959 additions and 267 deletions

View file

@ -14,6 +14,10 @@ mod layout;
mod metadata;
#[path = "manifest/migrations.rs"]
mod migrations;
// Entirely test-only since RFC-013 step 3a: with both reads (Fix 2) and writes
// bypassing the Lance namespace, nothing in production routes through it; the
// `LanceNamespace` impls are retained only to validate the contract in unit tests.
#[cfg(test)]
#[path = "manifest/namespace.rs"]
mod namespace;
#[path = "manifest/publisher.rs"]
@ -28,7 +32,6 @@ use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_
pub(crate) use metadata::TableVersionMetadata;
#[cfg(test)]
use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
pub(crate) use namespace::open_table_head_for_write;
#[cfg(test)]
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};

View file

@ -76,6 +76,7 @@ pub(super) fn table_uri_for_path(root_uri: &str, table_path: &str, branch: Optio
}
}
#[cfg(test)]
pub(super) fn namespace_internal_error(message: impl Into<String>) -> LanceNamespaceError {
LanceNamespaceError::namespace_source(Box::new(std::io::Error::other(message.into())))
}

View file

@ -2,7 +2,9 @@ use std::collections::HashMap;
use lance::Dataset;
use lance_namespace::Error as LanceNamespaceError;
use lance_namespace::models::{CreateTableVersionRequest, TableVersion};
use lance_namespace::models::CreateTableVersionRequest;
#[cfg(test)]
use lance_namespace::models::TableVersion;
use serde::{Deserialize, Serialize};
use crate::error::{OmniError, Result};
@ -142,6 +144,7 @@ impl TableVersionMetadata {
self.to_namespace_version_with_details(version, None, None)
}
#[cfg(test)]
pub(super) fn to_namespace_version_with_details(
&self,
version: u64,

View file

@ -1,3 +1,10 @@
// Both the read namespace (BranchManifestNamespace) and the write namespace
// (StagedTableNamespace) are now test-only contract validation. Reads open
// sub-tables directly by location+version (SubTableEntry::open, Fix 2), and
// writes open the table head directly by URI (TableStore::open_dataset_head,
// RFC-013 step 3a), so nothing in production routes through the Lance namespace
// anymore. These impls are retained only to validate the LanceNamespace
// contract in unit tests.
use std::sync::Arc;
use async_trait::async_trait;
@ -16,30 +23,21 @@ use object_store::{
use crate::error::{OmniError, Result};
use super::layout::{namespace_internal_error, table_uri_for_path};
#[cfg(test)]
use super::layout::{open_manifest_dataset, table_id_to_key};
use super::metadata::TableVersionMetadata;
#[cfg(test)]
use super::metadata::{namespace_version_metadata, parse_namespace_version_request};
#[cfg(test)]
use super::layout::{
namespace_internal_error, open_manifest_dataset, table_id_to_key, table_uri_for_path,
};
use super::metadata::{
TableVersionMetadata, namespace_version_metadata, parse_namespace_version_request,
};
use super::publisher::GraphNamespacePublisher;
// The read namespace (BranchManifestNamespace) is test-only since Fix 2: reads
// open sub-tables directly by location+version (SubTableEntry::open), so nothing
// in production routes a read through the Lance namespace. The writes path uses
// StagedTableNamespace. These items are retained to validate the namespace
// contract in unit tests.
#[cfg(test)]
use super::state::{ManifestState, SubTableEntry, read_manifest_entries, read_manifest_state};
#[cfg(test)]
#[derive(Debug, Clone)]
struct BranchManifestNamespace {
root_uri: String,
branch: Option<String>,
}
#[cfg(test)]
impl BranchManifestNamespace {
fn new(root_uri: &str, branch: Option<&str>) -> Self {
Self {
@ -146,7 +144,6 @@ impl StagedTableNamespace {
}
}
#[cfg(test)]
pub(crate) fn branch_manifest_namespace(
root_uri: &str,
branch: Option<&str>,
@ -165,27 +162,6 @@ pub(crate) fn staged_table_namespace(
))
}
async fn load_table_from_namespace(
namespace: Arc<dyn LanceNamespace>,
table_key: &str,
branch: Option<&str>,
version: Option<u64>,
) -> Result<Dataset> {
let builder = DatasetBuilder::from_namespace(namespace, vec![table_key.to_string()])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let builder = match (branch, version) {
(Some(branch), version) => builder.with_branch(branch, version),
(None, Some(version)) => builder.with_version(version),
(None, None) => builder,
};
builder
.load()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
#[cfg(test)]
#[async_trait]
impl LanceNamespace for BranchManifestNamespace {
fn namespace_id(&self) -> String {
@ -540,18 +516,3 @@ impl LanceNamespace for StagedTableNamespace {
Ok(response)
}
}
pub(crate) async fn open_table_head_for_write(
root_uri: &str,
table_key: &str,
table_path: &str,
branch: Option<&str>,
) -> Result<Dataset> {
load_table_from_namespace(
staged_table_namespace(root_uri, table_key, table_path, branch),
table_key,
branch,
None,
)
.await
}

View file

@ -24,7 +24,7 @@ use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::sync::Arc;
use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
use crate::db::manifest::TableVersionMetadata;
use crate::db::{Snapshot, SubTableEntry};
use crate::error::{OmniError, Result};
use crate::storage_layer::ForkOutcome;
@ -160,9 +160,15 @@ impl TableStore {
dataset_uri: &str,
branch: Option<&str>,
) -> Result<Dataset> {
let ds = Dataset::open(dataset_uri)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Direct open by URI (O(1) latest-resolution). Routed through the tracked
// opener so a cost test counts it via the per-query `table_wrapper`
// (no-op in production — the task-local is unset, so this is exactly
// `Dataset::open(uri)`).
let ds = crate::instrumentation::open_dataset_tracked(
dataset_uri,
crate::instrumentation::table_wrapper(),
)
.await?;
match branch {
Some(branch) if branch != "main" => ds
.checkout_branch(branch)
@ -178,8 +184,14 @@ impl TableStore {
dataset_uri: &str,
branch: Option<&str>,
) -> Result<Dataset> {
let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
// RFC-013 step 3a: open writes via the direct opener (O(1)) instead of the
// lance-namespace builder, which re-resolved the table's version chain
// O(depth) per write. The namespace is a catalog/discovery layer, not a
// per-open hot-path component (RFC §2.4); the manifest already holds the
// location, and `ensure_expected_version` still validates head == pinned
// for strict ops. `table_key` retained for signature stability.
let _ = table_key;
self.open_dataset_head(dataset_uri, branch).await
}
pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {

View file

@ -0,0 +1,360 @@
//! Shared cost-budget test harness (RFC-013) — the single place the IO-counting
//! plumbing lives, so `warm_read_cost.rs`, `write_cost.rs`, and the S3 variant
//! assert in one vocabulary instead of duplicating `probes()` + raw `IOTracker`
//! reads. Three clean abstractions: structured counts, a `measure` primitive, a
//! named flat-assertion, plus store-agnostic backend fixtures.
//!
//! The data-table wrapper is a **path-classifying** counter (`PrefixCounter`), not a
//! plain `IOTracker`: it splits each read into the **opener** term (latest-version
//! resolution — reads of `_versions/`/`.manifest` objects) vs the **scan** term
//! (data-fragment reads, `data/`/`*.lance`). That lets a cost test isolate the
//! opener (RFC-013 step 3a's target, O(1) after the bypass) from the merge-insert/RI
//! scan (O(fragment-count), compaction's domain) even though both ride the same
//! `Dataset` — without controlling the fixture (no compaction needed). `__manifest`
//! and `_graph_commits` keep the plain `IOTracker` (no sub-prefixes worth splitting).
#![allow(dead_code)]
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use futures::stream::BoxStream;
use lance::io::WrappingObjectStore;
use lance_io::utils::tracking_store::IOTracker;
use object_store::path::Path;
use object_store::{
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult,
};
use omnigraph::db::Omnigraph;
use omnigraph::instrumentation::{
MergeWriteProbes, QueryIoProbes, with_merge_write_probes, with_query_io_probes,
};
use omnigraph::loader::{LoadMode, load_jsonl};
use super::{MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, init_and_load, mixed_params};
/// Object-store op counts for one measured operation, by table class — the
/// vocabulary cost tests assert in (vs raw `IOTracker::stats().read_iops`).
#[derive(Debug, Clone, Copy, Default)]
pub struct IoCounts {
/// Per-table DATA opens (node/edge tables). The dominant write-path term.
pub data_reads: u64,
pub data_writes: u64,
/// DATA-table reads attributed to latest-version resolution (`_versions/`,
/// `.manifest`). This is the **opener** term step 3a flattened — isolated from
/// the scan, so it can be gated directly without compacting the fixture.
pub data_opener_reads: u64,
/// DATA-table reads attributed to data fragments (`data/`, `*.lance`) — the
/// merge-insert/RI **scan**, which grows with fragment count (compaction's
/// domain, not the opener).
pub data_scan_reads: u64,
/// `__manifest` registry scans (publish state).
pub manifest_reads: u64,
/// `_graph_commits` lineage scans.
pub commit_graph_reads: u64,
/// Version-probe invocations (the cheap freshness check).
pub version_probes: u64,
}
impl IoCounts {
pub fn total_reads(&self) -> u64 {
self.data_reads + self.manifest_reads + self.commit_graph_reads
}
}
/// Which staged-write primitives an operation invoked (from `MergeWriteProbes`).
#[derive(Debug, Clone, Copy, Default)]
pub struct StagedCounts {
pub stage_append: u64,
pub stage_merge_insert: u64,
pub create_vector_index: u64,
pub scan_staged_combined: u64,
}
// ── Path-classifying data-table read counter ──
/// How a data-table object read is attributed.
enum ReadClass {
/// Latest-version resolution: `_versions/`, `.manifest`, `_latest`.
Opener,
/// Data fragments: `data/`, `*.lance`.
Scan,
/// Anything else (indices, deletion files, …) — counted in the total only.
Other,
}
/// Classify a Lance object path by its role in a write open. Lance's on-object
/// layout is identical on local FS and S3, so this split is backend-independent.
fn classify(path: &Path) -> ReadClass {
let p = path.as_ref();
if p.contains("_versions") || p.ends_with(".manifest") || p.contains("_latest") {
ReadClass::Opener
} else if p.contains("/data/") || p.starts_with("data/") || p.ends_with(".lance") {
ReadClass::Scan
} else {
ReadClass::Other
}
}
#[derive(Debug, Default, Clone, Copy)]
struct PrefixCounts {
reads: u64,
writes: u64,
opener_reads: u64,
scan_reads: u64,
}
/// A `WrappingObjectStore` that counts reads/writes and attributes each read to the
/// opener vs scan term by object-key prefix. Shares its tally via `Arc<Mutex<…>>` so
/// the wrapped store (handed to Lance) and the test read the same counters.
#[derive(Debug, Default, Clone)]
struct PrefixCounter(Arc<Mutex<PrefixCounts>>);
impl PrefixCounter {
fn record_read(&self, location: &Path) {
let mut c = self.0.lock().unwrap();
c.reads += 1;
match classify(location) {
ReadClass::Opener => c.opener_reads += 1,
ReadClass::Scan => c.scan_reads += 1,
ReadClass::Other => {}
}
}
fn record_write(&self) {
self.0.lock().unwrap().writes += 1;
}
fn snapshot(&self) -> PrefixCounts {
*self.0.lock().unwrap()
}
}
impl WrappingObjectStore for PrefixCounter {
fn wrap(&self, _store_prefix: &str, target: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
Arc::new(PrefixCountingStore {
target,
counter: self.clone(),
})
}
}
/// The wrapped `ObjectStore` that records each call into a [`PrefixCounter`].
/// Implements only the required core `ObjectStore` methods (object_store 0.13: the
/// convenience surface — `get`/`put`/`head`/`get_range`/… — lives on
/// `ObjectStoreExt` and is provided by a blanket impl that routes through `get_opts`
/// / `put_opts`, so every read/write is still counted here). Per-read path
/// classification is the only addition over a plain pass-through.
#[derive(Debug)]
struct PrefixCountingStore {
target: Arc<dyn ObjectStore>,
counter: PrefixCounter,
}
impl fmt::Display for PrefixCountingStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "PrefixCountingStore({})", self.target)
}
}
#[async_trait]
impl ObjectStore for PrefixCountingStore {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> OSResult<PutResult> {
self.counter.record_write();
self.target.put_opts(location, payload, opts).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> OSResult<Box<dyn MultipartUpload>> {
self.counter.record_write();
self.target.put_multipart_opts(location, opts).await
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
self.counter.record_read(location);
self.target.get_opts(location, options).await
}
fn delete_stream(
&self,
locations: BoxStream<'static, OSResult<Path>>,
) -> BoxStream<'static, OSResult<Path>> {
self.target.delete_stream(locations)
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
self.counter.record_read(&prefix.cloned().unwrap_or_default());
self.target.list(prefix)
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, OSResult<ObjectMeta>> {
self.counter.record_read(&prefix.cloned().unwrap_or_default());
self.target.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
self.counter.record_read(&prefix.cloned().unwrap_or_default());
self.target.list_with_delimiter(prefix).await
}
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> OSResult<()> {
self.counter.record_write();
self.target.copy_opts(from, to, options).await
}
}
/// The tracker handles backing one measurement; read once into [`IoCounts`].
struct ProbeHandles {
manifest: IOTracker,
commit_graph: IOTracker,
table: PrefixCounter,
probe_count: Arc<AtomicU64>,
}
impl ProbeHandles {
fn install() -> (QueryIoProbes, Self) {
let h = ProbeHandles {
manifest: IOTracker::default(),
commit_graph: IOTracker::default(),
table: PrefixCounter::default(),
probe_count: Arc::new(AtomicU64::new(0)),
};
let probes = QueryIoProbes {
manifest_wrapper: Some(Arc::new(h.manifest.clone()) as Arc<dyn WrappingObjectStore>),
commit_graph_wrapper: Some(
Arc::new(h.commit_graph.clone()) as Arc<dyn WrappingObjectStore>
),
table_wrapper: Some(Arc::new(h.table.clone()) as Arc<dyn WrappingObjectStore>),
probe_count: Arc::clone(&h.probe_count),
};
(probes, h)
}
fn counts(&self) -> IoCounts {
let t = self.table.snapshot();
IoCounts {
data_reads: t.reads,
data_writes: t.writes,
data_opener_reads: t.opener_reads,
data_scan_reads: t.scan_reads,
manifest_reads: self.manifest.stats().read_iops,
commit_graph_reads: self.commit_graph.stats().read_iops,
version_probes: self.probe_count.load(Ordering::Relaxed),
}
}
}
/// Run `op` under object-store IO counting; return its output + the counts.
/// The only place the `QueryIoProbes` task-local + tracker wiring lives.
pub async fn measure<F: Future>(op: F) -> (F::Output, IoCounts) {
let (probes, handles) = ProbeHandles::install();
let out = with_query_io_probes(probes, op).await;
(out, handles.counts())
}
/// Like [`measure`], but also capture which staged-write primitives ran
/// (composes the two task-locals cleanly).
pub async fn measure_with_staged<F: Future>(op: F) -> (F::Output, IoCounts, StagedCounts) {
let (probes, handles) = ProbeHandles::install();
let merge = MergeWriteProbes::default();
let out = with_merge_write_probes(merge.clone(), with_query_io_probes(probes, op)).await;
let staged = StagedCounts {
stage_append: merge.stage_append_calls(),
stage_merge_insert: merge.stage_merge_insert_calls(),
create_vector_index: merge.create_vector_index_calls(),
scan_staged_combined: merge.scan_staged_combined_calls(),
};
(out, handles.counts(), staged)
}
/// Assert a per-depth metric is flat: the deepest sample must not exceed the
/// shallowest by more than `slack`. `select` picks the field; `what` names it in
/// the failure message. The shape every depth-swept cost gate uses.
pub fn assert_flat(
curve: &[(u64, IoCounts)],
select: impl Fn(&IoCounts) -> u64,
slack: u64,
what: &str,
) {
assert!(curve.len() >= 2, "assert_flat needs >= 2 depth points");
let (d_lo, lo) = (curve[0].0, select(&curve[0].1));
let (d_hi, hi) = (curve[curve.len() - 1].0, select(&curve[curve.len() - 1].1));
assert!(
hi <= lo + slack,
"{what} grew with history: depth {d_lo} = {lo} -> depth {d_hi} = {hi} (slack {slack})"
);
}
/// Assert a per-depth metric *does* grow with history by at least `min_delta` — the
/// dual of [`assert_flat`], used to prove a term is genuinely history-dependent (so a
/// flat sibling term isn't flat merely because nothing was measured).
pub fn assert_grows(
curve: &[(u64, IoCounts)],
select: impl Fn(&IoCounts) -> u64,
min_delta: u64,
what: &str,
) {
assert!(curve.len() >= 2, "assert_grows needs >= 2 depth points");
let (d_lo, lo) = (curve[0].0, select(&curve[0].1));
let (d_hi, hi) = (curve[curve.len() - 1].0, select(&curve[curve.len() - 1].1));
assert!(
hi >= lo + min_delta,
"{what} did not grow as expected: depth {d_lo} = {lo} -> depth {d_hi} = {hi} (min delta {min_delta})"
);
}
/// Measure one committing `insert_person` to `main` — the canonical write the cost
/// gates sweep over commit-history depth. Shared by `write_cost.rs` and
/// `write_cost_s3.rs` so the measured write is defined once.
pub async fn measure_insert(db: &mut Omnigraph, tag: &str) -> IoCounts {
let (res, io) = measure(db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", tag)], &[("$age", 30)]),
))
.await;
res.unwrap();
io
}
// ── Backend fixtures — one knob, store-agnostic body ──
/// Local tempdir graph (default; deterministic, every-PR).
pub async fn local_graph(dir: &tempfile::TempDir) -> Omnigraph {
init_and_load(dir).await
}
/// Emulated-S3 graph, bucket-gated. Returns `None` **only** when
/// `OMNIGRAPH_S3_TEST_BUCKET` is unset, so the caller logs + skips — the
/// `tests/s3_storage.rs` graceful-skip pattern. Once the bucket *is* configured
/// (the rustfs CI job), any `init`/seed failure is a real failure and panics
/// rather than silently skipping — otherwise a down/misconfigured store would let
/// a bucket-gated gate pass vacuously. `name` disambiguates the prefix.
pub async fn s3_graph(name: &str) -> Option<Omnigraph> {
let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?;
let uri = format!("s3://{bucket}/cost-tests/{name}-{}", std::process::id());
let mut db = Omnigraph::init(&uri, TEST_SCHEMA)
.await
.expect("OMNIGRAPH_S3_TEST_BUCKET is set but S3 graph init failed");
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.expect("OMNIGRAPH_S3_TEST_BUCKET is set but S3 seed load failed");
Some(db)
}

View file

@ -1,5 +1,6 @@
#![allow(dead_code)]
pub mod cost;
pub mod recovery;
use arrow_array::{Array, RecordBatch, StringArray};

View file

@ -1,51 +1,22 @@
//! Cost-budget tests for the warm read path (Fix 1): a warm same-branch read
//! must perform no manifest or commit-graph opens, measured with Lance's
//! `IOTracker` at the object-store boundary (the LanceDB IO-counted-test
//! pattern; see docs/dev/testing.md). Guards invariant 15 (read cost bounded by
//! work, not history) for snapshot resolution, and invariant 6 (a warm reader
//! still observes external commits).
//! must perform no manifest or commit-graph opens, measured via the shared
//! `helpers::cost` harness at the object-store boundary (the LanceDB
//! IO-counted-test pattern; see docs/dev/testing.md). Guards invariant 15 (read
//! cost bounded by work, not history) for snapshot resolution, and invariant 6
//! (a warm reader still observes external commits).
mod helpers;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use arrow_array::{Array, StringArray};
use lance::io::WrappingObjectStore;
use lance_io::utils::tracking_store::IOTracker;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes};
use omnigraph_compiler::result::QueryResult;
use helpers::cost::measure;
use helpers::{
MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params,
mutate_branch, mutate_main, params,
};
/// IO probes plus the tracker handles to read `read_iops` after the query.
/// Returns `(probes, manifest, commit_graph, table, probe_count)` — `table`
/// counts per-table data opens (the cache-miss path), so a cost test can assert
/// N opens on a cold read and 0 on a warm repeat (Fix 3).
fn probes() -> (
QueryIoProbes,
IOTracker,
IOTracker,
IOTracker,
Arc<AtomicU64>,
) {
let manifest = IOTracker::default();
let commit_graph = IOTracker::default();
let table = IOTracker::default();
let probe_count = Arc::new(AtomicU64::new(0));
let probes = QueryIoProbes {
manifest_wrapper: Some(Arc::new(manifest.clone()) as Arc<dyn WrappingObjectStore>),
commit_graph_wrapper: Some(Arc::new(commit_graph.clone()) as Arc<dyn WrappingObjectStore>),
table_wrapper: Some(Arc::new(table.clone()) as Arc<dyn WrappingObjectStore>),
probe_count: Arc::clone(&probe_count),
};
(probes, manifest, commit_graph, table, probe_count)
}
fn first_column_strings(result: &QueryResult) -> Vec<String> {
if result.num_rows() == 0 {
return Vec::new();
@ -75,18 +46,14 @@ async fn warm_same_branch_read_does_no_resolution_opens() {
// Deep history: warm-read resolution cost must be flat in commit count.
commit_many(&mut db, 20).await;
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
out.unwrap();
// A warm same-branch read opens nothing from the internal tables, even at
// commit-history depth. Fix 1 reuses the coordinator (no re-open: 0
@ -95,18 +62,15 @@ async fn warm_same_branch_read_does_no_resolution_opens() {
// per-table __manifest scan is gone too. Pre-fix, each of these is a deep scan
// of an internal table that grows with commit count.
assert_eq!(
manifest.stats().read_iops,
0,
io.manifest_reads, 0,
"warm same-branch read must not scan __manifest (resolution or per-table)"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"warm same-branch read must not open the commit graph (no coordinator re-open)"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
1,
io.version_probes, 1,
"warm same-branch read performs exactly one version probe"
);
}
@ -121,22 +85,17 @@ async fn multi_table_query_does_no_manifest_scans() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let (probes_in, manifest, _commit_graph, _table, _probe) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"age_stats",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"age_stats",
&params(&[]),
))
.await;
out.unwrap();
assert_eq!(
manifest.stats().read_iops,
0,
io.manifest_reads, 0,
"a multi-table read must not scan __manifest once per touched table"
);
}
@ -278,32 +237,25 @@ async fn warm_branch_read_does_no_manifest_scans() {
// Bind the handle's coordinator to the branch so reads of it take the warm path.
db.sync_branch("feature").await.unwrap();
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
out.unwrap();
assert_eq!(
manifest.stats().read_iops,
0,
io.manifest_reads, 0,
"warm branch read must not scan __manifest (branch-owned table opened by location)"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"warm branch read must not open the commit graph"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
1,
io.version_probes, 1,
"warm branch read performs exactly one version probe"
);
}
@ -369,18 +321,14 @@ async fn warm_read_on_recreated_branch_observes_new_incarnation() {
"test setup must exercise branch incarnation reuse at one Lance version"
);
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
let new_feature = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "MainOnly")]),
),
)
.await
.unwrap();
let (new_feature, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "MainOnly")]),
))
.await;
let new_feature = new_feature.unwrap();
assert_eq!(
new_feature.num_rows(),
@ -388,17 +336,15 @@ async fn warm_read_on_recreated_branch_observes_new_incarnation() {
"warm reader must refresh to the recreated branch incarnation"
);
assert!(
manifest.stats().read_iops > 0,
io.manifest_reads > 0,
"recreated branch must re-read the manifest after the incarnation probe"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"same-branch incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
io.version_probes, 2,
"stale same-branch read probes once under the read lock and once under the write lock"
);
}
@ -469,39 +415,33 @@ async fn recreated_branch_owned_table_handle_uses_table_etag() {
"test setup must force table handle identity to differ only by e_tag"
);
let (probes_in, manifest, commit_graph, table, probe_count) = probes();
let new_person = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "NewOnly")]),
),
)
.await
.unwrap();
let (new_person, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "NewOnly")]),
))
.await;
let new_person = new_person.unwrap();
assert_eq!(
new_person.num_rows(),
1,
"warm reader must open the recreated branch-owned table incarnation"
);
assert!(
table.stats().read_iops > 0,
io.data_reads > 0,
"table e_tag must force a held-handle cache miss for the recreated table"
);
assert!(
manifest.stats().read_iops > 0,
io.manifest_reads > 0,
"recreated branch must refresh the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"same-branch table-incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
io.version_probes, 2,
"stale same-branch read probes once under each lock"
);
@ -594,35 +534,29 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() {
"test setup must force graph-index identity to differ only by snapshot incarnation"
);
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
let new_friends = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
&params(&[("$name", "NewWalker")]),
),
)
.await
.unwrap();
let (new_friends, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
&params(&[("$name", "NewWalker")]),
))
.await;
let new_friends = new_friends.unwrap();
assert_eq!(
first_column_strings(&new_friends),
vec!["Bob"],
"traversal must use the recreated branch's topology, not stale cached graph index"
);
assert!(
manifest.stats().read_iops > 0,
io.manifest_reads > 0,
"recreated branch traversal must refresh the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"same-branch traversal incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
io.version_probes, 2,
"stale same-branch read probes once under each lock"
);
@ -673,31 +607,25 @@ async fn stale_read_refreshes_manifest_only() {
.await
.unwrap();
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(reader.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
out.unwrap();
assert!(
manifest.stats().read_iops > 0,
io.manifest_reads > 0,
"stale read must re-read the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
io.commit_graph_reads, 0,
"stale refresh must be manifest-only (no commit-graph scan)"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
io.version_probes, 2,
"stale same-branch read probes once under the read lock and once under the write lock"
);
}
@ -721,55 +649,40 @@ async fn repeat_warm_read_reuses_table_handles() {
commit_many(&mut db, 10).await;
// Cold first read: opens the touched table.
let (p1, _m1, _c1, table1, _pr1) = probes();
with_query_io_probes(
p1,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (cold_out, cold) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
cold_out.unwrap();
assert!(
table1.stats().read_iops > 0,
cold.data_reads > 0,
"the cold first read must open the table"
);
// Warm repeat: the held handle is reused, so no open happens through this
// query's table wrapper.
let (p2, manifest2, commit_graph2, table2, probe2) = probes();
with_query_io_probes(
p2,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
// query's table wrapper. A fresh `measure()` isolates the warm repeat's cost.
let (warm_out, warm) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
warm_out.unwrap();
assert_eq!(
table2.stats().read_iops,
0,
warm.data_reads, 0,
"a warm repeat read must reuse the held handle (0 table opens)"
);
assert_eq!(warm.manifest_reads, 0, "warm repeat read: 0 manifest opens");
assert_eq!(
manifest2.stats().read_iops,
0,
"warm repeat read: 0 manifest opens"
);
assert_eq!(
commit_graph2.stats().read_iops,
0,
warm.commit_graph_reads, 0,
"warm repeat read: 0 commit-graph opens"
);
assert_eq!(
probe2.load(Ordering::Relaxed),
1,
warm.version_probes, 1,
"warm repeat read: exactly one version probe"
);
}
@ -807,20 +720,16 @@ async fn write_invalidates_table_cache_for_changed_table() {
.unwrap();
// The next read re-opens Person at the new version (cache miss).
let (p, _m, _c, table, _pr) = probes();
with_query_io_probes(
p,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
))
.await;
out.unwrap();
assert!(
table.stats().read_iops > 0,
io.data_reads > 0,
"a read after a write to the table must re-open it (version-keyed miss)"
);

View file

@ -0,0 +1,159 @@
//! Cost-budget tests for the WRITE path (RFC-013 step 1) — the safety/latency
//! twin of `warm_read_cost.rs`, on the shared `helpers::cost` harness. A
//! committing write's per-table opens and internal-table scans must be bounded
//! and **flat across commit-history depth**, measured at the object-store
//! boundary. Guards invariant 15 (cost bounded by work, not history) on writes.
//!
//! **Backend split (see docs/dev/testing.md / RFC-013).** This file runs on
//! **local FS** and gates the **internal-table** term (`__manifest`/`_graph_commits`
//! fragment scans, ~+18/depth — O(fragments) on any backend, step 2's target).
//!
//! The **data-table opener** term (step 3a's win) is a per-object-store-RPC
//! phenomenon and is NOT gated here: local-FS latest-resolution is cheap whether
//! the open goes through the namespace builder or direct-by-URI, so the
//! namespace→direct switch is invisible on local. Measured: the local data-table
//! read count grows with depth too (~+0.9/depth), but that is a *different* term —
//! the merge-insert/RI scan reading O(depth) **fragments**, unchanged by the
//! opener switch (depth-100 = 92 ops both before and after step 3a, same slope)
//! and reduced only by compaction. The opener term shows up only on a real object
//! store (per-version GETs, ~+12/depth → flat after step 3a), so it is gated in
//! `write_cost_s3.rs` (bucket-gated). Same `measure`/`IoCounts` harness, different
//! backend; each term gated where it actually manifests.
#![recursion_limit = "512"]
mod helpers;
use helpers::cost::{
IoCounts, assert_flat, assert_grows, local_graph, measure_insert, measure_with_staged,
};
use helpers::{MUTATION_QUERIES, commit_many, mixed_params};
// ── (A) The internal-table LOCK — RED today, the acceptance test for step 2 ──
//
// `__manifest` / `_graph_commits` scans must be O(1) in commit-history depth.
// RED today (O(fragments), uncompacted). Un-ignore when step 2 (internal-table
// compaction) lands — it must go green flat. (The data-table term is the S3
// gate's, `write_cost_s3.rs`; local-FS hides it.)
#[tokio::test]
#[ignore = "RED until step 2 (internal-table compaction): __manifest/_graph_commits scans are O(fragments) today — RFC-013 §0/§2.2. Un-ignore there as the red→green acceptance test."]
async fn internal_table_scans_are_flat_in_history() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 100] {
if d > current {
commit_many(&mut db, (d - current) as usize).await;
current = d;
}
let io = measure_insert(&mut db, &format!("lock_{d}")).await;
current += 1; // the measured write advanced depth by one
eprintln!(
"depth~{d}: data={} __manifest={} _graph_commits={}",
io.data_reads, io.manifest_reads, io.commit_graph_reads
);
curve.push((d, io));
}
assert_flat(&curve, |c| c.manifest_reads, 4, "__manifest scan");
assert_flat(&curve, |c| c.commit_graph_reads, 4, "_graph_commits scan");
}
// The data-table OPENER history-gate (opener flat across depth) lives in
// `write_cost_s3.rs` — its history-dependence is an S3-only phenomenon. But the
// *probe that isolates* the opener (the `PrefixCounter` split) is validated here,
// every-PR, on local FS:
/// Proves the `PrefixCounter` opener/scan split: a committing write's data-table
/// reads divide into a **flat opener** term and a **growing scan** term. This pins
/// (a) the classifier actually attributes reads to the opener bucket (non-zero, so a
/// flat assertion isn't vacuously flat-at-zero), and (b) the local data-table growth
/// is the merge-insert/RI fragment scan, not the opener — which is *why* the S3
/// gate asserts `data_opener_reads`, not total `data_reads`. (On local FS the opener
/// is O(1) regardless of step 3a; the opener's history-dependence is gated on S3.)
#[tokio::test]
async fn data_table_reads_split_into_flat_opener_and_growing_scan() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 100] {
if d > current {
commit_many(&mut db, (d - current) as usize).await;
current = d;
}
let io = measure_insert(&mut db, &format!("split_{d}")).await;
current += 1;
eprintln!(
"depth~{d}: opener={} scan={} data_total={}",
io.data_opener_reads, io.data_scan_reads, io.data_reads
);
curve.push((d, io));
}
assert!(
curve[0].1.data_opener_reads > 0,
"opener reads must be > 0 — the classifier missed version-resolution reads, \
so a flat opener assertion would be vacuous"
);
assert_flat(&curve, |c| c.data_opener_reads, 4, "local data-table opener");
assert_grows(&curve, |c| c.data_scan_reads, 20, "local data-table scan");
}
// ── (B) Green-today regression guards — run on every PR ──
/// A single insert's *data-table* write cost is O(1): the table commit is a small
/// constant number of writes, independent of history.
#[tokio::test]
async fn single_insert_data_write_is_bounded() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 5).await;
let io = measure_insert(&mut db, "w").await;
eprintln!("single insert: data_writes={}", io.data_writes);
assert!(io.data_writes <= 4, "data-table write_iops should be a small constant, got {}", io.data_writes);
}
/// At a fixed shallow depth, the per-write object-store read count is below a
/// documented ceiling. Fails the moment a change *adds* a round-trip on the write
/// path — the "no new round-trip" guard (calibrated: ~50 at depth ~5).
#[tokio::test]
async fn write_op_count_ceiling_at_shallow_depth() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 5).await;
let io = measure_insert(&mut db, "ceil").await;
eprintln!(
"depth~5: data={} __manifest={} _graph_commits={} total_reads={}",
io.data_reads, io.manifest_reads, io.commit_graph_reads, io.total_reads()
);
const CEILING: u64 = 80;
assert!(
io.total_reads() <= CEILING,
"per-write read ops {} exceeded ceiling {CEILING} — a new round-trip was added",
io.total_reads()
);
}
// ── (C) Fitness assert via the staged-write probes ──
/// A keyed `Person` insert routes through `stage_merge_insert` exactly once, does
/// no `stage_append`, and no inline vector-index build. Pins the structural shape.
#[tokio::test]
async fn keyed_insert_routes_through_merge_insert_only() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let (res, _io, staged) = measure_with_staged(db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "fit")], &[("$age", 30)]),
))
.await;
res.unwrap();
assert_eq!(staged.stage_merge_insert, 1, "keyed Person insert stages one merge-insert");
assert_eq!(staged.stage_append, 0, "keyed insert must not stage_append");
assert_eq!(staged.create_vector_index, 0, "no inline vector-index build on a plain insert");
}

View file

@ -0,0 +1,71 @@
//! S3 (object-store) cost-budget gate for the WRITE path — the bucket-gated twin of
//! `write_cost.rs` that proves RFC-013 **step 3a's data-table opener win**. On the
//! shared `helpers::cost` harness (`measure`/`IoCounts`/`assert_flat`/`s3_graph`).
//!
//! The opener term is an **object-store-RPC phenomenon**: latest-version resolution
//! costs per-version GETs/HEADs on S3 (O(depth) before step 3a, when writes routed
//! through the lance-namespace builder), which local FS cannot reproduce (one cheap
//! `read_dir` regardless). After step 3a (direct-by-URI opens), the per-write
//! **data-table read count is FLAT across commit-history depth** — the measured 70%
//! win. This file is the red→green acceptance for that term (it would be RED on the
//! pre-3a `from_namespace` opener); `write_cost.rs` gates the internal-table term on
//! local every-PR.
//!
//! **Isolating the opener (important):** total `data_reads` is not opener-only — the
//! same wrapped `Dataset` backs the merge-insert/RI **scan**, which reads
//! O(fragment-count) and grows with history for a *different* reason (compaction's
//! domain, not the opener; this is the term that made the *local* data-table count
//! grow). The shared harness's `PrefixCounter` attributes each read by object-key
//! prefix, so this gate asserts `data_opener_reads` (reads of `_versions/`/`.manifest`)
//! **directly** — no compaction or fixture massaging needed. After step 3a the opener
//! is O(1) regardless of version-history depth; before it grew ~+12/depth (RFC §2.4
//! [M]). (See `write_cost.rs` for the local test that proves the split itself —
//! opener flat, scan growing.)
//!
//! Skips gracefully without `OMNIGRAPH_S3_TEST_BUCKET` (the `tests/s3_storage.rs`
//! pattern); runs for real in the rustfs CI job (`.github/workflows/ci.yml`).
#![recursion_limit = "512"]
mod helpers;
use helpers::cost::{IoCounts, assert_flat, measure_insert, s3_graph};
use helpers::commit_many;
/// After step 3a the data-table opener term is flat across depth on a real object
/// store (the measured win). RED on the pre-3a namespace-builder opener (O(depth)
/// per-version resolution).
#[tokio::test]
async fn data_table_opener_is_flat_in_history_on_s3() {
let Some(mut db) = s3_graph("write-cost-opener").await else {
eprintln!(
"SKIP data_table_opener_is_flat_in_history_on_s3: OMNIGRAPH_S3_TEST_BUCKET \
unset (or store unreachable) the S3 opener gate needs an object store"
);
return;
};
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 50] {
if d > current {
commit_many(&mut db, (d - current) as usize).await;
current = d;
}
let io = measure_insert(&mut db, &format!("s3_{d}")).await;
current += 1;
eprintln!(
"depth~{d}: opener={} scan={} data_total={} __manifest={} _graph_commits={}",
io.data_opener_reads,
io.data_scan_reads,
io.data_reads,
io.manifest_reads,
io.commit_graph_reads
);
curve.push((d, io));
}
// The opener (latest-version resolution) is O(1) after step 3a (direct-by-URI),
// isolated from the scan by the PrefixCounter. Slack absorbs object-store variance;
// the pre-3a builder grew this ~+12/depth (RFC §2.4 [M]).
assert_flat(&curve, |c| c.data_opener_reads, 8, "S3 data-table opener");
}