mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-30 02:49:39 +02:00
fix(optimize): skip blob-bearing tables to avoid Lance compaction crash (#138)
Some checks failed
CI / Classify Changes (push) Has been cancelled
CI / Check AGENTS.md Links (push) Has been cancelled
CI / Container Entrypoint (push) Has been cancelled
Release Edge / Prepare edge release (push) Has been cancelled
CI / Test Workspace (push) Has been cancelled
CI / Test omnigraph-server --features aws (push) Has been cancelled
CI / Test Windows release binaries (push) Has been cancelled
CI / RustFS S3 Integration (push) Has been cancelled
Release Edge / Build edge omnigraph-linux-x86_64 (push) Has been cancelled
Release Edge / Build edge omnigraph-macos-arm64 (push) Has been cancelled
Release Edge / Build edge omnigraph-windows-x86_64 (push) Has been cancelled
Release Edge / Smoke Windows installer (push) Has been cancelled
Some checks failed
CI / Classify Changes (push) Has been cancelled
CI / Check AGENTS.md Links (push) Has been cancelled
CI / Container Entrypoint (push) Has been cancelled
Release Edge / Prepare edge release (push) Has been cancelled
CI / Test Workspace (push) Has been cancelled
CI / Test omnigraph-server --features aws (push) Has been cancelled
CI / Test Windows release binaries (push) Has been cancelled
CI / RustFS S3 Integration (push) Has been cancelled
Release Edge / Build edge omnigraph-linux-x86_64 (push) Has been cancelled
Release Edge / Build edge omnigraph-macos-arm64 (push) Has been cancelled
Release Edge / Build edge omnigraph-windows-x86_64 (push) Has been cancelled
Release Edge / Smoke Windows installer (push) Has been cancelled
* test(optimize): pin Lance blob-column compaction failure as a surface guard
Lance compact_files mis-decodes blob-v2 columns under its forced BlobHandling::AllBinary read ("more fields in the schema than provided column indices"), failing even a pristine uniform-V2_2 multi-fragment blob table; reads use descriptor handling and are unaffected.
Guard 10 reproduces this and is self-retiring: it turns red on the Lance bump that fixes the bug, forcing LANCE_SUPPORTS_BLOB_COMPACTION to flip.
* fix(optimize): skip blob-bearing tables instead of crashing compaction
omnigraph optimize aborted the whole sweep when any node/edge table had a Blob property: Lance compact_files cannot decode blob-v2 columns under AllBinary (the column-index error pinned by the surface guard). Skip blob-bearing tables behind a LANCE_SUPPORTS_BLOB_COMPACTION gate and report them via TableOptimizeStats.skipped / SkipReason (surfaced in the CLI and a tracing::warn) instead of erroring, which also isolates the failure so the other tables still compact.
Reads/writes are unaffected; only fragment/space reclamation on blob tables is deferred until the upstream Lance fix. Adds a maintenance.rs regression test (validated red with the column-index symptom before the fix, green after), a concise v0.6.1 release note, and updates docs (maintenance, cli-reference, AGENTS capability matrix, invariants Known Gaps, lance.md audit, constants).
* refactor(optimize): make TableOptimizeStats and SkipReason non_exhaustive
Both are returned result types, never built by callers, so #[non_exhaustive] makes this the last field/variant addition that can break downstream literal construction and keeps future ones non-breaking (review feedback on the public-field addition). The v0.6.1 Compatibility Notes call out the source-level change.
Also drops the now-stale "RED today / GREEN after the fix lands" narration in the optimize_skips_blob_table_and_reports_skip test (historical regression context now that the fix is in this branch), and folds in the expanded v0.6.1 release note.
* chore(release): bump workspace to v0.6.1
Coherent version bump to accompany the v0.6.1 release note: all five crate manifests + path-dependency constraints, Cargo.lock, the AGENTS.md surveyed-version line, and openapi.json info.version move 0.6.0 -> 0.6.1. Matches the established release pattern (#118 landed the v0.6.0 note + bump together) and resolves the Codex/Devin review flag that a v0.6.1 note without a bump leaves CARGO_PKG_VERSION reporting 0.6.0 and mixed package versions.
This commit is contained in:
parent
3c2b1b8051
commit
d54bccb940
20 changed files with 363 additions and 47 deletions
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-cli"
|
||||
version = "0.6.0"
|
||||
version = "0.6.1"
|
||||
edition = "2024"
|
||||
description = "CLI for the Omnigraph graph database."
|
||||
license = "MIT"
|
||||
|
|
@ -13,10 +13,10 @@ name = "omnigraph"
|
|||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.0" }
|
||||
omnigraph-server = { path = "../omnigraph-server", version = "0.6.0" }
|
||||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" }
|
||||
omnigraph-server = { path = "../omnigraph-server", version = "0.6.1" }
|
||||
clap = { workspace = true }
|
||||
color-eyre = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -3011,18 +3011,19 @@ async fn main() -> Result<()> {
|
|||
"fragments_removed": s.fragments_removed,
|
||||
"fragments_added": s.fragments_added,
|
||||
"committed": s.committed,
|
||||
"skipped": s.skipped.map(|r| r.as_str()),
|
||||
})).collect::<Vec<_>>(),
|
||||
});
|
||||
print_json(&value)?;
|
||||
} else {
|
||||
println!("optimize {} — {} tables", uri, stats.len());
|
||||
for s in &stats {
|
||||
if s.committed {
|
||||
if let Some(reason) = s.skipped {
|
||||
println!(" {:<40} skipped ({reason})", s.table_key);
|
||||
} else if s.committed {
|
||||
println!(
|
||||
" {:<40} frags {} → {} ✓",
|
||||
s.table_key,
|
||||
s.fragments_removed + s.fragments_added - s.fragments_added,
|
||||
s.fragments_added
|
||||
s.table_key, s.fragments_removed, s.fragments_added
|
||||
);
|
||||
} else {
|
||||
println!(" {:<40} no-op", s.table_key);
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-compiler"
|
||||
version = "0.6.0"
|
||||
version = "0.6.1"
|
||||
edition = "2024"
|
||||
description = "Schema/query compiler for Omnigraph. Zero Lance dependency."
|
||||
license = "MIT"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-policy"
|
||||
version = "0.6.0"
|
||||
version = "0.6.1"
|
||||
edition = "2024"
|
||||
description = "Policy / authorization layer for Omnigraph — Cedar-backed PolicyEngine, PolicyChecker trait, ResourceScope enum."
|
||||
license = "MIT"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-server"
|
||||
version = "0.6.0"
|
||||
version = "0.6.1"
|
||||
edition = "2024"
|
||||
description = "HTTP server for the Omnigraph graph database."
|
||||
license = "MIT"
|
||||
|
|
@ -19,9 +19,9 @@ default = []
|
|||
aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"]
|
||||
|
||||
[dependencies]
|
||||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.0" }
|
||||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" }
|
||||
axum = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
color-eyre = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-engine"
|
||||
version = "0.6.0"
|
||||
version = "0.6.1"
|
||||
edition = "2024"
|
||||
description = "Runtime engine for the Omnigraph graph database."
|
||||
license = "MIT"
|
||||
|
|
@ -16,8 +16,8 @@ default = []
|
|||
failpoints = ["dep:fail", "fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" }
|
||||
lance = { workspace = true }
|
||||
lance-datafusion = { workspace = true }
|
||||
datafusion = { workspace = true }
|
||||
|
|
@ -51,7 +51,7 @@ chrono = { workspace = true }
|
|||
arc-swap = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
|
||||
tokio = { workspace = true }
|
||||
lance-namespace-impls = { workspace = true }
|
||||
serial_test = "3"
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
|
|||
pub(crate) use omnigraph::ensure_public_branch_ref;
|
||||
pub use omnigraph::{
|
||||
CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions,
|
||||
SchemaApplyResult, TableCleanupStats, TableOptimizeStats,
|
||||
SchemaApplyResult, SkipReason, TableCleanupStats, TableOptimizeStats,
|
||||
};
|
||||
pub(crate) use run_registry::is_internal_run_branch;
|
||||
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ mod optimize;
|
|||
mod schema_apply;
|
||||
mod table_ops;
|
||||
|
||||
pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
|
||||
pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
|
||||
pub use schema_apply::SchemaApplyOptions;
|
||||
|
||||
use super::commit_graph::GraphCommit;
|
||||
|
|
|
|||
|
|
@ -40,6 +40,20 @@ fn maint_concurrency() -> usize {
|
|||
.unwrap_or(DEFAULT_MAINT_CONCURRENCY)
|
||||
}
|
||||
|
||||
/// Whether the installed Lance can compact a dataset that contains blob
|
||||
/// columns. `false` today: Lance `compact_files` forces
|
||||
/// `BlobHandling::AllBinary` on the read side, and the blob-v2 struct decoder
|
||||
/// mis-counts columns ("there were more fields in the schema than provided
|
||||
/// column indices"), failing even a pristine uniform-V2_2 multi-fragment blob
|
||||
/// table. Reads are unaffected (queries use descriptor handling).
|
||||
///
|
||||
/// While `false`, [`optimize_all_tables`] skips blob-bearing tables and reports
|
||||
/// [`SkipReason::BlobColumnsUnsupportedByLance`] instead of aborting the whole
|
||||
/// sweep. Flip to `true` once the upstream Lance fix ships — the
|
||||
/// `lance_surface_guards.rs::compact_files_still_fails_on_blob_columns` guard
|
||||
/// turns red on that bump and forces this flip. Tracked in `docs/dev/lance.md`.
|
||||
const LANCE_SUPPORTS_BLOB_COMPACTION: bool = false;
|
||||
|
||||
/// Retention knobs for [`cleanup_all_tables`]. At least one must be set or
|
||||
/// nothing is cleaned. If both are set, Lance applies them as AND (a manifest
|
||||
/// is kept if it satisfies either — i.e. only manifests older than BOTH the
|
||||
|
|
@ -52,8 +66,45 @@ pub struct CleanupPolicyOptions {
|
|||
pub older_than: Option<Duration>,
|
||||
}
|
||||
|
||||
/// Per-table outcome of `optimize_all_tables`.
|
||||
/// Why `optimize` did not compact a table. Typed so callers branch on the
|
||||
/// reason rather than sniffing a string. One variant today, gated by
|
||||
/// [`LANCE_SUPPORTS_BLOB_COMPACTION`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[non_exhaustive]
|
||||
pub enum SkipReason {
|
||||
/// The table has one or more `Blob` columns. Lance `compact_files` forces
|
||||
/// `BlobHandling::AllBinary`, which mis-decodes blob-v2 columns; see
|
||||
/// [`LANCE_SUPPORTS_BLOB_COMPACTION`] and `docs/dev/lance.md`.
|
||||
BlobColumnsUnsupportedByLance,
|
||||
}
|
||||
|
||||
impl SkipReason {
|
||||
/// Stable machine-readable token for serialized output (e.g. CLI `--json`).
|
||||
/// Once emitted this is part of the output contract — keep it stable.
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
SkipReason::BlobColumnsUnsupportedByLance => "blob_columns_unsupported_by_lance",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for SkipReason {
|
||||
/// Human-readable reason for CLI and log output.
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let msg = match self {
|
||||
SkipReason::BlobColumnsUnsupportedByLance => {
|
||||
"blob columns — Lance compaction unsupported"
|
||||
}
|
||||
};
|
||||
f.write_str(msg)
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-table outcome of `optimize_all_tables`. This is a returned result type,
|
||||
/// not built by callers, so it is `#[non_exhaustive]`: future fields stay
|
||||
/// non-breaking and downstream code reads fields rather than constructing it.
|
||||
#[derive(Debug, Clone)]
|
||||
#[non_exhaustive]
|
||||
pub struct TableOptimizeStats {
|
||||
pub table_key: String,
|
||||
/// Number of source fragments that were rewritten by Lance.
|
||||
|
|
@ -62,6 +113,33 @@ pub struct TableOptimizeStats {
|
|||
pub fragments_added: usize,
|
||||
/// Did this table get a new Lance manifest version from the compaction?
|
||||
pub committed: bool,
|
||||
/// `Some(reason)` if this table was deliberately not compacted. When set,
|
||||
/// `fragments_removed == 0`, `fragments_added == 0`, and `!committed`.
|
||||
pub skipped: Option<SkipReason>,
|
||||
}
|
||||
|
||||
impl TableOptimizeStats {
|
||||
/// Stat for a table that Lance actually compacted.
|
||||
fn compacted(table_key: String, metrics: &CompactionMetrics, committed: bool) -> Self {
|
||||
Self {
|
||||
table_key,
|
||||
fragments_removed: metrics.fragments_removed,
|
||||
fragments_added: metrics.fragments_added,
|
||||
committed,
|
||||
skipped: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Stat for a table that was deliberately skipped (compaction not attempted).
|
||||
fn skipped(table_key: String, reason: SkipReason) -> Self {
|
||||
Self {
|
||||
table_key,
|
||||
fragments_removed: 0,
|
||||
fragments_added: 0,
|
||||
committed: false,
|
||||
skipped: Some(reason),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-table outcome of `cleanup_all_tables`. `error` is `Some` when this
|
||||
|
|
@ -84,14 +162,21 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
|
|||
let resolved = db.resolved_branch_target(None).await?;
|
||||
let snapshot = resolved.snapshot;
|
||||
|
||||
let table_tasks: Vec<_> = all_table_keys(&db.catalog())
|
||||
.into_iter()
|
||||
.filter_map(|table_key| {
|
||||
let entry = snapshot.entry(&table_key)?;
|
||||
// Compute per-table state (path + whether it has blob columns) up front, in
|
||||
// a scope that drops the catalog handle before the async stream starts.
|
||||
let table_tasks: Vec<(String, String, bool)> = {
|
||||
let catalog = db.catalog();
|
||||
let mut tasks = Vec::new();
|
||||
for table_key in all_table_keys(&catalog) {
|
||||
let Some(entry) = snapshot.entry(&table_key) else {
|
||||
continue;
|
||||
};
|
||||
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
|
||||
Some((table_key, full_path))
|
||||
})
|
||||
.collect();
|
||||
let has_blob = !blob_properties_for_table_key(&catalog, &table_key)?.is_empty();
|
||||
tasks.push((table_key, full_path, has_blob));
|
||||
}
|
||||
tasks
|
||||
};
|
||||
|
||||
if table_tasks.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
|
|
@ -101,7 +186,24 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
|
|||
let table_store = &db.table_store;
|
||||
|
||||
let stats: Vec<Result<TableOptimizeStats>> = futures::stream::iter(table_tasks.into_iter())
|
||||
.map(|(table_key, full_path)| async move {
|
||||
.map(|(table_key, full_path, has_blob)| async move {
|
||||
// Lance `compact_files` mis-decodes blob-v2 columns under the forced
|
||||
// `BlobHandling::AllBinary` read (see LANCE_SUPPORTS_BLOB_COMPACTION).
|
||||
// Skip blob-bearing tables and report it rather than aborting the
|
||||
// whole sweep — the other tables still compact.
|
||||
if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION {
|
||||
tracing::warn!(
|
||||
target: "omnigraph::optimize",
|
||||
table = %table_key,
|
||||
"skipping compaction: table has blob columns the current Lance \
|
||||
cannot rewrite (blob-v2 AllBinary decode bug); other tables \
|
||||
unaffected — rerun after the Lance fix",
|
||||
);
|
||||
return Ok(TableOptimizeStats::skipped(
|
||||
table_key,
|
||||
SkipReason::BlobColumnsUnsupportedByLance,
|
||||
));
|
||||
}
|
||||
let mut ds = table_store
|
||||
.open_dataset_head_for_write(&table_key, &full_path, None)
|
||||
.await?;
|
||||
|
|
@ -111,12 +213,11 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
|
|||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
let version_after = ds.version().version;
|
||||
Ok(TableOptimizeStats {
|
||||
Ok(TableOptimizeStats::compacted(
|
||||
table_key,
|
||||
fragments_removed: metrics.fragments_removed,
|
||||
fragments_added: metrics.fragments_added,
|
||||
committed: version_after != version_before,
|
||||
})
|
||||
&metrics,
|
||||
version_after != version_before,
|
||||
))
|
||||
})
|
||||
.buffer_unordered(concurrency)
|
||||
.collect()
|
||||
|
|
|
|||
|
|
@ -290,3 +290,88 @@ async fn force_delete_branch_semantics() {
|
|||
TableStore::force_delete_branch's NotFound tolerance can be simplified."
|
||||
);
|
||||
}
|
||||
|
||||
// --- Guard 10: blob-column compaction is still broken in this Lance --------
|
||||
//
|
||||
// `db/omnigraph/optimize.rs` skips tables with blob columns while
|
||||
// `LANCE_SUPPORTS_BLOB_COMPACTION = false`: Lance `compact_files` forces
|
||||
// `BlobHandling::AllBinary`, and the blob-v2 struct decoder mis-counts columns
|
||||
// ("more fields in the schema than provided column indices"), failing even a
|
||||
// pristine uniform-V2_2 multi-fragment blob table. Reads are unaffected (they
|
||||
// use descriptor handling).
|
||||
//
|
||||
// WHEN THIS TEST TURNS RED (compact_files no longer errors), the Lance bug is
|
||||
// fixed: flip `LANCE_SUPPORTS_BLOB_COMPACTION` to true in optimize.rs, drop the
|
||||
// blob-skip branch + the `optimize_skips_blob_table_and_reports_skip`
|
||||
// skip assertions in maintenance.rs, and re-pin docs/dev/lance.md.
|
||||
|
||||
#[tokio::test]
|
||||
async fn compact_files_still_fails_on_blob_columns() {
|
||||
use arrow_array::{LargeBinaryArray, StructArray};
|
||||
|
||||
fn blob_batch(start: i32, n: i32) -> RecordBatch {
|
||||
let ids: Vec<String> = (start..start + n).map(|i| format!("n{i}")).collect();
|
||||
let data =
|
||||
LargeBinaryArray::from_iter_values((start..start + n).map(|i| format!("blob{i}")));
|
||||
let blob_uri = StringArray::from(vec![None::<&str>; n as usize]);
|
||||
let DataType::Struct(fields) = lance::blob::blob_field("content", true).data_type().clone()
|
||||
else {
|
||||
unreachable!("blob_field is always a Struct");
|
||||
};
|
||||
let content = StructArray::new(
|
||||
fields,
|
||||
vec![Arc::new(data) as _, Arc::new(blob_uri) as _],
|
||||
None,
|
||||
);
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Utf8, false),
|
||||
lance::blob::blob_field("content", true),
|
||||
]));
|
||||
RecordBatch::try_new(
|
||||
schema,
|
||||
vec![Arc::new(StringArray::from(ids)) as _, Arc::new(content) as _],
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn write(uri: &str, batch: RecordBatch, mode: WriteMode) {
|
||||
let schema = batch.schema();
|
||||
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
|
||||
// Blob v2 requires file version >= 2.2; without the pin the *write*
|
||||
// would fail with a different error, masking the guard's intent.
|
||||
let params = WriteParams {
|
||||
mode,
|
||||
enable_stable_row_ids: true,
|
||||
data_storage_version: Some(LanceFileVersion::V2_2),
|
||||
..Default::default()
|
||||
};
|
||||
Dataset::write(reader, uri, Some(params)).await.unwrap();
|
||||
}
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().join("guard10-blob.lance");
|
||||
let uri = uri.to_str().unwrap();
|
||||
|
||||
// Uniform V2_2, two fragments → forces compaction to actually rewrite.
|
||||
write(uri, blob_batch(0, 2), WriteMode::Create).await;
|
||||
write(uri, blob_batch(100, 2), WriteMode::Append).await;
|
||||
|
||||
let mut ds = Dataset::open(uri).await.unwrap();
|
||||
assert!(
|
||||
ds.get_fragments().len() >= 2,
|
||||
"guard needs a multi-fragment table to trigger a real compaction rewrite"
|
||||
);
|
||||
|
||||
let result = compact_files(&mut ds, CompactionOptions::default(), None).await;
|
||||
let err = result.expect_err(
|
||||
"compact_files unexpectedly SUCCEEDED on a blob table — the Lance blob-v2 \
|
||||
compaction bug is fixed. Flip LANCE_SUPPORTS_BLOB_COMPACTION to true in \
|
||||
db/omnigraph/optimize.rs, remove the blob-skip branch, and re-pin docs/dev/lance.md.",
|
||||
);
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("more fields in the schema than provided column indices"),
|
||||
"blob compaction failed with an unexpected error (Lance internals may have \
|
||||
shifted): {err}"
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ mod helpers;
|
|||
use std::time::Duration;
|
||||
|
||||
use lance::Dataset;
|
||||
use omnigraph::db::{CleanupPolicyOptions, Omnigraph};
|
||||
use omnigraph::db::{CleanupPolicyOptions, Omnigraph, SkipReason};
|
||||
use omnigraph::loader::{LoadMode, load_jsonl};
|
||||
|
||||
use helpers::{TEST_DATA, TEST_SCHEMA, count_rows, init_and_load};
|
||||
|
|
@ -72,6 +72,97 @@ async fn optimize_after_load_then_again_is_idempotent() {
|
|||
}
|
||||
}
|
||||
|
||||
// Regression: `optimize` must not crash on a graph that has a `Blob` table.
|
||||
//
|
||||
// Lance `compact_files` forces `BlobHandling::AllBinary`, which mis-decodes
|
||||
// blob-v2 columns ("more fields in the schema than provided column indices"),
|
||||
// failing even a pristine uniform-V2_2 multi-fragment blob table. `optimize`
|
||||
// must skip blob-bearing tables (and report the skip) rather than aborting the
|
||||
// whole sweep.
|
||||
//
|
||||
// Before the skip fix, `optimize()` returned that Lance error here and aborted
|
||||
// the whole sweep; it now skips the blob table (`doc.skipped == Some(..)`)
|
||||
// while the sibling non-blob `Tag` table still compacts. The skip is gated by
|
||||
// `LANCE_SUPPORTS_BLOB_COMPACTION`; the surface guard
|
||||
// `compact_files_still_fails_on_blob_columns` flags when the upstream Lance fix
|
||||
// makes the skip (and this test's blob arm) removable.
|
||||
#[tokio::test]
|
||||
async fn optimize_skips_blob_table_and_reports_skip() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
// One Blob node type (`Doc`) + one plain node type (`Tag`): proves the blob
|
||||
// table is skipped while a non-blob table in the same sweep still compacts.
|
||||
let schema = "\
|
||||
node Doc {\n slug: String @key\n content: Blob\n}\n\
|
||||
node Tag {\n slug: String @key\n}\n";
|
||||
let mut db = Omnigraph::init(uri, schema).await.unwrap();
|
||||
|
||||
// Multi-fragment blob table: Overwrite creates fragment 1; each Merge of
|
||||
// new keys appends another. A >=2-fragment blob table is exactly what
|
||||
// crashes `compact_files` today (single fragment would no-op and not crash).
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"content\":\"base64:aGVsbG8x\"}}\n{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"content\":\"base64:aGVsbG8y\"}}",
|
||||
LoadMode::Overwrite,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d3\",\"content\":\"base64:aGVsbG8z\"}}",
|
||||
LoadMode::Merge,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d4\",\"content\":\"base64:aGVsbG80\"}}",
|
||||
LoadMode::Merge,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Plain table, also multi-fragment so it has something to compact.
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
"{\"type\":\"Tag\",\"data\":{\"slug\":\"t1\"}}\n{\"type\":\"Tag\",\"data\":{\"slug\":\"t2\"}}",
|
||||
LoadMode::Merge,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
"{\"type\":\"Tag\",\"data\":{\"slug\":\"t3\"}}",
|
||||
LoadMode::Merge,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let stats = db
|
||||
.optimize()
|
||||
.await
|
||||
.expect("optimize must not crash on a graph with a Blob table");
|
||||
|
||||
let doc = stats
|
||||
.iter()
|
||||
.find(|s| s.table_key == "node:Doc")
|
||||
.expect("Doc stat present");
|
||||
let tag = stats
|
||||
.iter()
|
||||
.find(|s| s.table_key == "node:Tag")
|
||||
.expect("Tag stat present");
|
||||
// The blob table is skipped (and reported), not compacted.
|
||||
assert_eq!(
|
||||
doc.skipped,
|
||||
Some(SkipReason::BlobColumnsUnsupportedByLance),
|
||||
"blob table must be reported as skipped",
|
||||
);
|
||||
assert!(!doc.committed, "skipped blob table is not compacted");
|
||||
assert_eq!(doc.fragments_removed, 0);
|
||||
assert_eq!(doc.fragments_added, 0);
|
||||
// The plain (non-blob) table is unaffected by the skip.
|
||||
assert_eq!(tag.skipped, None, "non-blob table must not be skipped");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cleanup_without_any_policy_option_errors() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue