mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-24 02:38:06 +02:00
write-path cost gate + opener bypass (#288)
* docs(rfc): RFC-013 write-path latency design + index link

* perf(engine): open write-path tables directly, bypassing the namespace builder

  Write opens routed through DatasetBuilder::from_namespace, whose describe_table opened the whole dataset just to return a location and then re-resolved the latest version -- an O(commit-depth) double latest-resolution per table open that missed Lance's O(1) version-hint fast path. On an object store this dominated write latency (~70%, RFC-013 section 2.4).

  TableStore::open_dataset_head_for_write now delegates to the direct opener (open_dataset_head: Dataset::open by URI + checkout_branch, routed through the tracked opener so cost tests can count it; a no-op in production). The manifest already holds every sub-table's location, so the namespace catalog lookup was redundant; ensure_expected_version still validates head == pinned for strict ops.

  This completes PR #268's open-by-location migration on the write side. With both reads (PR #268) and now writes bypassing it, nothing in production routes through the per-table Lance namespace. The dead open chain (load_table_from_namespace, open_table_head_for_write) is deleted and the StagedTableNamespace contract apparatus is gated #[cfg(test)], mirroring the already-test-only read namespace; __manifest commit coordination (GraphNamespacePublisher) is a separate component and is unaffected.

  See docs/dev/rfc-013-write-path-latency.md sections 2.4 and 9 (step 3a).

* test(engine): write-path cost-budget gate on a shared harness

  Adds tests/helpers/cost.rs, a store-agnostic cost harness (IoCounts/StagedCounts, measure/measure_with_staged, assert_flat, local_graph/s3_graph) that the read-side warm_read_cost.rs, write_cost.rs, and write_cost_s3.rs share, so the IOTracker / task-local plumbing lives in exactly one place instead of duplicated per test.

  write_cost.rs (local, every-PR) gates the internal-table scan term flat in commit-history depth (a RED #[ignore]'d LOCK, the acceptance for bringing the internal tables into compaction) plus green guards: a single insert's data writes are bounded, a per-write read-op ceiling fails the moment a round-trip is added, and a keyed insert routes through stage_merge_insert once with no stage_append or vector-index build.

  write_cost_s3.rs (bucket-gated, rustfs CI) gates the data-table opener term flat across depth -- the object-store-RPC phenomenon local FS cannot reproduce, and the red->green proof of the opener bypass. Wired into the rustfs_integration CI job and its path filter.

  Guards the "hot-path cost is bounded by work, not history" invariant on writes. See docs/dev/rfc-013-write-path-latency.md section 5.1, docs/dev/testing.md.

* docs(rfc): RFC-013 step 3a landed; write-skew coupling; cost-gate test map

  - Section 9: mark step 1 (gate + harness) and step 3a (opener bypass) landed; record the per-table namespace retirement to test-only and the corrected measurement note (the opener win is S3-only; the local data-table growth is the merge-insert/RI fragment scan, a compaction term, not the opener).
  - Sections 7.1/6/11/5.5/10: correct the cross-table write-skew analysis after a prototype proved the scoped expected-set fix is a no-op against the per-object_id manifest (disjoint writers never share a row, so Lance never conflicts, the publisher never retries, and the expected check is a non-atomic pre-check evaluated once against stale state). The fix needs a shared contention row (Phase-7 graph_head / a minimal head row / commit-time re-validation), so it is coupled to that row, not standalone; that contention is load-bearing for correctness, not a drawback. Split the concurrent face (read-set + head) from the sequential face (inbound-RI validation on node removal) -- two different fixes.
  - testing.md: add write_cost.rs / helpers/cost.rs / write_cost_s3.rs to the test map; document the local-vs-S3 backend split; extend the cost-budget checklist item to the write/open path and point at the shared harness.

* test(engine): isolate the opener in the S3 cost gate; fail loud on S3 setup errors

  Addresses two PR review findings on the bucket-gated write_cost_s3 gate:

  - The data-table opener was not isolated: `data_reads` also counts the merge-insert/RI scan, which reads O(fragment-count) and so grows with history for a different reason (compaction's domain, not the opener) -- the same term that made the local data-table count grow. The flat assertion would false-RED or misattribute scan growth to the opener on rustfs. Fix: compact (db.optimize) before each measurement so the table holds ~1 fragment, bounding the scan and leaving the opener's latest-version resolution as the only history-varying term. Compaction preserves version history, so the opener still faces a deep _versions/ chain -- the thing under test.
  - s3_graph used `.ok()?`, so when OMNIGRAPH_S3_TEST_BUCKET was set but the store was down/misconfigured, init/seed failures collapsed to None and the gate skipped + passed vacuously. Fix: skip only when the bucket env var is absent; once it is set, init/seed failures panic (mirrors tests/s3_storage.rs).

* test(engine): isolate the S3 opener with a per-prefix IO probe (correct-by-design)

  Replaces the fixture-bounded isolation (compact-before-measure) from the prior commit with the root fix: a path-classifying ObjectStore wrapper (PrefixCounter) that attributes each data-table read to the opener term (_versions/.manifest) vs the scan term (data/*.lance). IoCounts now exposes data_opener_reads / data_scan_reads, so write_cost_s3 asserts the opener flat *directly* -- no compaction or fixture massaging, and the assertion measures the opener, not the conflated total.

  Closes the "harness conflates two IO terms" class: any cost test (read or write) can now isolate the opener.

  PrefixCounter implements only the object_store 0.13 core ObjectStore methods; the convenience surface (get/put/head/...) routes through get_opts/put_opts via ObjectStoreExt's blanket impl, so every read/write is still counted.

  Validated locally (every-PR) by write_cost::data_table_reads_split_into_flat_opener_and_growing_scan: opener stays flat (7 -> 3) while scan grows (11 -> 91) and opener + scan == data_total exactly -- proving the classifier and confirming the local data-table growth is the fragment scan, not the opener.

  warm_read_cost (12 tests) stays green under the shared-harness change.

* refactor(tests): remove cost-harness duplication and namespace cfg(test) noise

  Branch self-review (no behavior change): pay down three liabilities the write-path work left:

  - warm_read_cost.rs kept its own probes() (three IOTrackers + a QueryIoProbes + a probe counter) and read raw .stats().read_iops -- duplicating the shared helpers::cost harness this branch introduced. Migrated all 12 tests onto measure()/IoCounts; deleted the local probes(). (This also makes IoCounts' version_probes field used rather than dead.)
  - insert_cost was copy-pasted verbatim into write_cost.rs and write_cost_s3.rs. Hoisted to helpers::cost::measure_insert so the measured write is defined once.
  - The per-table Lance namespace (namespace.rs) became entirely test-only after step 3a, but was gated with ~22 per-item #[cfg(test)] attributes. Collapsed to a single `#[cfg(test)] mod namespace;` and stripped the per-item attributes; merged the import groups the gating had split.

  Verified: lib in-source 162 passed; write_cost 4 + warm_read_cost 12 passed; forbidden_apis passed.
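
The flatness invariant the message describes ("hot-path cost is bounded by work, not history") can be sketched in a few lines of standard-library Rust. This is a minimal illustration under stated assumptions, not the harness's actual `assert_flat`: `check_flat` and the `Sample` alias are hypothetical names, and the real helper operates on `IoCounts` with a field selector rather than on bare counts.

```rust
/// Hypothetical stand-in for one point on a depth sweep:
/// (commit-history depth, object-store op count measured at that depth).
type Sample = (u64, u64);

/// Returns Ok(()) when the deepest sample exceeds the shallowest by at most
/// `slack` ops, and an error message naming the metric otherwise.
/// A cost that drops with depth is accepted, since only growth is gated.
fn check_flat(curve: &[Sample], slack: u64, what: &str) -> Result<(), String> {
    // Pick the samples with the smallest and largest history depth.
    let shallow = curve.iter().min_by_key(|s| s.0);
    let deep = curve.iter().max_by_key(|s| s.0);
    match (shallow, deep) {
        (Some(&(d0, c0)), Some(&(d1, c1))) => {
            if c1 > c0.saturating_add(slack) {
                Err(format!(
                    "{what} grew with history: {c0} ops at depth {d0} -> {c1} ops at depth {d1} (slack {slack})"
                ))
            } else {
                Ok(())
            }
        }
        // An empty curve proves nothing, so treat it as a failure rather
        // than passing vacuously.
        _ => Err(format!("{what}: empty curve")),
    }
}
```

With the counts quoted above and made-up depths of 1 and 50, `check_flat(&[(1, 7), (50, 3)], 0, "opener")` passes because the deep sample does not exceed the shallow one, while `check_flat(&[(1, 11), (50, 91)], 5, "scan")` returns an error because the scan term grew by 80 reads.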
parent b38b36e48f
commit f6d2cc03e3
14 changed files with 1959 additions and 267 deletions
.github/workflows/ci.yml (vendored, 6 lines changed)
@@ -88,7 +88,8 @@ jobs:
 .github/workflows/ci.yml|Cargo.toml|Cargo.lock|crates/*/Cargo.toml) run_rustfs_ci=true ;;
 crates/omnigraph/src/storage.rs) run_rustfs_ci=true ;;
 crates/omnigraph/src/db/manifest.rs|crates/omnigraph/src/db/manifest/*) run_rustfs_ci=true ;;
-crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;;
+crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/write_cost_s3.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;;
+crates/omnigraph/src/table_store.rs|crates/omnigraph/src/instrumentation.rs) run_rustfs_ci=true ;;
 crates/omnigraph-cluster/src/store.rs|crates/omnigraph-cluster/src/serve.rs) run_rustfs_ci=true ;;
 crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;;
 crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;;
@@ -372,6 +373,9 @@ jobs:
 - name: Run RustFS storage tests
   run: cargo test --locked -p omnigraph-engine --test s3_storage -- --nocapture

+- name: Run RustFS write-path cost gate (RFC-013 step 3a opener)
+  run: cargo test --locked -p omnigraph-engine --test write_cost_s3 -- --nocapture
+
 - name: Run RustFS server smoke
   # No name filter: every test in the s3 target is bucket-gated, and a
   # filter matching nothing passes vacuously (which silently ran zero
@@ -14,6 +14,10 @@ mod layout;
 mod metadata;
 #[path = "manifest/migrations.rs"]
 mod migrations;
+// Entirely test-only since RFC-013 step 3a: with both reads (Fix 2) and writes
+// bypassing the Lance namespace, nothing in production routes through it; the
+// `LanceNamespace` impls are retained only to validate the contract in unit tests.
+#[cfg(test)]
 #[path = "manifest/namespace.rs"]
 mod namespace;
 #[path = "manifest/publisher.rs"]
@@ -28,7 +32,6 @@ use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_
 pub(crate) use metadata::TableVersionMetadata;
 #[cfg(test)]
 use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
-pub(crate) use namespace::open_table_head_for_write;
 #[cfg(test)]
 use namespace::{branch_manifest_namespace, staged_table_namespace};
 use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
@@ -76,6 +76,7 @@ pub(super) fn table_uri_for_path(root_uri: &str, table_path: &str, branch: Optio
     }
 }

+#[cfg(test)]
 pub(super) fn namespace_internal_error(message: impl Into<String>) -> LanceNamespaceError {
     LanceNamespaceError::namespace_source(Box::new(std::io::Error::other(message.into())))
 }
@@ -2,7 +2,9 @@ use std::collections::HashMap;

 use lance::Dataset;
 use lance_namespace::Error as LanceNamespaceError;
-use lance_namespace::models::{CreateTableVersionRequest, TableVersion};
+use lance_namespace::models::CreateTableVersionRequest;
+#[cfg(test)]
+use lance_namespace::models::TableVersion;
 use serde::{Deserialize, Serialize};

 use crate::error::{OmniError, Result};
@@ -142,6 +144,7 @@ impl TableVersionMetadata {
         self.to_namespace_version_with_details(version, None, None)
     }

+    #[cfg(test)]
     pub(super) fn to_namespace_version_with_details(
         &self,
         version: u64,
@@ -1,3 +1,10 @@
+// Both the read namespace (BranchManifestNamespace) and the write namespace
+// (StagedTableNamespace) are now test-only contract validation. Reads open
+// sub-tables directly by location+version (SubTableEntry::open, Fix 2), and
+// writes open the table head directly by URI (TableStore::open_dataset_head,
+// RFC-013 step 3a), so nothing in production routes through the Lance namespace
+// anymore. These impls are retained only to validate the LanceNamespace
+// contract in unit tests.
 use std::sync::Arc;

 use async_trait::async_trait;
@@ -16,30 +23,21 @@ use object_store::{

 use crate::error::{OmniError, Result};

-use super::layout::{namespace_internal_error, table_uri_for_path};
-#[cfg(test)]
-use super::layout::{open_manifest_dataset, table_id_to_key};
-use super::metadata::TableVersionMetadata;
-#[cfg(test)]
-use super::metadata::{namespace_version_metadata, parse_namespace_version_request};
-#[cfg(test)]
+use super::layout::{
+    namespace_internal_error, open_manifest_dataset, table_id_to_key, table_uri_for_path,
+};
+use super::metadata::{
+    TableVersionMetadata, namespace_version_metadata, parse_namespace_version_request,
+};
 use super::publisher::GraphNamespacePublisher;
-// The read namespace (BranchManifestNamespace) is test-only since Fix 2: reads
-// open sub-tables directly by location+version (SubTableEntry::open), so nothing
-// in production routes a read through the Lance namespace. The writes path uses
-// StagedTableNamespace. These items are retained to validate the namespace
-// contract in unit tests.
-#[cfg(test)]
 use super::state::{ManifestState, SubTableEntry, read_manifest_entries, read_manifest_state};

-#[cfg(test)]
 #[derive(Debug, Clone)]
 struct BranchManifestNamespace {
     root_uri: String,
     branch: Option<String>,
 }

-#[cfg(test)]
 impl BranchManifestNamespace {
     fn new(root_uri: &str, branch: Option<&str>) -> Self {
         Self {
@@ -146,7 +144,6 @@ impl StagedTableNamespace {
     }
 }

-#[cfg(test)]
 pub(crate) fn branch_manifest_namespace(
     root_uri: &str,
     branch: Option<&str>,
@@ -165,27 +162,6 @@ pub(crate) fn staged_table_namespace(
     ))
 }

-async fn load_table_from_namespace(
-    namespace: Arc<dyn LanceNamespace>,
-    table_key: &str,
-    branch: Option<&str>,
-    version: Option<u64>,
-) -> Result<Dataset> {
-    let builder = DatasetBuilder::from_namespace(namespace, vec![table_key.to_string()])
-        .await
-        .map_err(|e| OmniError::Lance(e.to_string()))?;
-    let builder = match (branch, version) {
-        (Some(branch), version) => builder.with_branch(branch, version),
-        (None, Some(version)) => builder.with_version(version),
-        (None, None) => builder,
-    };
-    builder
-        .load()
-        .await
-        .map_err(|e| OmniError::Lance(e.to_string()))
-}
-
-#[cfg(test)]
 #[async_trait]
 impl LanceNamespace for BranchManifestNamespace {
     fn namespace_id(&self) -> String {
@@ -540,18 +516,3 @@ impl LanceNamespace for StagedTableNamespace {
         Ok(response)
     }
 }
-
-pub(crate) async fn open_table_head_for_write(
-    root_uri: &str,
-    table_key: &str,
-    table_path: &str,
-    branch: Option<&str>,
-) -> Result<Dataset> {
-    load_table_from_namespace(
-        staged_table_namespace(root_uri, table_key, table_path, branch),
-        table_key,
-        branch,
-        None,
-    )
-    .await
-}
@@ -24,7 +24,7 @@ use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
 use lance_table::rowids::{RowIdSequence, write_row_ids};
 use std::sync::Arc;

-use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
+use crate::db::manifest::TableVersionMetadata;
 use crate::db::{Snapshot, SubTableEntry};
 use crate::error::{OmniError, Result};
 use crate::storage_layer::ForkOutcome;
@@ -160,9 +160,15 @@ impl TableStore {
         dataset_uri: &str,
         branch: Option<&str>,
     ) -> Result<Dataset> {
-        let ds = Dataset::open(dataset_uri)
-            .await
-            .map_err(|e| OmniError::Lance(e.to_string()))?;
+        // Direct open by URI (O(1) latest-resolution). Routed through the tracked
+        // opener so a cost test counts it via the per-query `table_wrapper`
+        // (no-op in production -- the task-local is unset, so this is exactly
+        // `Dataset::open(uri)`).
+        let ds = crate::instrumentation::open_dataset_tracked(
+            dataset_uri,
+            crate::instrumentation::table_wrapper(),
+        )
+        .await?;
         match branch {
             Some(branch) if branch != "main" => ds
                 .checkout_branch(branch)
@@ -178,8 +184,14 @@ impl TableStore {
         dataset_uri: &str,
         branch: Option<&str>,
     ) -> Result<Dataset> {
-        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
-        open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
+        // RFC-013 step 3a: open writes via the direct opener (O(1)) instead of the
+        // lance-namespace builder, which re-resolved the table's version chain
+        // O(depth) per write. The namespace is a catalog/discovery layer, not a
+        // per-open hot-path component (RFC §2.4); the manifest already holds the
+        // location, and `ensure_expected_version` still validates head == pinned
+        // for strict ops. `table_key` retained for signature stability.
+        let _ = table_key;
+        self.open_dataset_head(dataset_uri, branch).await
     }

     pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
crates/omnigraph/tests/helpers/cost.rs (new file, 360 lines added)
@@ -0,0 +1,360 @@
+//! Shared cost-budget test harness (RFC-013) -- the single place the IO-counting
+//! plumbing lives, so `warm_read_cost.rs`, `write_cost.rs`, and the S3 variant
+//! assert in one vocabulary instead of duplicating `probes()` + raw `IOTracker`
+//! reads. Three clean abstractions: structured counts, a `measure` primitive, a
+//! named flat-assertion, plus store-agnostic backend fixtures.
+//!
+//! The data-table wrapper is a **path-classifying** counter (`PrefixCounter`), not a
+//! plain `IOTracker`: it splits each read into the **opener** term (latest-version
+//! resolution -- reads of `_versions/`/`.manifest` objects) vs the **scan** term
+//! (data-fragment reads, `data/`/`*.lance`). That lets a cost test isolate the
+//! opener (RFC-013 step 3a's target, O(1) after the bypass) from the merge-insert/RI
+//! scan (O(fragment-count), compaction's domain) even though both ride the same
+//! `Dataset` -- without controlling the fixture (no compaction needed). `__manifest`
+//! and `_graph_commits` keep the plain `IOTracker` (no sub-prefixes worth splitting).
+#![allow(dead_code)]
+
+use std::fmt;
+use std::future::Future;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+
+use async_trait::async_trait;
+use futures::stream::BoxStream;
+use lance::io::WrappingObjectStore;
+use lance_io::utils::tracking_store::IOTracker;
+use object_store::path::Path;
+use object_store::{
+    CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
+    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult,
+};
+
+use omnigraph::db::Omnigraph;
+use omnigraph::instrumentation::{
+    MergeWriteProbes, QueryIoProbes, with_merge_write_probes, with_query_io_probes,
+};
+use omnigraph::loader::{LoadMode, load_jsonl};
+
+use super::{MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, init_and_load, mixed_params};
+
+/// Object-store op counts for one measured operation, by table class -- the
+/// vocabulary cost tests assert in (vs raw `IOTracker::stats().read_iops`).
+#[derive(Debug, Clone, Copy, Default)]
+pub struct IoCounts {
+    /// Per-table DATA opens (node/edge tables). The dominant write-path term.
+    pub data_reads: u64,
+    pub data_writes: u64,
+    /// DATA-table reads attributed to latest-version resolution (`_versions/`,
+    /// `.manifest`). This is the **opener** term step 3a flattened -- isolated from
+    /// the scan, so it can be gated directly without compacting the fixture.
+    pub data_opener_reads: u64,
+    /// DATA-table reads attributed to data fragments (`data/`, `*.lance`) -- the
+    /// merge-insert/RI **scan**, which grows with fragment count (compaction's
+    /// domain, not the opener).
+    pub data_scan_reads: u64,
+    /// `__manifest` registry scans (publish state).
+    pub manifest_reads: u64,
+    /// `_graph_commits` lineage scans.
+    pub commit_graph_reads: u64,
+    /// Version-probe invocations (the cheap freshness check).
+    pub version_probes: u64,
+}
+
+impl IoCounts {
+    pub fn total_reads(&self) -> u64 {
+        self.data_reads + self.manifest_reads + self.commit_graph_reads
+    }
+}
+
+/// Which staged-write primitives an operation invoked (from `MergeWriteProbes`).
+#[derive(Debug, Clone, Copy, Default)]
+pub struct StagedCounts {
+    pub stage_append: u64,
+    pub stage_merge_insert: u64,
+    pub create_vector_index: u64,
+    pub scan_staged_combined: u64,
+}
+
+// ── Path-classifying data-table read counter ──
+
+/// How a data-table object read is attributed.
+enum ReadClass {
+    /// Latest-version resolution: `_versions/`, `.manifest`, `_latest`.
+    Opener,
+    /// Data fragments: `data/`, `*.lance`.
+    Scan,
+    /// Anything else (indices, deletion files, …) -- counted in the total only.
+    Other,
+}
+
+/// Classify a Lance object path by its role in a write open. Lance's on-object
+/// layout is identical on local FS and S3, so this split is backend-independent.
+fn classify(path: &Path) -> ReadClass {
+    let p = path.as_ref();
+    if p.contains("_versions") || p.ends_with(".manifest") || p.contains("_latest") {
+        ReadClass::Opener
+    } else if p.contains("/data/") || p.starts_with("data/") || p.ends_with(".lance") {
+        ReadClass::Scan
+    } else {
+        ReadClass::Other
+    }
+}
+
+#[derive(Debug, Default, Clone, Copy)]
+struct PrefixCounts {
+    reads: u64,
+    writes: u64,
+    opener_reads: u64,
+    scan_reads: u64,
+}
+
+/// A `WrappingObjectStore` that counts reads/writes and attributes each read to the
+/// opener vs scan term by object-key prefix. Shares its tally via `Arc<Mutex<…>>` so
+/// the wrapped store (handed to Lance) and the test read the same counters.
+#[derive(Debug, Default, Clone)]
+struct PrefixCounter(Arc<Mutex<PrefixCounts>>);
+
+impl PrefixCounter {
+    fn record_read(&self, location: &Path) {
+        let mut c = self.0.lock().unwrap();
+        c.reads += 1;
+        match classify(location) {
+            ReadClass::Opener => c.opener_reads += 1,
+            ReadClass::Scan => c.scan_reads += 1,
+            ReadClass::Other => {}
+        }
+    }
+
+    fn record_write(&self) {
+        self.0.lock().unwrap().writes += 1;
+    }
+
+    fn snapshot(&self) -> PrefixCounts {
+        *self.0.lock().unwrap()
+    }
+}
+
+impl WrappingObjectStore for PrefixCounter {
+    fn wrap(&self, _store_prefix: &str, target: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
+        Arc::new(PrefixCountingStore {
+            target,
+            counter: self.clone(),
+        })
+    }
+}
+
+/// The wrapped `ObjectStore` that records each call into a [`PrefixCounter`].
+/// Implements only the required core `ObjectStore` methods (object_store 0.13: the
+/// convenience surface -- `get`/`put`/`head`/`get_range`/… -- lives on
+/// `ObjectStoreExt` and is provided by a blanket impl that routes through `get_opts`
+/// / `put_opts`, so every read/write is still counted here). Per-read path
+/// classification is the only addition over a plain pass-through.
+#[derive(Debug)]
+struct PrefixCountingStore {
+    target: Arc<dyn ObjectStore>,
+    counter: PrefixCounter,
+}
+
+impl fmt::Display for PrefixCountingStore {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "PrefixCountingStore({})", self.target)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for PrefixCountingStore {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> OSResult<PutResult> {
+        self.counter.record_write();
+        self.target.put_opts(location, payload, opts).await
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOptions,
+    ) -> OSResult<Box<dyn MultipartUpload>> {
+        self.counter.record_write();
+        self.target.put_multipart_opts(location, opts).await
+    }
+
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
+        self.counter.record_read(location);
+        self.target.get_opts(location, options).await
+    }
+
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, OSResult<Path>>,
+    ) -> BoxStream<'static, OSResult<Path>> {
+        self.target.delete_stream(locations)
+    }
+
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
+        self.counter.record_read(&prefix.cloned().unwrap_or_default());
+        self.target.list(prefix)
+    }
+
+    fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> BoxStream<'static, OSResult<ObjectMeta>> {
+        self.counter.record_read(&prefix.cloned().unwrap_or_default());
+        self.target.list_with_offset(prefix, offset)
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
+        self.counter.record_read(&prefix.cloned().unwrap_or_default());
+        self.target.list_with_delimiter(prefix).await
+    }
+
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> OSResult<()> {
+        self.counter.record_write();
+        self.target.copy_opts(from, to, options).await
+    }
+}
+
+/// The tracker handles backing one measurement; read once into [`IoCounts`].
+struct ProbeHandles {
+    manifest: IOTracker,
+    commit_graph: IOTracker,
+    table: PrefixCounter,
+    probe_count: Arc<AtomicU64>,
+}
+
+impl ProbeHandles {
+    fn install() -> (QueryIoProbes, Self) {
+        let h = ProbeHandles {
+            manifest: IOTracker::default(),
+            commit_graph: IOTracker::default(),
+            table: PrefixCounter::default(),
+            probe_count: Arc::new(AtomicU64::new(0)),
+        };
+        let probes = QueryIoProbes {
+            manifest_wrapper: Some(Arc::new(h.manifest.clone()) as Arc<dyn WrappingObjectStore>),
+            commit_graph_wrapper: Some(
+                Arc::new(h.commit_graph.clone()) as Arc<dyn WrappingObjectStore>
+            ),
+            table_wrapper: Some(Arc::new(h.table.clone()) as Arc<dyn WrappingObjectStore>),
+            probe_count: Arc::clone(&h.probe_count),
+        };
+        (probes, h)
+    }
+
+    fn counts(&self) -> IoCounts {
+        let t = self.table.snapshot();
+        IoCounts {
+            data_reads: t.reads,
+            data_writes: t.writes,
+            data_opener_reads: t.opener_reads,
+            data_scan_reads: t.scan_reads,
+            manifest_reads: self.manifest.stats().read_iops,
+            commit_graph_reads: self.commit_graph.stats().read_iops,
+            version_probes: self.probe_count.load(Ordering::Relaxed),
+        }
+    }
+}
+
+/// Run `op` under object-store IO counting; return its output + the counts.
+/// The only place the `QueryIoProbes` task-local + tracker wiring lives.
+pub async fn measure<F: Future>(op: F) -> (F::Output, IoCounts) {
+    let (probes, handles) = ProbeHandles::install();
+    let out = with_query_io_probes(probes, op).await;
+    (out, handles.counts())
+}
+
+/// Like [`measure`], but also capture which staged-write primitives ran
+/// (composes the two task-locals cleanly).
+pub async fn measure_with_staged<F: Future>(op: F) -> (F::Output, IoCounts, StagedCounts) {
+    let (probes, handles) = ProbeHandles::install();
+    let merge = MergeWriteProbes::default();
+    let out = with_merge_write_probes(merge.clone(), with_query_io_probes(probes, op)).await;
+    let staged = StagedCounts {
+        stage_append: merge.stage_append_calls(),
+        stage_merge_insert: merge.stage_merge_insert_calls(),
+        create_vector_index: merge.create_vector_index_calls(),
+        scan_staged_combined: merge.scan_staged_combined_calls(),
+    };
+    (out, handles.counts(), staged)
+}
+
+/// Assert a per-depth metric is flat: the deepest sample must not exceed the
+/// shallowest by more than `slack`. `select` picks the field; `what` names it in
+/// the failure message. The shape every depth-swept cost gate uses.
+pub fn assert_flat(
+    curve: &[(u64, IoCounts)],
+    select: impl Fn(&IoCounts) -> u64,
+    slack: u64,
|
what: &str,
|
||||||
|
) {
|
||||||
|
assert!(curve.len() >= 2, "assert_flat needs >= 2 depth points");
|
||||||
|
let (d_lo, lo) = (curve[0].0, select(&curve[0].1));
|
||||||
|
let (d_hi, hi) = (curve[curve.len() - 1].0, select(&curve[curve.len() - 1].1));
|
||||||
|
assert!(
|
||||||
|
hi <= lo + slack,
|
||||||
|
"{what} grew with history: depth {d_lo} = {lo} -> depth {d_hi} = {hi} (slack {slack})"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Assert a per-depth metric *does* grow with history by at least `min_delta` — the
|
||||||
|
/// dual of [`assert_flat`], used to prove a term is genuinely history-dependent (so a
|
||||||
|
/// flat sibling term isn't flat merely because nothing was measured).
|
||||||
|
pub fn assert_grows(
|
||||||
|
curve: &[(u64, IoCounts)],
|
||||||
|
select: impl Fn(&IoCounts) -> u64,
|
||||||
|
min_delta: u64,
|
||||||
|
what: &str,
|
||||||
|
) {
|
||||||
|
assert!(curve.len() >= 2, "assert_grows needs >= 2 depth points");
|
||||||
|
let (d_lo, lo) = (curve[0].0, select(&curve[0].1));
|
||||||
|
let (d_hi, hi) = (curve[curve.len() - 1].0, select(&curve[curve.len() - 1].1));
|
||||||
|
assert!(
|
||||||
|
hi >= lo + min_delta,
|
||||||
|
"{what} did not grow as expected: depth {d_lo} = {lo} -> depth {d_hi} = {hi} (min delta {min_delta})"
|
||||||
|
);
|
||||||
|
}
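The two depth-sweep assertions above are duals, and the comparison they make can be sketched in isolation. The following is a minimal, non-panicking illustration, not the harness itself: `Counts`, `is_flat`, and `grows` are hypothetical names, `Counts` stands in for a trimmed-down `IoCounts`, and `saturating_add` is used here only to keep the sketch overflow-safe (the harness uses plain `+`).

```rust
/// Hypothetical, trimmed-down stand-in for the harness's `IoCounts`.
#[derive(Clone, Copy, Debug, Default)]
pub struct Counts {
    pub manifest_reads: u64,
    pub data_scan_reads: u64,
}

/// Flat: the deepest sample exceeds the shallowest by at most `slack`.
/// Only the first and last points of the curve are compared.
pub fn is_flat(curve: &[(u64, Counts)], select: impl Fn(&Counts) -> u64, slack: u64) -> bool {
    assert!(curve.len() >= 2, "need >= 2 depth points");
    let lo = select(&curve[0].1);
    let hi = select(&curve[curve.len() - 1].1);
    hi <= lo.saturating_add(slack)
}

/// Grows: the deepest sample exceeds the shallowest by at least `min_delta`.
/// The dual of `is_flat`, again comparing only the first and last points.
pub fn grows(curve: &[(u64, Counts)], select: impl Fn(&Counts) -> u64, min_delta: u64) -> bool {
    assert!(curve.len() >= 2, "need >= 2 depth points");
    let lo = select(&curve[0].1);
    let hi = select(&curve[curve.len() - 1].1);
    hi >= lo.saturating_add(min_delta)
}
```

Pairing the two on sibling fields of the same curve is what keeps a flat result from being vacuous: one selector must stay within `slack` while the other must move by at least `min_delta`.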
|
||||||
|
|
||||||
|
/// Measure one committing `insert_person` to `main` — the canonical write the cost
|
||||||
|
/// gates sweep over commit-history depth. Shared by `write_cost.rs` and
|
||||||
|
/// `write_cost_s3.rs` so the measured write is defined once.
|
||||||
|
pub async fn measure_insert(db: &mut Omnigraph, tag: &str) -> IoCounts {
|
||||||
|
let (res, io) = measure(db.mutate(
|
||||||
|
"main",
|
||||||
|
MUTATION_QUERIES,
|
||||||
|
"insert_person",
|
||||||
|
&mixed_params(&[("$name", tag)], &[("$age", 30)]),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
res.unwrap();
|
||||||
|
io
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Backend fixtures — one knob, store-agnostic body ──
|
||||||
|
|
||||||
|
/// Local tempdir graph (default; deterministic, every-PR).
|
||||||
|
pub async fn local_graph(dir: &tempfile::TempDir) -> Omnigraph {
|
||||||
|
init_and_load(dir).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Emulated-S3 graph, bucket-gated. Returns `None` **only** when
|
||||||
|
/// `OMNIGRAPH_S3_TEST_BUCKET` is unset, so the caller logs + skips — the
|
||||||
|
/// `tests/s3_storage.rs` graceful-skip pattern. Once the bucket *is* configured
|
||||||
|
/// (the rustfs CI job), any `init`/seed failure is a real failure and panics
|
||||||
|
/// rather than silently skipping — otherwise a down/misconfigured store would let
|
||||||
|
/// a bucket-gated gate pass vacuously. `name` disambiguates the prefix.
|
||||||
|
pub async fn s3_graph(name: &str) -> Option<Omnigraph> {
|
||||||
|
let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?;
|
||||||
|
let uri = format!("s3://{bucket}/cost-tests/{name}-{}", std::process::id());
|
||||||
|
let mut db = Omnigraph::init(&uri, TEST_SCHEMA)
|
||||||
|
.await
|
||||||
|
.expect("OMNIGRAPH_S3_TEST_BUCKET is set but S3 graph init failed");
|
||||||
|
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
|
||||||
|
.await
|
||||||
|
.expect("OMNIGRAPH_S3_TEST_BUCKET is set but S3 seed load failed");
|
||||||
|
Some(db)
|
||||||
|
}
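The bucket-gating rule described above (skip only when the variable is unset, fail loudly otherwise) can be separated from the graph setup. This is a small sketch under assumptions: `gated_uri` and `uri_from_env` are hypothetical helpers that are not part of the harness, and only the URI shape and the variable name are taken from `s3_graph`.

```rust
/// Hypothetical helper: build the per-test S3 URI, or return `None` when no
/// bucket is configured so the caller can log and skip.
pub fn gated_uri(bucket: Option<&str>, name: &str, pid: u32) -> Option<String> {
    let bucket = bucket?; // unset bucket is the only skip condition
    Some(format!("s3://{bucket}/cost-tests/{name}-{pid}"))
}

/// Hypothetical wrapper reading the same environment variable as `s3_graph`.
/// Anything that fails after this returns `Some` should panic, not skip.
pub fn uri_from_env(name: &str) -> Option<String> {
    let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok();
    gated_uri(bucket.as_deref(), name, std::process::id())
}
```

Keeping the skip decision in one `Option` makes it harder for a later `init` or seed failure to be mistaken for "bucket not configured".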
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
pub mod cost;
|
||||||
pub mod recovery;
|
pub mod recovery;
|
||||||
|
|
||||||
use arrow_array::{Array, RecordBatch, StringArray};
|
use arrow_array::{Array, RecordBatch, StringArray};
|
||||||
|
|
|
||||||
|
|
@ -1,51 +1,22 @@
|
||||||
//! Cost-budget tests for the warm read path (Fix 1): a warm same-branch read
|
//! Cost-budget tests for the warm read path (Fix 1): a warm same-branch read
|
||||||
//! must perform no manifest or commit-graph opens, measured with Lance's
|
//! must perform no manifest or commit-graph opens, measured via the shared
|
||||||
//! `IOTracker` at the object-store boundary (the LanceDB IO-counted-test
|
//! `helpers::cost` harness at the object-store boundary (the LanceDB
|
||||||
//! pattern; see docs/dev/testing.md). Guards invariant 15 (read cost bounded by
|
//! IO-counted-test pattern; see docs/dev/testing.md). Guards invariant 15 (read
|
||||||
//! work, not history) for snapshot resolution, and invariant 6 (a warm reader
|
//! cost bounded by work, not history) for snapshot resolution, and invariant 6
|
||||||
//! still observes external commits).
|
//! (a warm reader still observes external commits).
|
||||||
|
|
||||||
mod helpers;
|
mod helpers;
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
|
||||||
|
|
||||||
use arrow_array::{Array, StringArray};
|
use arrow_array::{Array, StringArray};
|
||||||
use lance::io::WrappingObjectStore;
|
|
||||||
use lance_io::utils::tracking_store::IOTracker;
|
|
||||||
use omnigraph::db::{Omnigraph, ReadTarget};
|
use omnigraph::db::{Omnigraph, ReadTarget};
|
||||||
use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes};
|
|
||||||
use omnigraph_compiler::result::QueryResult;
|
use omnigraph_compiler::result::QueryResult;
|
||||||
|
|
||||||
|
use helpers::cost::measure;
|
||||||
use helpers::{
|
use helpers::{
|
||||||
MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params,
|
MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params,
|
||||||
mutate_branch, mutate_main, params,
|
mutate_branch, mutate_main, params,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// IO probes plus the tracker handles to read `read_iops` after the query.
|
|
||||||
/// Returns `(probes, manifest, commit_graph, table, probe_count)` — `table`
|
|
||||||
/// counts per-table data opens (the cache-miss path), so a cost test can assert
|
|
||||||
/// N opens on a cold read and 0 on a warm repeat (Fix 3).
|
|
||||||
fn probes() -> (
|
|
||||||
QueryIoProbes,
|
|
||||||
IOTracker,
|
|
||||||
IOTracker,
|
|
||||||
IOTracker,
|
|
||||||
Arc<AtomicU64>,
|
|
||||||
) {
|
|
||||||
let manifest = IOTracker::default();
|
|
||||||
let commit_graph = IOTracker::default();
|
|
||||||
let table = IOTracker::default();
|
|
||||||
let probe_count = Arc::new(AtomicU64::new(0));
|
|
||||||
let probes = QueryIoProbes {
|
|
||||||
manifest_wrapper: Some(Arc::new(manifest.clone()) as Arc<dyn WrappingObjectStore>),
|
|
||||||
commit_graph_wrapper: Some(Arc::new(commit_graph.clone()) as Arc<dyn WrappingObjectStore>),
|
|
||||||
table_wrapper: Some(Arc::new(table.clone()) as Arc<dyn WrappingObjectStore>),
|
|
||||||
probe_count: Arc::clone(&probe_count),
|
|
||||||
};
|
|
||||||
(probes, manifest, commit_graph, table, probe_count)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn first_column_strings(result: &QueryResult) -> Vec<String> {
|
fn first_column_strings(result: &QueryResult) -> Vec<String> {
|
||||||
if result.num_rows() == 0 {
|
if result.num_rows() == 0 {
|
||||||
return Vec::new();
|
return Vec::new();
|
||||||
|
|
@ -75,18 +46,14 @@ async fn warm_same_branch_read_does_no_resolution_opens() {
|
||||||
// Deep history: warm-read resolution cost must be flat in commit count.
|
// Deep history: warm-read resolution cost must be flat in commit count.
|
||||||
commit_many(&mut db, 20).await;
|
commit_many(&mut db, 20).await;
|
||||||
|
|
||||||
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
|
let (out, io) = measure(db.query(
|
||||||
with_query_io_probes(
|
ReadTarget::branch("main"),
|
||||||
probes_in,
|
TEST_QUERIES,
|
||||||
db.query(
|
"total_people",
|
||||||
ReadTarget::branch("main"),
|
&params(&[]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"total_people",
|
.await;
|
||||||
&params(&[]),
|
out.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// A warm same-branch read opens nothing from the internal tables, even at
|
// A warm same-branch read opens nothing from the internal tables, even at
|
||||||
// commit-history depth. Fix 1 reuses the coordinator (no re-open: 0
|
// commit-history depth. Fix 1 reuses the coordinator (no re-open: 0
|
||||||
|
|
@ -95,18 +62,15 @@ async fn warm_same_branch_read_does_no_resolution_opens() {
|
||||||
// per-table __manifest scan is gone too. Pre-fix, each of these is a deep scan
|
// per-table __manifest scan is gone too. Pre-fix, each of these is a deep scan
|
||||||
// of an internal table that grows with commit count.
|
// of an internal table that grows with commit count.
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
manifest.stats().read_iops,
|
io.manifest_reads, 0,
|
||||||
0,
|
|
||||||
"warm same-branch read must not scan __manifest (resolution or per-table)"
|
"warm same-branch read must not scan __manifest (resolution or per-table)"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
commit_graph.stats().read_iops,
|
io.commit_graph_reads, 0,
|
||||||
0,
|
|
||||||
"warm same-branch read must not open the commit graph (no coordinator re-open)"
|
"warm same-branch read must not open the commit graph (no coordinator re-open)"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
probe_count.load(Ordering::Relaxed),
|
io.version_probes, 1,
|
||||||
1,
|
|
||||||
"warm same-branch read performs exactly one version probe"
|
"warm same-branch read performs exactly one version probe"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -121,22 +85,17 @@ async fn multi_table_query_does_no_manifest_scans() {
|
||||||
let dir = tempfile::tempdir().unwrap();
|
let dir = tempfile::tempdir().unwrap();
|
||||||
let db = init_and_load(&dir).await;
|
let db = init_and_load(&dir).await;
|
||||||
|
|
||||||
let (probes_in, manifest, _commit_graph, _table, _probe) = probes();
|
let (out, io) = measure(db.query(
|
||||||
with_query_io_probes(
|
ReadTarget::branch("main"),
|
||||||
probes_in,
|
TEST_QUERIES,
|
||||||
db.query(
|
"age_stats",
|
||||||
ReadTarget::branch("main"),
|
&params(&[]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"age_stats",
|
.await;
|
||||||
&params(&[]),
|
out.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
manifest.stats().read_iops,
|
io.manifest_reads, 0,
|
||||||
0,
|
|
||||||
"a multi-table read must not scan __manifest once per touched table"
|
"a multi-table read must not scan __manifest once per touched table"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -278,32 +237,25 @@ async fn warm_branch_read_does_no_manifest_scans() {
|
||||||
// Bind the handle's coordinator to the branch so reads of it take the warm path.
|
// Bind the handle's coordinator to the branch so reads of it take the warm path.
|
||||||
db.sync_branch("feature").await.unwrap();
|
db.sync_branch("feature").await.unwrap();
|
||||||
|
|
||||||
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
|
let (out, io) = measure(db.query(
|
||||||
with_query_io_probes(
|
ReadTarget::branch("feature"),
|
||||||
probes_in,
|
TEST_QUERIES,
|
||||||
db.query(
|
"total_people",
|
||||||
ReadTarget::branch("feature"),
|
&params(&[]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"total_people",
|
.await;
|
||||||
&params(&[]),
|
out.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
manifest.stats().read_iops,
|
io.manifest_reads, 0,
|
||||||
0,
|
|
||||||
"warm branch read must not scan __manifest (branch-owned table opened by location)"
|
"warm branch read must not scan __manifest (branch-owned table opened by location)"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
commit_graph.stats().read_iops,
|
io.commit_graph_reads, 0,
|
||||||
0,
|
|
||||||
"warm branch read must not open the commit graph"
|
"warm branch read must not open the commit graph"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
probe_count.load(Ordering::Relaxed),
|
io.version_probes, 1,
|
||||||
1,
|
|
||||||
"warm branch read performs exactly one version probe"
|
"warm branch read performs exactly one version probe"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -369,18 +321,14 @@ async fn warm_read_on_recreated_branch_observes_new_incarnation() {
|
||||||
"test setup must exercise branch incarnation reuse at one Lance version"
|
"test setup must exercise branch incarnation reuse at one Lance version"
|
||||||
);
|
);
|
||||||
|
|
||||||
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
|
let (new_feature, io) = measure(reader.query(
|
||||||
let new_feature = with_query_io_probes(
|
ReadTarget::branch("feature"),
|
||||||
probes_in,
|
TEST_QUERIES,
|
||||||
reader.query(
|
"get_person",
|
||||||
ReadTarget::branch("feature"),
|
&params(&[("$name", "MainOnly")]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"get_person",
|
.await;
|
||||||
&params(&[("$name", "MainOnly")]),
|
let new_feature = new_feature.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
new_feature.num_rows(),
|
new_feature.num_rows(),
|
||||||
|
|
@ -388,17 +336,15 @@ async fn warm_read_on_recreated_branch_observes_new_incarnation() {
|
||||||
"warm reader must refresh to the recreated branch incarnation"
|
"warm reader must refresh to the recreated branch incarnation"
|
||||||
);
|
);
|
||||||
assert!(
|
assert!(
|
||||||
manifest.stats().read_iops > 0,
|
io.manifest_reads > 0,
|
||||||
"recreated branch must re-read the manifest after the incarnation probe"
|
"recreated branch must re-read the manifest after the incarnation probe"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
commit_graph.stats().read_iops,
|
io.commit_graph_reads, 0,
|
||||||
0,
|
|
||||||
"same-branch incarnation refresh must be manifest-only"
|
"same-branch incarnation refresh must be manifest-only"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
probe_count.load(Ordering::Relaxed),
|
io.version_probes, 2,
|
||||||
2,
|
|
||||||
"stale same-branch read probes once under the read lock and once under the write lock"
|
"stale same-branch read probes once under the read lock and once under the write lock"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -469,39 +415,33 @@ async fn recreated_branch_owned_table_handle_uses_table_etag() {
|
||||||
"test setup must force table handle identity to differ only by e_tag"
|
"test setup must force table handle identity to differ only by e_tag"
|
||||||
);
|
);
|
||||||
|
|
||||||
let (probes_in, manifest, commit_graph, table, probe_count) = probes();
|
let (new_person, io) = measure(reader.query(
|
||||||
let new_person = with_query_io_probes(
|
ReadTarget::branch("feature"),
|
||||||
probes_in,
|
TEST_QUERIES,
|
||||||
reader.query(
|
"get_person",
|
||||||
ReadTarget::branch("feature"),
|
&params(&[("$name", "NewOnly")]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"get_person",
|
.await;
|
||||||
&params(&[("$name", "NewOnly")]),
|
let new_person = new_person.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
new_person.num_rows(),
|
new_person.num_rows(),
|
||||||
1,
|
1,
|
||||||
"warm reader must open the recreated branch-owned table incarnation"
|
"warm reader must open the recreated branch-owned table incarnation"
|
||||||
);
|
);
|
||||||
assert!(
|
assert!(
|
||||||
table.stats().read_iops > 0,
|
io.data_reads > 0,
|
||||||
"table e_tag must force a held-handle cache miss for the recreated table"
|
"table e_tag must force a held-handle cache miss for the recreated table"
|
||||||
);
|
);
|
||||||
assert!(
|
assert!(
|
||||||
manifest.stats().read_iops > 0,
|
io.manifest_reads > 0,
|
||||||
"recreated branch must refresh the manifest"
|
"recreated branch must refresh the manifest"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
commit_graph.stats().read_iops,
|
io.commit_graph_reads, 0,
|
||||||
0,
|
|
||||||
"same-branch table-incarnation refresh must be manifest-only"
|
"same-branch table-incarnation refresh must be manifest-only"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
probe_count.load(Ordering::Relaxed),
|
io.version_probes, 2,
|
||||||
2,
|
|
||||||
"stale same-branch read probes once under each lock"
|
"stale same-branch read probes once under each lock"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
@ -594,35 +534,29 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() {
|
||||||
"test setup must force graph-index identity to differ only by snapshot incarnation"
|
"test setup must force graph-index identity to differ only by snapshot incarnation"
|
||||||
);
|
);
|
||||||
|
|
||||||
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
|
let (new_friends, io) = measure(reader.query(
|
||||||
let new_friends = with_query_io_probes(
|
ReadTarget::branch("feature"),
|
||||||
probes_in,
|
TEST_QUERIES,
|
||||||
reader.query(
|
"friends_of",
|
||||||
ReadTarget::branch("feature"),
|
&params(&[("$name", "NewWalker")]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"friends_of",
|
.await;
|
||||||
&params(&[("$name", "NewWalker")]),
|
let new_friends = new_friends.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
first_column_strings(&new_friends),
|
first_column_strings(&new_friends),
|
||||||
vec!["Bob"],
|
vec!["Bob"],
|
||||||
"traversal must use the recreated branch's topology, not stale cached graph index"
|
"traversal must use the recreated branch's topology, not stale cached graph index"
|
||||||
);
|
);
|
||||||
assert!(
|
assert!(
|
||||||
manifest.stats().read_iops > 0,
|
io.manifest_reads > 0,
|
||||||
"recreated branch traversal must refresh the manifest"
|
"recreated branch traversal must refresh the manifest"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
commit_graph.stats().read_iops,
|
io.commit_graph_reads, 0,
|
||||||
0,
|
|
||||||
"same-branch traversal incarnation refresh must be manifest-only"
|
"same-branch traversal incarnation refresh must be manifest-only"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
probe_count.load(Ordering::Relaxed),
|
io.version_probes, 2,
|
||||||
2,
|
|
||||||
"stale same-branch read probes once under each lock"
|
"stale same-branch read probes once under each lock"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
@ -673,31 +607,25 @@ async fn stale_read_refreshes_manifest_only() {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
|
let (out, io) = measure(reader.query(
|
||||||
with_query_io_probes(
|
ReadTarget::branch("main"),
|
||||||
probes_in,
|
TEST_QUERIES,
|
||||||
reader.query(
|
"total_people",
|
||||||
ReadTarget::branch("main"),
|
&params(&[]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"total_people",
|
.await;
|
||||||
&params(&[]),
|
out.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
manifest.stats().read_iops > 0,
|
io.manifest_reads > 0,
|
||||||
"stale read must re-read the manifest"
|
"stale read must re-read the manifest"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
commit_graph.stats().read_iops,
|
io.commit_graph_reads, 0,
|
||||||
0,
|
|
||||||
"stale refresh must be manifest-only (no commit-graph scan)"
|
"stale refresh must be manifest-only (no commit-graph scan)"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
probe_count.load(Ordering::Relaxed),
|
io.version_probes, 2,
|
||||||
2,
|
|
||||||
"stale same-branch read probes once under the read lock and once under the write lock"
|
"stale same-branch read probes once under the read lock and once under the write lock"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -721,55 +649,40 @@ async fn repeat_warm_read_reuses_table_handles() {
|
||||||
commit_many(&mut db, 10).await;
|
commit_many(&mut db, 10).await;
|
||||||
|
|
||||||
// Cold first read: opens the touched table.
|
// Cold first read: opens the touched table.
|
||||||
let (p1, _m1, _c1, table1, _pr1) = probes();
|
let (cold_out, cold) = measure(db.query(
|
||||||
with_query_io_probes(
|
ReadTarget::branch("main"),
|
||||||
p1,
|
TEST_QUERIES,
|
||||||
db.query(
|
"total_people",
|
||||||
ReadTarget::branch("main"),
|
&params(&[]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"total_people",
|
.await;
|
||||||
&params(&[]),
|
cold_out.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert!(
|
assert!(
|
||||||
table1.stats().read_iops > 0,
|
cold.data_reads > 0,
|
||||||
"the cold first read must open the table"
|
"the cold first read must open the table"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Warm repeat: the held handle is reused, so no open happens through this
|
// Warm repeat: the held handle is reused, so no open happens through this
|
||||||
// query's table wrapper.
|
// query's table wrapper. A fresh `measure()` isolates the warm repeat's cost.
|
||||||
let (p2, manifest2, commit_graph2, table2, probe2) = probes();
|
let (warm_out, warm) = measure(db.query(
|
||||||
with_query_io_probes(
|
ReadTarget::branch("main"),
|
||||||
p2,
|
TEST_QUERIES,
|
||||||
db.query(
|
"total_people",
|
||||||
ReadTarget::branch("main"),
|
&params(&[]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"total_people",
|
.await;
|
||||||
&params(&[]),
|
warm_out.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
table2.stats().read_iops,
|
warm.data_reads, 0,
|
||||||
0,
|
|
||||||
"a warm repeat read must reuse the held handle (0 table opens)"
|
"a warm repeat read must reuse the held handle (0 table opens)"
|
||||||
);
|
);
|
||||||
|
assert_eq!(warm.manifest_reads, 0, "warm repeat read: 0 manifest opens");
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
manifest2.stats().read_iops,
|
warm.commit_graph_reads, 0,
|
||||||
0,
|
|
||||||
"warm repeat read: 0 manifest opens"
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
commit_graph2.stats().read_iops,
|
|
||||||
0,
|
|
||||||
"warm repeat read: 0 commit-graph opens"
|
"warm repeat read: 0 commit-graph opens"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
probe2.load(Ordering::Relaxed),
|
warm.version_probes, 1,
|
||||||
1,
|
|
||||||
"warm repeat read: exactly one version probe"
|
"warm repeat read: exactly one version probe"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -807,20 +720,16 @@ async fn write_invalidates_table_cache_for_changed_table() {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// The next read re-opens Person at the new version (cache miss).
|
// The next read re-opens Person at the new version (cache miss).
|
||||||
let (p, _m, _c, table, _pr) = probes();
|
let (out, io) = measure(db.query(
|
||||||
with_query_io_probes(
|
ReadTarget::branch("main"),
|
||||||
p,
|
TEST_QUERIES,
|
||||||
db.query(
|
"total_people",
|
||||||
ReadTarget::branch("main"),
|
&params(&[]),
|
||||||
TEST_QUERIES,
|
))
|
||||||
"total_people",
|
.await;
|
||||||
&params(&[]),
|
out.unwrap();
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert!(
|
assert!(
|
||||||
table.stats().read_iops > 0,
|
io.data_reads > 0,
|
||||||
"a read after a write to the table must re-open it (version-keyed miss)"
|
"a read after a write to the table must re-open it (version-keyed miss)"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
||||||
159
crates/omnigraph/tests/write_cost.rs
Normal file
|
|
@ -0,0 +1,159 @@
|
||||||
|
//! Cost-budget tests for the WRITE path (RFC-013 step 1) — the safety/latency
|
||||||
|
//! twin of `warm_read_cost.rs`, on the shared `helpers::cost` harness. A
|
||||||
|
//! committing write's per-table opens and internal-table scans must be bounded
|
||||||
|
//! and **flat across commit-history depth**, measured at the object-store
|
||||||
|
//! boundary. Guards invariant 15 (cost bounded by work, not history) on writes.
|
||||||
|
//!
|
||||||
|
//! **Backend split (see docs/dev/testing.md / RFC-013).** This file runs on
|
||||||
|
//! **local FS** and gates the **internal-table** term (`__manifest`/`_graph_commits`
|
||||||
|
//! fragment scans, ~+18/depth — O(fragments) on any backend, step 2's target).
|
||||||
|
//!
|
||||||
|
//! The **data-table opener** term (step 3a's win) is a per-object-store-RPC
|
||||||
|
//! phenomenon and is NOT gated here: local-FS latest-resolution is cheap whether
|
||||||
|
//! the open goes through the namespace builder or direct-by-URI, so the
|
||||||
|
//! namespace→direct switch is invisible on local. Measured: the local data-table
|
||||||
|
//! read count grows with depth too (~+0.9/depth), but that is a *different* term —
|
||||||
|
//! the merge-insert/RI scan reading O(depth) **fragments**, unchanged by the
|
||||||
|
//! opener switch (depth-100 = 92 ops both before and after step 3a, same slope)
|
||||||
|
//! and reduced only by compaction. The opener term shows up only on a real object
|
||||||
|
//! store (per-version GETs, ~+12/depth → flat after step 3a), so it is gated in
|
||||||
|
//! `write_cost_s3.rs` (bucket-gated). Same `measure`/`IoCounts` harness, different
|
||||||
|
//! backend; each term gated where it actually manifests.
|
||||||
|
#![recursion_limit = "512"]
|
||||||
|
|
||||||
|
mod helpers;
|
||||||
|
|
||||||
|
use helpers::cost::{
|
||||||
|
IoCounts, assert_flat, assert_grows, local_graph, measure_insert, measure_with_staged,
|
||||||
|
};
|
||||||
|
use helpers::{MUTATION_QUERIES, commit_many, mixed_params};
|
||||||
|
|
||||||
|
// ── (A) The internal-table LOCK — RED today, the acceptance test for step 2 ──
|
||||||
|
//
|
||||||
|
// `__manifest` / `_graph_commits` scans must be O(1) in commit-history depth.
|
||||||
|
// RED today (O(fragments), uncompacted). Un-ignore when step 2 (internal-table
|
||||||
|
// compaction) lands — it must go green flat. (The data-table term is the S3
|
||||||
|
// gate's, `write_cost_s3.rs`; local-FS hides it.)
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore = "RED until step 2 (internal-table compaction): __manifest/_graph_commits scans are O(fragments) today — RFC-013 §0/§2.2. Un-ignore there as the red→green acceptance test."]
|
||||||
|
async fn internal_table_scans_are_flat_in_history() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let mut db = local_graph(&dir).await;
|
||||||
|
|
||||||
|
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
|
||||||
|
let mut current = 0u64;
|
||||||
|
for d in [10u64, 100] {
|
||||||
|
if d > current {
|
||||||
|
commit_many(&mut db, (d - current) as usize).await;
|
||||||
|
current = d;
|
||||||
|
}
|
||||||
|
let io = measure_insert(&mut db, &format!("lock_{d}")).await;
|
||||||
|
current += 1; // the measured write advanced depth by one
|
||||||
|
eprintln!(
|
||||||
|
"depth~{d}: data={} __manifest={} _graph_commits={}",
|
||||||
|
io.data_reads, io.manifest_reads, io.commit_graph_reads
|
||||||
|
);
|
||||||
|
curve.push((d, io));
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_flat(&curve, |c| c.manifest_reads, 4, "__manifest scan");
|
||||||
|
assert_flat(&curve, |c| c.commit_graph_reads, 4, "_graph_commits scan");
|
||||||
|
}
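The depth bookkeeping in the sweep loop above is easy to misread, because each measured write itself advances history by one commit. A minimal sketch of just that arithmetic, assuming a hypothetical `sweep_plan` helper that is not part of the test file, returns for each target depth how many filler commits the loop would add before measuring:

```rust
/// Hypothetical helper mirroring the sweep loop's bookkeeping: for each target
/// depth, return `(target, filler_commits_added_before_measuring)`.
pub fn sweep_plan(targets: &[u64]) -> Vec<(u64, u64)> {
    let mut plan = Vec::with_capacity(targets.len());
    let mut current = 0u64;
    for &d in targets {
        // Commit only the difference needed to reach the target depth.
        let filler = d.saturating_sub(current);
        if filler > 0 {
            current = d;
        }
        plan.push((d, filler));
        // The measured write is itself a commit, so depth advances by one.
        current += 1;
    }
    plan
}
```

For the `[10, 100]` sweep used in the test this yields 10 filler commits, then 89 rather than 90, since the first measured insert already counted toward the second target.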
|
||||||
|
|
||||||
|
// The data-table OPENER history-gate (opener flat across depth) lives in
|
||||||
|
// `write_cost_s3.rs` — its history-dependence is an S3-only phenomenon. But the
|
||||||
|
// *probe that isolates* the opener (the `PrefixCounter` split) is validated here,
|
||||||
|
// every-PR, on local FS:
|
||||||
|
|
||||||
|
/// Proves the `PrefixCounter` opener/scan split: a committing write's data-table
|
||||||
|
/// reads divide into a **flat opener** term and a **growing scan** term. This pins
|
||||||
|
/// (a) the classifier actually attributes reads to the opener bucket (non-zero, so a
|
||||||
|
/// flat assertion isn't vacuously flat-at-zero), and (b) the local data-table growth
|
||||||
|
/// is the merge-insert/RI fragment scan, not the opener — which is *why* the S3
|
/// gate asserts `data_opener_reads`, not total `data_reads`. (On local FS the opener
/// is O(1) regardless of step 3a; the opener's history-dependence is gated on S3.)
#[tokio::test]
async fn data_table_reads_split_into_flat_opener_and_growing_scan() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = local_graph(&dir).await;

    let mut curve: Vec<(u64, IoCounts)> = Vec::new();
    let mut current = 0u64;
    for d in [10u64, 100] {
        if d > current {
            commit_many(&mut db, (d - current) as usize).await;
            current = d;
        }
        let io = measure_insert(&mut db, &format!("split_{d}")).await;
        current += 1;
        eprintln!(
            "depth~{d}: opener={} scan={} data_total={}",
            io.data_opener_reads, io.data_scan_reads, io.data_reads
        );
        curve.push((d, io));
    }

    assert!(
        curve[0].1.data_opener_reads > 0,
        "opener reads must be > 0 — the classifier missed version-resolution reads, \
         so a flat opener assertion would be vacuous"
    );
    assert_flat(&curve, |c| c.data_opener_reads, 4, "local data-table opener");
    assert_grows(&curve, |c| c.data_scan_reads, 20, "local data-table scan");
}

// ── (B) Green-today regression guards — run on every PR ──

/// A single insert's *data-table* write cost is O(1): the table commit is a small
/// constant number of writes, independent of history.
#[tokio::test]
async fn single_insert_data_write_is_bounded() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = local_graph(&dir).await;
    commit_many(&mut db, 5).await;
    let io = measure_insert(&mut db, "w").await;
    eprintln!("single insert: data_writes={}", io.data_writes);
    assert!(io.data_writes <= 4, "data-table write_iops should be a small constant, got {}", io.data_writes);
}

/// At a fixed shallow depth, the per-write object-store read count is below a
/// documented ceiling. Fails the moment a change *adds* a round-trip on the write
/// path — the "no new round-trip" guard (calibrated: ~50 at depth ~5).
#[tokio::test]
async fn write_op_count_ceiling_at_shallow_depth() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = local_graph(&dir).await;
    commit_many(&mut db, 5).await;
    let io = measure_insert(&mut db, "ceil").await;
    eprintln!(
        "depth~5: data={} __manifest={} _graph_commits={} total_reads={}",
        io.data_reads, io.manifest_reads, io.commit_graph_reads, io.total_reads()
    );
    const CEILING: u64 = 80;
    assert!(
        io.total_reads() <= CEILING,
        "per-write read ops {} exceeded ceiling {CEILING} — a new round-trip was added",
        io.total_reads()
    );
}

// ── (C) Fitness assert via the staged-write probes ──

/// A keyed `Person` insert routes through `stage_merge_insert` exactly once, does
/// no `stage_append`, and no inline vector-index build. Pins the structural shape.
#[tokio::test]
async fn keyed_insert_routes_through_merge_insert_only() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = local_graph(&dir).await;
    let (res, _io, staged) = measure_with_staged(db.mutate(
        "main",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "fit")], &[("$age", 30)]),
    ))
    .await;
    res.unwrap();
    assert_eq!(staged.stage_merge_insert, 1, "keyed Person insert stages one merge-insert");
    assert_eq!(staged.stage_append, 0, "keyed insert must not stage_append");
    assert_eq!(staged.create_vector_index, 0, "no inline vector-index build on a plain insert");
}
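The split test above depends on a classifier that attributes each data-table read to either the opener or the scan. A minimal sketch of that kind of prefix-based attribution follows, assuming that keys containing `_versions/` or ending in `.manifest` count as opener reads (the markers named in the `write_cost_s3.rs` module docs). The names `DataReadClass`, `classify_data_read`, and `DataReadCounts`, and the example object keys, are hypothetical and are not the harness's actual `PrefixCounter`.

```rust
/// Hypothetical read classes for a data table (illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataReadClass {
    /// Version-resolution reads: version listings and manifest files.
    Opener,
    /// Everything else, e.g. fragment reads done by the merge-insert/RI scan.
    Scan,
}

/// Classify one object key. Assumption: opener reads are recognizable by a
/// `_versions/` path segment or a `.manifest` suffix.
pub fn classify_data_read(object_key: &str) -> DataReadClass {
    let is_version_metadata =
        object_key.contains("_versions/") || object_key.ends_with(".manifest");
    if is_version_metadata {
        DataReadClass::Opener
    } else {
        DataReadClass::Scan
    }
}

/// Hypothetical tally of reads per class.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DataReadCounts {
    pub opener: u64,
    pub scan: u64,
}

impl DataReadCounts {
    /// Record a single read against the class its key maps to.
    pub fn record(&mut self, object_key: &str) {
        match classify_data_read(object_key) {
            DataReadClass::Opener => self.opener += 1,
            DataReadClass::Scan => self.scan += 1,
        }
    }

    /// Total reads across both classes.
    pub fn total(&self) -> u64 {
        self.opener + self.scan
    }
}
```

Counting the two classes separately is what lets a gate assert the opener term is flat while the scan term is still allowed to grow with fragment count.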
71
crates/omnigraph/tests/write_cost_s3.rs
Normal file
@ -0,0 +1,71 @@
//! S3 (object-store) cost-budget gate for the WRITE path — the bucket-gated twin of
//! `write_cost.rs` that proves RFC-013 **step 3a's data-table opener win**. On the
//! shared `helpers::cost` harness (`measure`/`IoCounts`/`assert_flat`/`s3_graph`).
//!
//! The opener term is an **object-store-RPC phenomenon**: latest-version resolution
//! costs per-version GETs/HEADs on S3 (O(depth) before step 3a, when writes routed
//! through the lance-namespace builder), which local FS cannot reproduce (one cheap
//! `read_dir` regardless). After step 3a (direct-by-URI opens), the per-write
//! **data-table read count is FLAT across commit-history depth** — the measured 70%
//! win. This file is the red→green acceptance for that term (it would be RED on the
//! pre-3a `from_namespace` opener); `write_cost.rs` gates the internal-table term on
//! local every-PR.
//!
//! **Isolating the opener (important):** total `data_reads` is not opener-only — the
//! same wrapped `Dataset` backs the merge-insert/RI **scan**, which reads
//! O(fragment-count) and grows with history for a *different* reason (compaction's
//! domain, not the opener; this is the term that made the *local* data-table count
//! grow). The shared harness's `PrefixCounter` attributes each read by object-key
//! prefix, so this gate asserts `data_opener_reads` (reads of `_versions/`/`.manifest`)
//! **directly** — no compaction or fixture massaging needed. After step 3a the opener
//! is O(1) regardless of version-history depth; before it grew ~+12/depth (RFC §2.4
//! [M]). (See `write_cost.rs` for the local test that proves the split itself —
//! opener flat, scan growing.)
//!
//! Skips gracefully without `OMNIGRAPH_S3_TEST_BUCKET` (the `tests/s3_storage.rs`
//! pattern); runs for real in the rustfs CI job (`.github/workflows/ci.yml`).
#![recursion_limit = "512"]

mod helpers;

use helpers::cost::{IoCounts, assert_flat, measure_insert, s3_graph};
use helpers::commit_many;

/// After step 3a the data-table opener term is flat across depth on a real object
/// store (the measured win). RED on the pre-3a namespace-builder opener (O(depth)
/// per-version resolution).
#[tokio::test]
async fn data_table_opener_is_flat_in_history_on_s3() {
    let Some(mut db) = s3_graph("write-cost-opener").await else {
        eprintln!(
            "SKIP data_table_opener_is_flat_in_history_on_s3: OMNIGRAPH_S3_TEST_BUCKET \
             unset (or store unreachable) — the S3 opener gate needs an object store"
        );
        return;
    };

    let mut curve: Vec<(u64, IoCounts)> = Vec::new();
    let mut current = 0u64;
    for d in [10u64, 50] {
        if d > current {
            commit_many(&mut db, (d - current) as usize).await;
            current = d;
        }
        let io = measure_insert(&mut db, &format!("s3_{d}")).await;
        current += 1;
        eprintln!(
            "depth~{d}: opener={} scan={} data_total={} __manifest={} _graph_commits={}",
            io.data_opener_reads,
            io.data_scan_reads,
            io.data_reads,
            io.manifest_reads,
            io.commit_graph_reads
        );
        curve.push((d, io));
    }

    // The opener (latest-version resolution) is O(1) after step 3a (direct-by-URI),
    // isolated from the scan by the PrefixCounter. Slack absorbs object-store variance;
    // the pre-3a builder grew this ~+12/depth (RFC §2.4 [M]).
    assert_flat(&curve, |c| c.data_opener_reads, 8, "S3 data-table opener");
}
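The test above skips instead of failing when no bucket is configured. A minimal sketch of that environment-gated skip shape follows, assuming the gate keys off `OMNIGRAPH_S3_TEST_BUCKET` as the module docs state. The helpers `test_bucket_from`, `test_bucket`, and `run_gate_or_skip` are hypothetical, and treating a blank value as unset is an assumption rather than documented behaviour of `s3_graph`.

```rust
use std::env;

/// Hypothetical helper: normalise a raw env value, treating a missing or
/// blank value as "no bucket configured" (assumption).
pub fn test_bucket_from(value: Option<String>) -> Option<String> {
    value
        .map(|v| v.trim().to_string())
        .filter(|v| !v.is_empty())
}

/// Read the bucket name from the environment variable named in the docs.
pub fn test_bucket() -> Option<String> {
    test_bucket_from(env::var("OMNIGRAPH_S3_TEST_BUCKET").ok())
}

/// Returns `true` when the gate body ran and `false` when it was skipped.
/// A real test would build the fixture and assert inside the "ran" branch.
pub fn run_gate_or_skip(bucket: Option<String>, test_name: &str) -> bool {
    let Some(bucket) = bucket else {
        // Report the skip loudly so a silently unconfigured CI job is visible.
        eprintln!("SKIP {test_name}: OMNIGRAPH_S3_TEST_BUCKET unset");
        return false;
    };
    eprintln!("running {test_name} against bucket {bucket}");
    true
}
```

Returning early from the test keeps local runs green without a bucket, while the CI job that sets the variable exercises the real assertion.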
@ -91,6 +91,7 @@ Working documents for in-flight feature work. Removed when the work lands.
| Restructure the CLI around explicit planes — one graph-addressing model, declared capability surface, plane-grouped help (expands RFC-009 Phase 4) | [rfc-010-cli-planes-restructure.md](rfc-010-cli-planes-restructure.md) |
| CLI refactoring — one addressing & config model post-`omnigraph.yaml`: scope + `--graph` + derived access path, served-default / privileged-direct, profiles, named queries, capability classifier (completes RFC-008) | [rfc-011-cli-refactoring.md](rfc-011-cli-refactoring.md) |
| Provider-independent embedding configuration — one resolved `EmbeddingConfig` + sealed provider enum (Gemini/OpenAI/Mock), identity recorded in the schema IR, query-time same-space validation, NFR floor | [rfc-012-embedding-provider-config.md](rfc-012-embedding-provider-config.md) |
| Write-path latency — capture-once `WriteTxn`, version-pinned opens, one `GraphPublishAuthority` fed declarative `PublishPlan`s, manifest-authoritative lineage, epoch fence, bounded history (compaction + cleanup), and an IO-counted cost contract (`iss-write-s3-roundtrip-amplification`, `iss-991`) | [rfc-013-write-path-latency.md](rfc-013-write-path-latency.md) |

## Boundary
1203
docs/dev/rfc-013-write-path-latency.md
Normal file
File diff suppressed because it is too large
@ -26,6 +26,8 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
| `forbidden_apis.rs` | Defense-in-depth source-walk guard: engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) must not reach around the sealed storage trait to Lance inline-commit APIs, nor open datasets directly (`Dataset::open` / `DatasetBuilder::from_uri`/`from_namespace`) — reads route through `Snapshot::open` and the held-handle cache; `// forbidden-api-allow: <reason>` sentinel exempts reviewed lines |
| `lance_surface_guards.rs` | Pins the Lance API surfaces omnigraph depends on (named runtime + compile-only guards; see [lance.md](lance.md)) — the first smoke check on any Lance version bump; e.g. `compact_files_still_fails_on_blob_columns` turns red when the upstream blob-compaction fix lands |
| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 0 commit-graph opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by synthetic snapshot-id incarnation or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). See "Cost-budget tests" below |
| `write_cost.rs` | Cost-budget tests for the WRITE path (RFC-013), the latency twin of `warm_read_cost.rs` on the **shared `helpers::cost` harness** (`measure`/`IoCounts`/`assert_flat`/`local_graph`). Runs on **local FS**; gates the **internal-table** term (`__manifest`/`_graph_commits` scans flat in commit-history depth — the RED `internal_table_scans_are_flat_in_history` LOCK, `#[ignore]`'d until internal-table compaction lands) plus green every-PR guards (single-insert `data_writes` bounded, a per-write read-op ceiling that fails the moment a round-trip is added, and a `measure_with_staged` fitness assert that a keyed insert routes through `stage_merge_insert` once with no `stage_append`/vector-index build). The **data-table opener** term is S3-only — see `write_cost_s3.rs` and the backend-split note in "Cost-budget tests" below |
| `helpers/cost.rs` | The shared cost-budget harness (not a test): `IoCounts`/`StagedCounts` (counts by table class), `measure`/`measure_with_staged` (the one place the `with_query_io_probes` + `MergeWriteProbes` task-local + `IOTracker` wiring lives), `assert_flat(curve, select, slack, what)`, and store-agnostic `local_graph`/`s3_graph` fixtures. `warm_read_cost.rs`, `write_cost.rs`, and `write_cost_s3.rs` all consume it so a cost test body is written once and reads in one vocabulary |
| `lifecycle.rs` | Graph lifecycle, schema state |
| `point_in_time.rs` | Snapshots, time travel (`snapshot_at_version`, `entity_at`) |
| `changes.rs` | `diff_between` / `diff_commits` |
@ -69,9 +71,10 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
## RustFS / S3 integration

CI runs three S3-backed tests against a containerized RustFS server (`.github/workflows/ci.yml` → `rustfs_integration` job):
CI runs these S3-backed tests against a containerized RustFS server (`.github/workflows/ci.yml` → `rustfs_integration` job):

- `cargo test -p omnigraph-engine --test s3_storage`
- `cargo test -p omnigraph-engine --test write_cost_s3` (RFC-013 step 3a's data-table opener cost gate — flat across commit depth on S3; the term local FS can't reproduce)
- `cargo test -p omnigraph-server --test s3` (single-graph serving + config-free `--cluster s3://` boot)
- `cargo test -p omnigraph-cluster --test s3_cluster` (full control-plane lifecycle on the bucket)
- `cargo test -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow`
@ -126,7 +129,7 @@ When you pick up any change, walk through this:
6. **For substrate-touching changes** (Lance behavior), reach for `failpoints` or fixture-driven scenarios, not stubbed-out mocks.
7. **For server / API changes**, confirm the OpenAPI regeneration happens in `openapi.rs` and that the diff lands in `openapi.json`.
8. **Verify your change makes an existing test fail before it makes the new one pass.** If you can break the code without breaking a test, your coverage gap is the problem to fix first.
9. **Bound hot-path cost at history depth.** If the change touches a read or open path, add or extend a test that asserts a *bounded* cost (e.g. a warm same-branch read performs zero `Dataset::open`, or a fixed object-op count) against a fixture with realistic *commit-history depth*, not just realistic row counts. Cost that scales with history is invisible on a shallow fixture and only bites in production. See "Cost-budget tests" below.
9. **Bound hot-path cost at history depth.** If the change touches a read, **write**, or open path, add or extend a test that asserts a *bounded* cost (e.g. a warm same-branch read performs zero `Dataset::open`, or a per-write read-op count flat across commit depth) against a fixture with realistic *commit-history depth*, not just realistic row counts. Reuse the shared `helpers::cost` harness (`measure`/`IoCounts`/`assert_flat`) — don't hand-roll `IOTracker` wiring. Cost that scales with history is invisible on a shallow fixture and only bites in production. See "Cost-budget tests" below.

## Cost-budget tests: bound hot-path cost at history depth
@ -134,6 +137,7 @@ Correctness bugs fail loudly in tests; cost-scaling bugs pass every test and deg
- **Assert a cost budget, not just a result.** For a read/open path, assert the number of `Dataset::open` calls (or object-store ops) a warm query performs, and that it does not grow with commit count. The reference is LanceDB's IO-counted tests, which assert a cached read costs 0-1 IO and carry a named regression test against "a list call on every subsequent query."
- **Test at history depth.** Build a fixture with many *commits* (not many rows) and assert warm-read cost is flat across depths. A shallow fixture cannot catch an O(commits) cost.
- **Use the shared harness, and gate each term on the backend where it manifests.** `helpers::cost` (`measure`/`IoCounts`/`assert_flat`/`local_graph`/`s3_graph`) is the one place the `IOTracker`/task-local plumbing lives — consume it, don't duplicate it. The write path has *two distinct* depth terms that split cleanly across backends, and conflating them is a real trap (the local data-table read count grows with depth too, but for a different reason — the merge-insert/RI scan reading O(depth) *fragments*, reduced by compaction, not by the opener): (1) the **internal-table** scan term (`__manifest`/`_graph_commits` fragment scans) reproduces on **any** backend including local FS, so `write_cost.rs` gates it on local every-PR; (2) the **data-table opener** term (latest-version resolution) is a per-object-store-RPC phenomenon — local-FS resolves latest with one cheap `read_dir` regardless of the opener used, so the namespace-vs-direct difference is **invisible on local** and only shows on a real object store (per-version GETs), gated by the bucket-gated `write_cost_s3.rs`. Same harness, different fixture; each term asserted where it actually appears.
- This is the testing companion to invariant 15 in [docs/dev/invariants.md](invariants.md) (hot-path cost is bounded by work, not history).
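The bullets above ask for a cost that stays flat across commit-history depth, checked with `assert_flat(curve, select, slack, what)`. A minimal sketch of what such curve assertions might look like follows, assuming "flat" means the selected counter varies by at most `slack` across depths and "grows" means the deepest point exceeds the shallowest by at least a minimum amount. The `IoCounts` stand-in, the `spread` helper, and these exact semantics are assumptions; the real helpers live in `helpers/cost.rs`, which is not shown in this diff.

```rust
/// Hypothetical stand-in for the harness's per-class counters.
#[derive(Debug, Clone, Copy, Default)]
pub struct IoCounts {
    pub data_opener_reads: u64,
    pub data_scan_reads: u64,
}

/// Difference between the largest and smallest selected value on the curve.
pub fn spread(curve: &[(u64, IoCounts)], select: impl Fn(&IoCounts) -> u64) -> u64 {
    let values: Vec<u64> = curve.iter().map(|(_, c)| select(c)).collect();
    let max = values.iter().copied().max().unwrap_or(0);
    let min = values.iter().copied().min().unwrap_or(0);
    max - min
}

/// Assumed meaning of "flat": the counter varies by at most `slack`
/// across every measured depth.
pub fn assert_flat(
    curve: &[(u64, IoCounts)],
    select: impl Fn(&IoCounts) -> u64,
    slack: u64,
    what: &str,
) {
    let s = spread(curve, select);
    assert!(s <= slack, "{what}: expected flat within {slack}, spread was {s}: {curve:?}");
}

/// Assumed meaning of "grows": the deepest measurement exceeds the
/// shallowest by at least `min_growth`.
pub fn assert_grows(
    curve: &[(u64, IoCounts)],
    select: impl Fn(&IoCounts) -> u64,
    min_growth: u64,
    what: &str,
) {
    let first = curve.first().map(|(_, c)| select(c)).unwrap_or(0);
    let last = curve.last().map(|(_, c)| select(c)).unwrap_or(0);
    assert!(
        last >= first + min_growth,
        "{what}: expected growth of at least {min_growth}, got {first} -> {last}"
    );
}
```

Taking the counter as a closure is what lets one curve of measurements back several assertions, for example a flat opener term and a growing scan term over the same depths.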
When in doubt, re-read [docs/dev/invariants.md](invariants.md) — quality gates apply to every change.