Merge branch 'main' into ragnorc/index-best-practices-audit

This commit is contained in:
Ragnor Comerford 2026-06-09 17:57:14 +02:00 committed by GitHub
commit 5ca5c40df7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
38 changed files with 5302 additions and 202 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-cli"
version = "0.6.1"
version = "0.6.2"
edition = "2024"
description = "CLI for the Omnigraph graph database."
license = "MIT"
@ -13,10 +13,11 @@ name = "omnigraph"
path = "src/main.rs"
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" }
omnigraph-server = { path = "../omnigraph-server", version = "0.6.1" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.2" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" }
omnigraph-server = { path = "../omnigraph-server", version = "0.6.2" }
clap = { workspace = true }
color-eyre = { workspace = true }
serde = { workspace = true }

View file

@ -10,6 +10,10 @@ use color_eyre::eyre::{Result, bail};
use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_cluster::{
DiagnosticSeverity, PlanOutput, StatusOutput, ValidateOutput, plan_config_dir,
status_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::{
@ -283,6 +287,25 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Classify and explicitly repair manifest/head drift
Repair {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
/// Publish verified maintenance drift. Without this flag, repair only
/// previews what it would do.
#[arg(long)]
confirm: bool,
/// Also publish suspicious or unverifiable drift. Requires
/// `--confirm`; use only after operator review.
#[arg(long, requires = "confirm")]
force: bool,
#[arg(long)]
json: bool,
},
/// Remove old Lance versions from every table of the graph (destructive)
Cleanup {
/// Graph URI
@ -305,6 +328,11 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Validate and plan read-only cluster configuration.
Cluster {
#[command(subcommand)]
command: ClusterCommand,
},
/// Manage graphs on a multi-graph server (MR-668)
Graphs {
#[command(subcommand)]
@ -312,6 +340,37 @@ enum Command {
},
}
// Subcommands of `omnigraph cluster`. All three variants take the same two
// arguments: `--config` (a directory, defaulting to ".") and `--json`.
// NOTE(review): the `///` comments below are presumably surfaced by clap's
// derive as `--help` text, so treat them as user-facing strings when editing.
#[derive(Debug, Subcommand)]
enum ClusterCommand {
/// Validate cluster.yaml and referenced schemas, queries, and policy files.
Validate {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Produce a read-only plan by diffing cluster.yaml against __cluster/state.json.
Plan {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Read the local JSON state ledger without scanning live graph resources.
Status {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
}
/// Operations on the graph registry of a multi-graph server (MR-668).
///
/// All operations target a remote multi-graph server URL (http:// or
@ -683,6 +742,118 @@ fn print_json<T: Serialize>(value: &T) -> Result<()> {
Ok(())
}
/// Prints the human-readable result of `cluster validate`.
///
/// Emits a one-line verdict (with resource and dependency-edge counts when
/// the config is valid) followed by every diagnostic in `output`.
fn print_cluster_validate_human(output: &ValidateOutput) {
    if !output.ok {
        println!("cluster config invalid");
    } else {
        let resource_count = output.resources.len();
        let edge_count = output.dependencies.len();
        println!(
            "cluster config valid: {} resource(s), {} dependency edge(s)",
            resource_count, edge_count
        );
    }
    // Diagnostics are printed on both paths.
    print_cluster_diagnostics(&output.diagnostics);
}
/// Prints the human-readable result of `cluster plan`.
///
/// On success: a summary line, then one line per planned change (or a
/// "no changes" marker when the plan is empty). On failure: a single
/// failure line. Diagnostics are printed last in both cases.
fn print_cluster_plan_human(output: &PlanOutput) {
    if !output.ok {
        println!("cluster plan failed");
        print_cluster_diagnostics(&output.diagnostics);
        return;
    }

    println!(
        "cluster plan: {} change(s), {} approval gate(s)",
        output.changes.len(),
        output.approvals_required.len()
    );
    if output.changes.is_empty() {
        println!(" no changes");
    } else {
        output
            .changes
            .iter()
            .for_each(|change| println!(" {:?} {}", change.operation, change.resource));
    }
    print_cluster_diagnostics(&output.diagnostics);
}
/// Prints the human-readable result of `cluster status`.
///
/// Three outcomes are distinguished: the status call failed, the call
/// succeeded but no state was found, or state was found — in which case the
/// revision, resource count, optional applied-config digest, and lock status
/// are listed. Diagnostics are always printed afterwards.
fn print_cluster_status_human(output: &StatusOutput) {
    let state = &output.state_observations;

    if !output.ok {
        println!("cluster status failed");
    } else if !state.state_found {
        println!("cluster state missing");
    } else {
        println!(
            "cluster state: revision {}, {} resource(s)",
            state.state_revision, state.resource_count
        );
        if let Some(digest) = state.applied_config_digest.as_deref() {
            println!(" applied config: {digest}");
        }
        // The lock id is only shown when a lock is held and an id is present.
        let lock_line = match (state.locked, state.lock_id.as_deref()) {
            (true, Some(lock_id)) => format!(" lock: held ({lock_id})"),
            (true, None) => String::from(" lock: held"),
            (false, _) => String::from(" lock: not held"),
        };
        println!("{lock_line}");
    }

    print_cluster_diagnostics(&output.diagnostics);
}
/// Prints one line per diagnostic as `<LABEL> <code> <path>: <message>`.
///
/// The label is a fixed-width severity tag so error and warning lines align.
fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) {
    diagnostics.iter().for_each(|entry| {
        let label = match entry.severity {
            DiagnosticSeverity::Warning => "WARN ",
            DiagnosticSeverity::Error => "ERROR",
        };
        println!(
            "{label} {} {}: {}",
            entry.code, entry.path, entry.message
        );
    });
}
fn finish_cluster_validate(output: &ValidateOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_validate_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_plan_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_status_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
/// Returns `true` when `uri` begins with an `http://` or `https://` scheme
/// prefix. The comparison is case-sensitive.
fn is_remote_uri(uri: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 2] = ["http://", "https://"];
    REMOTE_SCHEMES
        .iter()
        .any(|&scheme| uri.starts_with(scheme))
}
@ -801,13 +972,11 @@ struct ResolvedPolicyContext {
fn resolve_policy_context(config: &OmnigraphConfig) -> Result<ResolvedPolicyContext> {
let selected = config.resolve_policy_tooling_graph_selection()?;
let policy_file = config
.resolve_policy_file_for(selected)
.ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.file or graphs.<name>.policy.file must be set in omnigraph.yaml"
)
})?;
let policy_file = config.resolve_policy_file_for(selected).ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.file or graphs.<name>.policy.file must be set in omnigraph.yaml"
)
})?;
let graph_id = match selected {
Some(name) => graph_resource_id_for_selection(Some(name), ""),
None => graph_resource_id_for_selection(None, "default"),
@ -2166,16 +2335,14 @@ fn rewrite_deprecated_argv(args: Vec<OsString>) -> Vec<OsString> {
}
if let Some(sub) = args.get(1).and_then(|s| s.to_str()) {
match sub {
"read" => eprintln!(
"warning: `omnigraph read` is deprecated; use `omnigraph query` instead"
),
"read" => {
eprintln!("warning: `omnigraph read` is deprecated; use `omnigraph query` instead")
}
"change" => eprintln!(
"warning: `omnigraph change` is deprecated; use `omnigraph mutate` instead"
),
"check" => {
eprintln!(
"warning: `omnigraph check` is deprecated; use `omnigraph lint` instead"
);
eprintln!("warning: `omnigraph check` is deprecated; use `omnigraph lint` instead");
// Rewrite the top-level subcommand to `lint`; pass through the rest.
let mut out = Vec::with_capacity(args.len());
out.push(args[0].clone());
@ -3012,6 +3179,8 @@ async fn main() -> Result<()> {
"fragments_added": s.fragments_added,
"committed": s.committed,
"skipped": s.skipped.map(|r| r.as_str()),
"manifest_version": s.manifest_version,
"lance_head_version": s.lance_head_version,
})).collect::<Vec<_>>(),
});
print_json(&value)?;
@ -3031,6 +3200,89 @@ async fn main() -> Result<()> {
}
}
}
// `omnigraph repair`: open the graph, run `Omnigraph::repair` with the
// requested `confirm`/`force` options, report the per-table outcome (JSON or
// human text), and fail the command if any table was refused.
Command::Repair {
    uri,
    target,
    config,
    confirm,
    force,
    json,
} => {
    let config = load_cli_config(config.as_ref())?;
    let uri = resolve_uri(&config, uri, target.as_deref())?;
    let db = Omnigraph::open(&uri).await?;
    let stats = db
        .repair(omnigraph::db::RepairOptions { confirm, force })
        .await?;
    // Tables whose drift was refused; checked after the report is printed
    // so the operator still sees the full per-table output.
    let refused_count = stats
        .tables
        .iter()
        .filter(|s| matches!(s.action, omnigraph::db::RepairAction::Refused))
        .count();
    if json {
        let value = serde_json::json!({
            "uri": uri,
            "confirm": confirm,
            "force": force,
            "manifest_version": stats.manifest_version,
            "tables": stats.tables.iter().map(|s| serde_json::json!({
                "table_key": s.table_key,
                "manifest_version": s.manifest_version,
                "lance_head_version": s.lance_head_version,
                "classification": s.classification.as_str(),
                "action": s.action.as_str(),
                "operations": s.operations,
                "error": s.error,
            })).collect::<Vec<_>>(),
        });
        print_json(&value)?;
    } else {
        let mode = if confirm { "confirm" } else { "preview" };
        println!(
            "repair {} — {} mode, {} tables",
            uri,
            mode,
            stats.tables.len()
        );
        for s in &stats.tables {
            let drift = if s.manifest_version == s.lance_head_version {
                s.manifest_version.to_string()
            } else {
                // Keep a separator between the two versions: without one,
                // manifest 1 / head 12 and manifest 11 / head 2 would both
                // render as "112".
                format!("{} -> {}", s.manifest_version, s.lance_head_version)
            };
            let ops = if s.operations.is_empty() {
                String::new()
            } else {
                format!(" [{}]", s.operations.join(", "))
            };
            let err = s
                .error
                .as_ref()
                .map(|err| format!(" ({err})"))
                .unwrap_or_default();
            println!(
                " {:<40} {:<12} {:<22} {}{}{}",
                s.table_key,
                s.action.as_str(),
                s.classification.as_str(),
                drift,
                ops,
                err
            );
        }
        if !confirm {
            println!("rerun with --confirm to publish verified maintenance drift");
        }
    }
    // A refusal is reported as an error so the command exits non-zero.
    if refused_count > 0 {
        bail!(
            "repair refused {} suspicious or unverifiable table(s); review the preview \
             output and rerun with --force --confirm only if publishing that drift is \
             intentional",
            refused_count
        );
    }
}
Command::Cleanup {
uri,
target,
@ -3111,6 +3363,20 @@ async fn main() -> Result<()> {
}
}
}
// `omnigraph cluster <sub>`: run the matching config-directory operation and
// hand its output to the corresponding `finish_cluster_*` function, which
// prints it and exits with status 1 when the output is not ok.
Command::Cluster { command } => match command {
    ClusterCommand::Validate { config, json } => {
        finish_cluster_validate(&validate_config_dir(config), json)?
    }
    ClusterCommand::Plan { config, json } => {
        finish_cluster_plan(&plan_config_dir(config), json)?
    }
    ClusterCommand::Status { config, json } => {
        finish_cluster_status(&status_config_dir(config), json)?
    }
},
Command::Graphs { command } => match command {
GraphsCommand::List {
uri,
@ -3157,8 +3423,8 @@ mod tests {
use super::{
DEFAULT_BEARER_TOKEN_ENV, apply_bearer_token, bearer_token_from_env_file,
legacy_change_request_body, load_cli_config, load_env_file_into_process,
normalize_bearer_token, parse_env_assignment, resolve_policy_context,
resolve_cli_graph, resolve_remote_bearer_token,
normalize_bearer_token, parse_env_assignment, resolve_cli_graph, resolve_policy_context,
resolve_remote_bearer_token,
};
use omnigraph_server::load_config;
use reqwest::header::AUTHORIZATION;
@ -3420,7 +3686,8 @@ graphs:
}
#[test]
fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri() {
fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri()
{
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(

View file

@ -1,5 +1,6 @@
use std::fs;
use lance::Dataset;
use lance::index::DatasetIndexExt;
use omnigraph::db::{Omnigraph, ReadTarget};
use serde_json::Value;
@ -60,6 +61,25 @@ fn manifest_dataset_version(graph: &std::path::Path) -> u64 {
})
}
/// Test helper that manufactures manifest/head drift on the `node:Person`
/// table.
///
/// Opens the table's Lance dataset directly (not through `Omnigraph`) and
/// deletes the row named Alice, which moves the dataset head past the version
/// recorded in the `main` snapshot. Returns
/// `(version recorded in the snapshot, new Lance head version)`.
fn forge_person_delete_drift(graph: &std::path::Path) -> (u64, u64) {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let uri = graph.to_string_lossy();
        let db = Omnigraph::open(uri.as_ref()).await.unwrap();
        let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
        let person = snapshot.entry("node:Person").unwrap();
        let recorded_version = person.table_version;

        let table_uri = format!("{}/{}", uri.trim_end_matches('/'), person.table_path);
        let mut dataset = Dataset::open(&table_uri).await.unwrap();
        let outcome = dataset.delete("name = 'Alice'").await.unwrap();
        assert_eq!(outcome.num_deleted_rows, 1);

        let head_version = outcome.new_dataset.version().version;
        assert!(head_version > recorded_version);
        (recorded_version, head_version)
    })
}
fn write_policy_config_fixture(root: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) {
let config = root.join("omnigraph.yaml");
let policy = root.join("policy.yaml");
@ -78,6 +98,52 @@ policy:
(config, policy)
}
// Test fixture: writes a small cluster configuration into `root`.
//
// Four files are created: a schema (`people.pg`), one query (`people.gq`),
// a policy file with an empty rule list (`base.policy.yaml`), and the
// `cluster.yaml` that references the other three. Panics (via `unwrap`) if
// any write fails.
fn write_cluster_config_fixture(root: &std::path::Path) {
// Schema referenced by `graphs.knowledge.schema` in cluster.yaml.
fs::write(
root.join("people.pg"),
r#"
node Person {
name: String @key
age: I32?
}
"#,
)
.unwrap();
// Query referenced by `queries.find_person.file` in cluster.yaml.
fs::write(
root.join("people.gq"),
r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name, $p.age }
}
"#,
)
.unwrap();
// Policy file referenced by `policies.base.file` in cluster.yaml.
fs::write(root.join("base.policy.yaml"), "rules: []\n").unwrap();
// The cluster config itself, tying the three files above together.
fs::write(
root.join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
base:
file: ./base.policy.yaml
applies_to: [knowledge]
"#,
)
.unwrap();
}
#[test]
fn version_command_prints_current_cli_version() {
let output = output_success(cli().arg("version"));
@ -89,6 +155,270 @@ fn version_command_prints_current_cli_version() {
);
}
/// `cluster validate` on the fixture config exits successfully and prints the
/// human-readable "valid" verdict.
#[test]
fn cluster_validate_config_success() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let output = output_success(
        cli()
            .args(["cluster", "validate", "--config"])
            .arg(workspace.path()),
    );

    let stdout = stdout_string(&output);
    assert!(stdout.contains("cluster config valid"), "{stdout}");
}
/// `cluster validate --json` reports `ok`, string digests for the graph and
/// query resources, and the policy -> graph edge as the first dependency.
#[test]
fn cluster_validate_json_is_stable() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let output = output_success(
        cli()
            .args(["cluster", "validate", "--config"])
            .arg(workspace.path())
            .arg("--json"),
    );
    let report = parse_stdout_json(&output);

    assert_eq!(report["ok"], true);

    let digests = &report["resource_digests"];
    assert!(digests["graph.knowledge"].is_string());
    assert!(digests["query.knowledge.find_person"].is_string());

    let first_edge = &report["dependencies"][0];
    assert_eq!(first_edge["from"], "policy.base");
    assert_eq!(first_edge["to"], "graph.knowledge");
}
/// `cluster plan --json` picks up `__cluster/state.json` next to the config
/// and plans a delete for a resource that exists only in that state.
#[test]
fn cluster_plan_json_reads_inferred_local_state() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    // Seed a ledger containing `policy.old`, which the fixture config does
    // not declare.
    let ledger = r#"
{
"version": 1,
"applied_revision": {
"config_digest": "old",
"resources": {
"graph.knowledge": { "digest": "old-graph" },
"policy.old": { "digest": "old-policy" }
}
}
}
"#;
    let ledger_dir = workspace.path().join("__cluster");
    fs::create_dir_all(&ledger_dir).unwrap();
    fs::write(ledger_dir.join("state.json"), ledger).unwrap();

    let output = output_success(
        cli()
            .args(["cluster", "plan", "--config"])
            .arg(workspace.path())
            .arg("--json"),
    );
    let plan = parse_stdout_json(&output);

    assert_eq!(plan["ok"], true);
    assert_eq!(plan["state_observations"]["state_found"], true);

    let deletes_stale_policy = plan["changes"]
        .as_array()
        .unwrap()
        .iter()
        .any(|change| change["resource"] == "policy.old" && change["operation"] == "delete");
    assert!(
        deletes_stale_policy,
        "plan should read state and delete stale resources: {plan}"
    );
}
/// With no state file present, `cluster status --json` still succeeds,
/// reports `state_found: false`, and emits a `state_missing` diagnostic.
#[test]
fn cluster_status_json_reports_missing_state() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let output = output_success(
        cli()
            .args(["cluster", "status", "--config"])
            .arg(workspace.path())
            .arg("--json"),
    );
    let status = parse_stdout_json(&output);

    assert_eq!(status["ok"], true);
    assert_eq!(status["state_observations"]["state_found"], false);

    let has_state_missing = status["diagnostics"]
        .as_array()
        .unwrap()
        .iter()
        .any(|diagnostic| diagnostic["code"] == "state_missing");
    assert!(
        has_state_missing,
        "missing state should be a warning diagnostic: {status}"
    );
}
/// `cluster status --json` surfaces the revision, a `sha256:`-prefixed state
/// CAS value, resource digests, and resource statuses from a state file that
/// carries the extended record sections.
#[test]
fn cluster_status_json_reports_extended_state() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let ledger = r#"
{
"version": 1,
"state_revision": 5,
"applied_revision": {
"config_digest": "applied",
"resources": {
"graph.knowledge": { "digest": "graph-digest" }
}
},
"resource_statuses": {
"graph.knowledge": { "status": "applied", "conditions": ["healthy"] }
},
"approval_records": {},
"recovery_records": {},
"observations": {}
}
"#;
    let ledger_dir = workspace.path().join("__cluster");
    fs::create_dir_all(&ledger_dir).unwrap();
    fs::write(ledger_dir.join("state.json"), ledger).unwrap();

    let output = output_success(
        cli()
            .args(["cluster", "status", "--config"])
            .arg(workspace.path())
            .arg("--json"),
    );
    let status = parse_stdout_json(&output);

    assert_eq!(status["ok"], true);

    let observations = &status["state_observations"];
    assert_eq!(observations["state_revision"], 5);
    assert!(
        observations["state_cas"]
            .as_str()
            .unwrap()
            .starts_with("sha256:")
    );

    assert_eq!(status["resource_digests"]["graph.knowledge"], "graph-digest");
    assert_eq!(
        status["resource_statuses"]["graph.knowledge"]["status"],
        "applied"
    );
}
/// `cluster plan --json` reports the state revision, a `sha256:`-prefixed CAS
/// value, and lock observations — and leaves no `lock.json` behind once the
/// command has finished.
#[test]
fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let ledger = r#"
{
"version": 1,
"state_revision": 9,
"applied_revision": {
"config_digest": "old",
"resources": {
"graph.knowledge": { "digest": "old-graph" }
}
}
}
"#;
    let ledger_dir = workspace.path().join("__cluster");
    fs::create_dir_all(&ledger_dir).unwrap();
    fs::write(ledger_dir.join("state.json"), ledger).unwrap();

    let output = output_success(
        cli()
            .args(["cluster", "plan", "--config"])
            .arg(workspace.path())
            .arg("--json"),
    );
    let plan = parse_stdout_json(&output);

    assert_eq!(plan["ok"], true);

    let observations = &plan["state_observations"];
    assert_eq!(observations["state_revision"], 9);
    assert!(
        observations["state_cas"]
            .as_str()
            .unwrap()
            .starts_with("sha256:")
    );
    assert_eq!(observations["locked"], false);
    assert_eq!(observations["lock_acquired"], true);
    assert!(observations["acquired_lock_id"].is_string());

    // No lock file may remain on disk after the plan completes.
    assert!(!ledger_dir.join("lock.json").exists());
}
/// When `__cluster/lock.json` already exists, `cluster plan --json` fails,
/// reports the held lock in its observations, and emits a `state_lock_held`
/// diagnostic.
#[test]
fn cluster_plan_locked_state_exits_nonzero() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    // Pre-existing lock file standing in for another operation's lock.
    let held_lock = r#"
{
"version": 1,
"lock_id": "held-lock",
"operation": "plan",
"created_at": "2026-06-08T00:00:00Z",
"pid": 123
}
"#;
    let ledger_dir = workspace.path().join("__cluster");
    fs::create_dir_all(&ledger_dir).unwrap();
    fs::write(ledger_dir.join("lock.json"), held_lock).unwrap();

    let output = output_failure(
        cli()
            .args(["cluster", "plan", "--config"])
            .arg(workspace.path())
            .arg("--json"),
    );
    let plan = parse_stdout_json(&output);

    assert_eq!(plan["ok"], false);

    let observations = &plan["state_observations"];
    assert_eq!(observations["locked"], true);
    assert_eq!(observations["lock_acquired"], false);
    assert_eq!(observations["lock_id"], "held-lock");

    let has_lock_diagnostic = plan["diagnostics"]
        .as_array()
        .unwrap()
        .iter()
        .any(|diagnostic| diagnostic["code"] == "state_lock_held");
    assert!(
        has_lock_diagnostic,
        "locked state should produce a useful diagnostic: {plan}"
    );
}
/// A `cluster.yaml` containing a `pipelines` key makes `cluster validate`
/// fail and print a `future_phase_field` diagnostic on stdout.
#[test]
fn cluster_validate_invalid_config_exits_nonzero() {
    let workspace = tempdir().unwrap();
    let config_file = workspace.path().join("cluster.yaml");
    fs::write(config_file, "version: 1\ngraphs: {}\npipelines: {}\n").unwrap();

    let output = output_failure(
        cli()
            .args(["cluster", "validate", "--config"])
            .arg(workspace.path()),
    );

    let stdout = stdout_string(&output);
    assert!(stdout.contains("future_phase_field"), "{stdout}");
}
#[test]
fn short_version_flag_prints_current_cli_version() {
let output = output_success(cli().arg("-v"));
@ -235,6 +565,83 @@ fn init_creates_graph_successfully_on_missing_local_directory() {
assert!(temp.path().join("omnigraph.yaml").exists());
}
/// `repair --json` on a freshly loaded graph reports preview mode, a null
/// manifest version, and four tables all classified `no_drift` / `no_op`.
#[test]
fn repair_json_reports_noop_on_clean_graph() {
    let workspace = tempdir().unwrap();
    let graph = graph_path(workspace.path());
    init_graph(&graph);
    load_fixture(&graph);

    let output = output_success(cli().args(["repair", "--json"]).arg(&graph));
    let report: Value = serde_json::from_slice(&output.stdout).unwrap();

    assert_eq!(report["confirm"], false);
    assert_eq!(report["force"], false);
    assert_eq!(report["manifest_version"], Value::Null);

    let tables = report["tables"].as_array().unwrap();
    assert_eq!(tables.len(), 4);
    let all_untouched = tables
        .iter()
        .all(|table| table["classification"] == "no_drift" && table["action"] == "no_op");
    assert!(all_untouched);
}
/// End-to-end check of the refuse-then-force flow:
/// 1. forge drift on `node:Person` by deleting a row directly in Lance;
/// 2. `repair --confirm` must refuse it, exit non-zero, explain why on
///    stderr, and leave the graph manifest version unchanged;
/// 3. `repair --force --confirm` must succeed, report the table as `forced`,
///    and advance the graph manifest version.
#[test]
fn repair_confirm_json_refuses_suspicious_drift_with_nonzero_exit_then_force_succeeds() {
    /// Picks the `node:Person` entry out of a repair JSON report.
    fn person_table(report: &Value) -> &Value {
        report["tables"]
            .as_array()
            .unwrap()
            .iter()
            .find(|table| table["table_key"] == "node:Person")
            .unwrap()
    }

    let workspace = tempdir().unwrap();
    let graph = graph_path(workspace.path());
    init_graph(&graph);
    load_fixture(&graph);
    let graph_manifest_before = manifest_dataset_version(&graph);
    let (table_manifest_before, table_head_before) = forge_person_delete_drift(&graph);

    // --confirm alone: the drift is refused.
    let refused = output_failure(cli().args(["repair", "--confirm", "--json"]).arg(&graph));
    let refused_report: Value = serde_json::from_slice(&refused.stdout).unwrap();
    assert_eq!(refused_report["manifest_version"], Value::Null);
    let refused_person = person_table(&refused_report);
    assert_eq!(refused_person["classification"], "suspicious");
    assert_eq!(refused_person["action"], "refused");
    let refused_stderr = String::from_utf8_lossy(&refused.stderr);
    assert!(
        refused_stderr.contains("repair refused"),
        "stderr should explain the non-zero exit; got: {}",
        refused_stderr
    );
    assert_eq!(manifest_dataset_version(&graph), graph_manifest_before);

    // --force --confirm: the same drift is published.
    let forced = output_success(
        cli()
            .args(["repair", "--force", "--confirm", "--json"])
            .arg(&graph),
    );
    let forced_report: Value = serde_json::from_slice(&forced.stdout).unwrap();
    let forced_manifest = forced_report["manifest_version"].as_u64().unwrap();
    assert!(forced_manifest > graph_manifest_before);
    let forced_person = person_table(&forced_report);
    assert_eq!(forced_person["classification"], "suspicious");
    assert_eq!(forced_person["action"], "forced");
    assert_eq!(forced_person["manifest_version"], table_manifest_before);
    assert_eq!(forced_person["lance_head_version"], table_head_before);
    assert_eq!(manifest_dataset_version(&graph), forced_manifest);
}
#[test]
fn schema_plan_json_reports_supported_additive_change() {
let temp = tempdir().unwrap();
@ -798,8 +1205,7 @@ fn deprecated_read_and_change_subcommands_emit_warnings() {
let output = cli().arg("read").output().unwrap();
let stderr = String::from_utf8(output.stderr).unwrap();
assert!(
stderr.contains("`omnigraph read` is deprecated")
&& stderr.contains("`omnigraph query`"),
stderr.contains("`omnigraph read` is deprecated") && stderr.contains("`omnigraph query`"),
"expected `omnigraph read` deprecation warning; got: {stderr}"
);
@ -2394,9 +2800,19 @@ fn queries_validate_exits_zero_on_clean_registry() {
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "find_person", "find_person.gq"),
&queries_test_config(
&graph.path().to_string_lossy(),
"find_person",
"find_person.gq",
),
);
let output = output_success(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let output = output_success(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let stdout = stdout_string(&output);
assert!(stdout.contains("OK"), "stdout:\n{stdout}");
}
@ -2405,12 +2821,21 @@ fn queries_validate_exits_zero_on_clean_registry() {
fn queries_validate_exits_nonzero_on_type_broken_query() {
let graph = SystemGraph::loaded();
// `Widget` is not in the fixture schema.
graph.write_query("ghost.gq", "query ghost() { match { $w: Widget } return { $w.name } }");
graph.write_query(
"ghost.gq",
"query ghost() { match { $w: Widget } return { $w.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "ghost", "ghost.gq"),
);
let output = output_failure(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(
stdout.contains("ghost"),
@ -2444,7 +2869,13 @@ fn queries_list_prints_registered_query() {
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_success(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_success(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("find_person"), "stdout:\n{stdout}");
assert!(
@ -2480,7 +2911,13 @@ fn queries_list_requires_graph_selection_for_per_graph_only_registries() {
),
);
let output = output_failure(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("local") && stderr.contains("--target local"),
@ -2505,7 +2942,13 @@ fn queries_list_without_graph_selection_lists_top_level_registry() {
),
);
let output = output_success(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_success(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("top_find"), "stdout:\n{stdout}");
}
@ -2524,7 +2967,11 @@ fn queries_list_unknown_target_errors() {
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "find_person", "find_person.gq"),
&queries_test_config(
&graph.path().to_string_lossy(),
"find_person",
"find_person.gq",
),
);
let output = output_failure(
cli()
@ -2566,7 +3013,7 @@ fn queries_commands_reject_named_graph_with_populated_top_level_block() {
" file: ./find_person.gq\n",
"cli:\n",
" graph: local\n",
"queries:\n", // populated top-level block: the coherence violation
"queries:\n", // populated top-level block: the coherence violation
" legacy:\n",
" file: ./legacy.gq\n",
"policy: {{}}\n",
@ -2592,8 +3039,14 @@ fn queries_validate_exits_nonzero_on_duplicate_tool_name() {
// collision — `queries validate` must fail (offline, before the engine
// opens) and name both queries plus the contested tool.
let graph = SystemGraph::loaded();
graph.write_query("a.gq", "query a() { match { $p: Person } return { $p.name } }");
graph.write_query("b.gq", "query b() { match { $p: Person } return { $p.name } }");
graph.write_query(
"a.gq",
"query a() { match { $p: Person } return { $p.name } }",
);
graph.write_query(
"b.gq",
"query b() { match { $p: Person } return { $p.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&format!(
@ -2615,7 +3068,13 @@ fn queries_validate_exits_nonzero_on_duplicate_tool_name() {
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_failure(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("dup") && stderr.contains("'a'") && stderr.contains("'b'"),
@ -2635,7 +3094,10 @@ fn queries_validate_positional_uri_ignores_default_graph() {
);
// `Widget` is not in the fixture schema — the default graph's per-graph
// query would break validate if it were (wrongly) selected.
graph.write_query("broken.gq", "query broken() { match { $w: Widget } return { $w.name } }");
graph.write_query(
"broken.gq",
"query broken() { match { $w: Widget } return { $w.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
concat!(

View file

@ -0,0 +1,22 @@
[package]
name = "omnigraph-cluster"
version = "0.6.2"
edition = "2024"
description = "Read-only cluster configuration validation and planning for Omnigraph."
license = "MIT"
repository = "https://github.com/ModernRelay/omnigraph"
homepage = "https://github.com/ModernRelay/omnigraph"
documentation = "https://docs.rs/omnigraph-cluster"
[dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
sha2 = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
ulid = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-compiler"
version = "0.6.1"
version = "0.6.2"
edition = "2024"
description = "Schema/query compiler for Omnigraph. Zero Lance dependency."
license = "MIT"

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-policy"
version = "0.6.1"
version = "0.6.2"
edition = "2024"
description = "Policy / authorization layer for Omnigraph — Cedar-backed PolicyEngine, PolicyChecker trait, ResourceScope enum."
license = "MIT"

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-server"
version = "0.6.1"
version = "0.6.2"
edition = "2024"
description = "HTTP server for the Omnigraph graph database."
license = "MIT"
@ -19,9 +19,9 @@ default = []
aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"]
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" }
axum = { workspace = true }
clap = { workspace = true }
color-eyre = { workspace = true }

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-engine"
version = "0.6.1"
version = "0.6.2"
edition = "2024"
description = "Runtime engine for the Omnigraph graph database."
license = "MIT"
@ -16,8 +16,8 @@ default = []
failpoints = ["dep:fail", "fail/failpoints"]
[dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" }
lance = { workspace = true }
lance-datafusion = { workspace = true }
datafusion = { workspace = true }
@ -51,7 +51,7 @@ chrono = { workspace = true }
arc-swap = { workspace = true }
[dev-dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
tokio = { workspace = true }
lance-namespace-impls = { workspace = true }
serial_test = "3"

View file

@ -11,8 +11,9 @@ pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, Snapsh
pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
pub(crate) use omnigraph::ensure_public_branch_ref;
pub use omnigraph::{
CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions,
SchemaApplyResult, SkipReason, TableCleanupStats, TableOptimizeStats,
CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, RepairAction,
RepairClassification, RepairOptions, RepairStats, SchemaApplyOptions, SchemaApplyResult,
SkipReason, TableCleanupStats, TableOptimizeStats, TableRepairStats,
};
pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__";

View file

@ -30,10 +30,14 @@ use crate::table_store::TableStore;
mod export;
mod optimize;
mod repair;
mod schema_apply;
mod table_ops;
pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
pub use repair::{
RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
};
pub use schema_apply::SchemaApplyOptions;
use super::commit_graph::GraphCommit;
@ -682,6 +686,16 @@ impl Omnigraph {
.map(|resolved| resolved.snapshot)
}
/// Resolves `branch` (defaulting to `"main"` when `None`) through the
/// coordinator and returns the resulting snapshot.
///
/// Schema state is validated first; an error from that check or from target
/// resolution is returned to the caller.
pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
    self.ensure_schema_state_valid().await?;
    let branch_name = branch.unwrap_or("main").to_string();
    let requested = ReadTarget::Branch(branch_name);
    let coord = self.coordinator.read().await;
    let resolved = coord.resolve_target(&requested).await?;
    Ok(resolved.snapshot)
}
/// Returns the version currently reported by the coordinator, obtained after
/// awaiting `coordinator.read()`.
pub(crate) async fn version(&self) -> u64 {
    let coord = self.coordinator.read().await;
    coord.version()
}
@ -999,6 +1013,13 @@ impl Omnigraph {
optimize::optimize_all_tables(self).await
}
/// Classify uncovered manifest/head drift on every table and, when the
/// options allow it, publish the repaired pins. See [`repair`] for how safe
/// maintenance drift is told apart from suspicious or unverifiable drift.
pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
    let stats = repair::repair_all_tables(self, options).await?;
    Ok(stats)
}
/// Remove Lance manifests (and the fragments they uniquely own) per the
/// given [`optimize::CleanupPolicyOptions`]. Destructive to version
/// history. See [`optimize`] for details.

View file

@ -75,8 +75,7 @@ pub struct CleanupPolicyOptions {
}
/// Why `optimize` did not compact a table. Typed so callers branch on the
/// reason rather than sniffing a string. One variant today, gated by
/// [`LANCE_SUPPORTS_BLOB_COMPACTION`].
/// reason rather than sniffing a string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SkipReason {
@ -84,6 +83,12 @@ pub enum SkipReason {
/// `BlobHandling::AllBinary`, which mis-decodes blob-v2 columns; see
/// [`LANCE_SUPPORTS_BLOB_COMPACTION`] and `docs/dev/lance.md`.
BlobColumnsUnsupportedByLance,
/// The Lance dataset HEAD is ahead of the version recorded in
/// `__manifest`, and no recovery sidecar covers that movement. `optimize`
/// cannot infer whether the drift is benign maintenance or an external
/// semantic write, so it leaves the table untouched and points operators at
/// explicit `repair`.
DriftNeedsRepair,
}
impl SkipReason {
@ -92,6 +97,7 @@ impl SkipReason {
pub fn as_str(&self) -> &'static str {
match self {
SkipReason::BlobColumnsUnsupportedByLance => "blob_columns_unsupported_by_lance",
SkipReason::DriftNeedsRepair => "drift_needs_repair",
}
}
}
@ -103,6 +109,7 @@ impl std::fmt::Display for SkipReason {
SkipReason::BlobColumnsUnsupportedByLance => {
"blob columns — Lance compaction unsupported"
}
SkipReason::DriftNeedsRepair => "manifest/head drift — run omnigraph repair",
};
f.write_str(msg)
}
@ -125,6 +132,12 @@ pub struct TableOptimizeStats {
/// `Some(reason)` if this table was deliberately not compacted. When set,
/// `fragments_removed == 0`, `fragments_added == 0`, and `!committed`.
pub skipped: Option<SkipReason>,
/// Manifest table version observed by optimize for drift skips. `None` for
/// normal compaction/no-op/blob skips.
pub manifest_version: Option<u64>,
/// Lance HEAD version observed by optimize for drift skips. `None` for
/// normal compaction/no-op/blob skips.
pub lance_head_version: Option<u64>,
}
impl TableOptimizeStats {
@ -136,6 +149,8 @@ impl TableOptimizeStats {
fragments_added: metrics.fragments_added,
committed,
skipped: None,
manifest_version: None,
lance_head_version: None,
}
}
@ -147,6 +162,25 @@ impl TableOptimizeStats {
fragments_added: 0,
committed: false,
skipped: Some(reason),
manifest_version: None,
lance_head_version: None,
}
}
/// Build the stat row for a table that was left untouched because the
/// manifest pin and the Lance HEAD disagree.
///
/// Both observed versions are recorded so the gap is visible to the caller;
/// no fragment changes are reported and nothing is marked committed.
fn skipped_for_drift(
    table_key: String,
    manifest_version: u64,
    lance_head_version: u64,
) -> Self {
    let reason = SkipReason::DriftNeedsRepair;
    Self {
        skipped: Some(reason),
        manifest_version: Some(manifest_version),
        lance_head_version: Some(lance_head_version),
        committed: false,
        fragments_added: 0,
        fragments_removed: 0,
        table_key,
    }
}
}
@ -185,8 +219,7 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
));
}
let resolved = db.resolved_branch_target(None).await?;
let snapshot = resolved.snapshot;
let snapshot = db.fresh_snapshot_for_branch(None).await?;
// Compute per-table state (path + whether it has blob columns) up front, in
// a scope that drops the catalog handle before the async stream starts.
@ -258,7 +291,8 @@ async fn optimize_one_table(
) -> Result<TableOptimizeStats> {
// Lance `compact_files` mis-decodes blob-v2 columns under the forced
// `BlobHandling::AllBinary` read (see LANCE_SUPPORTS_BLOB_COMPACTION). Skip
// blob-bearing tables and report it rather than aborting the whole sweep.
// blob-bearing tables before acquiring the write queue; `repair` is the
// operator tool for full manifest/head drift classification.
if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION {
tracing::warn!(
target: "omnigraph::optimize",
@ -291,20 +325,41 @@ async fn optimize_one_table(
// CAS baseline: the table's current manifest version, re-read under the queue
// from a fresh per-branch snapshot so a stale in-memory coordinator view
// cannot supply an outdated pin (this costs one manifest read per table).
let expected_version = db
.snapshot()
.await
.fresh_snapshot_for_branch(None)
.await?
.entry(&table_key)
.map(|e| e.table_version)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
let lance_head_version = ds.version().version;
if lance_head_version < expected_version {
return Err(OmniError::manifest_internal(format!(
"table '{}' Lance HEAD version {} is behind manifest version {}",
table_key, lance_head_version, expected_version
)));
}
if lance_head_version > expected_version {
tracing::warn!(
target: "omnigraph::optimize",
table = %table_key,
manifest_version = expected_version,
lance_head_version,
"skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \
to classify and publish covered maintenance drift explicitly",
);
return Ok(TableOptimizeStats::skipped_for_drift(
table_key,
expected_version,
lance_head_version,
));
}
// Precise "will it compact?" check — `plan_compaction` also accounts for
// deletion materialization (which can rewrite even a single fragment). A
// steady-state already-compacted table yields an empty plan and is never
// pinned in a sidecar (a zero-commit pin would classify NoMovement on
// recovery and force an all-or-nothing rollback). There is no drift to
// reconcile here: optimize runs only on a recovered graph (the pending-
// sidecar guard above), and recovery roll-back now publishes, so
// `HEAD == manifest` holds going in.
// recovery and force an all-or-nothing rollback). Uncovered pre-existing
// drift is skipped above and must go through explicit repair.
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
@ -641,7 +696,7 @@ fn orphan_branches(present: Vec<String>, keep: &std::collections::HashSet<String
orphans
}
fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec<String> {
pub(super) fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec<String> {
let mut keys: Vec<String> = catalog
.node_types
.keys()

View file

@ -0,0 +1,332 @@
//! Explicit repair for uncovered manifest/head drift.
//!
//! Recovery sidecars handle deterministic crash residuals automatically. This
//! module is for the different case: a table's Lance HEAD is ahead of the
//! version recorded in `__manifest` and there is no sidecar encoding writer
//! intent. `repair` classifies that uncovered drift from Lance transactions and
//! only auto-publishes maintenance-only drift when the operator confirms.
use std::collections::HashMap;
use lance::Dataset;
use lance::dataset::transaction::Operation;
use super::*;
/// Options for [`Omnigraph::repair`].
///
/// The derived `Default` leaves both flags `false`, which is a preview run:
/// drift is classified and reported but nothing is published.
#[derive(Debug, Clone, Copy, Default)]
pub struct RepairOptions {
/// Preview by default. With `confirm`, verified maintenance drift is
/// published to `__manifest`.
pub confirm: bool,
/// Also publish suspicious/unverifiable drift. Requires `confirm`: setting
/// `force` without it is rejected with an error before any table is
/// inspected.
pub force: bool,
}
/// How a table's Lance HEAD relates to the version pinned in `__manifest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RepairClassification {
    /// The Lance HEAD and the manifest pin are the same version.
    NoDrift,
    /// All uncovered Lance transactions are maintenance-only (`Rewrite` or
    /// `ReserveFragments`), so publishing the HEAD preserves content.
    VerifiedMaintenance,
    /// One or more uncovered transactions are semantic (`Append`, `Delete`,
    /// `Update`, etc.).
    Suspicious,
    /// A transaction needed for the verdict could not be read, so the drift
    /// cannot be judged either way.
    Unverifiable,
}

impl RepairClassification {
    /// Stable machine-readable token used in serialized output.
    pub fn as_str(&self) -> &'static str {
        match *self {
            RepairClassification::NoDrift => "no_drift",
            RepairClassification::VerifiedMaintenance => "verified_maintenance",
            RepairClassification::Suspicious => "suspicious",
            RepairClassification::Unverifiable => "unverifiable",
        }
    }
}

impl std::fmt::Display for RepairClassification {
    /// Writes the same token as [`RepairClassification::as_str`], verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let token = self.as_str();
        f.write_str(token)
    }
}
/// The outcome repair chose for a single table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RepairAction {
    /// There was nothing to do.
    NoOp,
    /// Drift was reported only; the run was a preview, so nothing was
    /// published.
    Preview,
    /// Verified maintenance drift was published to `__manifest`.
    Healed,
    /// Suspicious/unverifiable drift was published because `force` was set.
    Forced,
    /// Drift was left in place: it was not safe to publish without `force`.
    Refused,
}

impl RepairAction {
    /// Stable machine-readable token used in serialized output.
    pub fn as_str(&self) -> &'static str {
        match *self {
            RepairAction::NoOp => "no_op",
            RepairAction::Preview => "preview",
            RepairAction::Healed => "healed",
            RepairAction::Forced => "forced",
            RepairAction::Refused => "refused",
        }
    }
}

impl std::fmt::Display for RepairAction {
    /// Writes the same token as [`RepairAction::as_str`], verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let token = self.as_str();
        f.write_str(token)
    }
}
/// Per-table repair outcome.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct TableRepairStats {
/// Catalog key of the table this row describes.
pub table_key: String,
/// Table version recorded for this table in the manifest snapshot that
/// repair read while holding the write queue.
pub manifest_version: u64,
/// Version of the Lance dataset as opened at HEAD during the same run.
pub lance_head_version: u64,
/// Verdict on the gap between `manifest_version` and `lance_head_version`.
pub classification: RepairClassification,
/// What repair did (or would do, for a preview) about that verdict.
pub action: RepairAction,
/// Names of the Lance operations found in the uncovered versions, in
/// ascending version order. Empty when there is no drift; may be partial
/// when reading a transaction failed part-way through.
pub operations: Vec<String>,
/// Why the drift could not be verified (a missing or unreadable Lance
/// transaction); `None` otherwise.
pub error: Option<String>,
}
/// Whole-graph repair outcome.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct RepairStats {
/// One row per catalog table that has a manifest entry; catalog tables
/// without an entry are not listed.
pub tables: Vec<TableRepairStats>,
/// New graph manifest version if repair published any table pins.
/// `None` for previews and for runs where nothing qualified for publishing.
pub manifest_version: Option<u64>,
}
/// Internal result of classifying one table's uncovered drift, copied into
/// the matching [`TableRepairStats`] fields by the caller.
struct ClassificationResult {
// Verdict for the whole uncovered version range.
classification: RepairClassification,
// Operation names collected from the transactions that were readable.
operations: Vec<String>,
// Set when a transaction was missing or failed to load.
error: Option<String>,
}
/// Classify manifest/head drift for every table and optionally publish it.
///
/// Flow:
/// 1. Reject `force` without `confirm`, then run the preflight checks
///    (schema state, schema-apply idle, no pending recovery sidecars).
/// 2. Build the task list from catalog tables that have a manifest entry and
///    acquire the write queue for all of them at once.
/// 3. Repeat the sidecar check and re-read the snapshot under the queue, then
///    compare each table's Lance HEAD against its manifest pin and classify
///    any gap.
/// 4. If any table ended up `Healed` or `Forced`, publish all such pins in a
///    single manifest commit and invalidate the affected caches.
///
/// Without `confirm` every drifted table is reported as `Preview` and nothing
/// is written.
///
/// # Errors
///
/// Returns an error when `force` is set without `confirm`, when any preflight
/// check fails, when a dataset cannot be opened, when a table's Lance HEAD is
/// *behind* its manifest version, or when the manifest commit fails. Any of
/// these aborts the whole run; no partial `RepairStats` is returned.
pub async fn repair_all_tables(db: &Omnigraph, options: RepairOptions) -> Result<RepairStats> {
if options.force && !options.confirm {
return Err(OmniError::manifest("repair --force requires --confirm"));
}
db.ensure_schema_state_valid().await?;
db.ensure_schema_apply_idle("repair").await?;
ensure_no_pending_recovery_sidecars(db, "repair").await?;
// Pre-queue snapshot: used only to decide which tables to lock and where
// their datasets live. Versions are re-read after the queue is held.
let snapshot = db.fresh_snapshot_for_branch(None).await?;
let table_tasks: Vec<(String, String)> = {
let catalog = db.catalog();
let mut tasks = Vec::new();
for table_key in optimize::all_table_keys(&catalog) {
// Catalog tables with no manifest entry are skipped entirely.
let Some(entry) = snapshot.entry(&table_key) else {
continue;
};
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
tasks.push((table_key, full_path));
}
tasks
};
if table_tasks.is_empty() {
return Ok(RepairStats {
tables: Vec::new(),
manifest_version: None,
});
}
let queue_keys: Vec<(String, Option<String>)> = table_tasks
.iter()
.map(|(table_key, _)| (table_key.clone(), None))
.collect();
// The guards are bound to a named local, so they are held until this
// function returns — including across the manifest commit below.
let _guards = db.write_queue().acquire_many(&queue_keys).await;
// Repeat the sidecar check and take a second snapshot now that the queue is
// held; every version comparison below uses this second snapshot.
ensure_no_pending_recovery_sidecars(db, "repair").await?;
let snapshot = db.fresh_snapshot_for_branch(None).await?;
let mut tables = Vec::with_capacity(table_tasks.len());
let mut updates = Vec::new();
let mut expected = HashMap::new();
let mut any_forced = false;
for (table_key, full_path) in table_tasks {
let ds = db
.table_store
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let manifest_version = snapshot
.entry(&table_key)
.map(|e| e.table_version)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
let lance_head_version = ds.version().version;
// HEAD behind the manifest is not drift repair can classify; abort.
if lance_head_version < manifest_version {
return Err(OmniError::manifest_internal(format!(
"table '{}' Lance HEAD version {} is behind manifest version {}",
table_key, lance_head_version, manifest_version
)));
}
if lance_head_version == manifest_version {
tables.push(TableRepairStats {
table_key,
manifest_version,
lance_head_version,
classification: RepairClassification::NoDrift,
action: RepairAction::NoOp,
operations: Vec::new(),
error: None,
});
continue;
}
// From here on HEAD is strictly ahead of the manifest pin.
let classification = classify_drift(&ds, manifest_version, lance_head_version).await;
// Decision table over (confirm, force, classification). Arm order matters:
// the forced arm must precede the refusal arm, which matches any `force`.
let action = match (
options.confirm,
options.force,
classification.classification,
) {
(false, _, _) => RepairAction::Preview,
(true, _, RepairClassification::VerifiedMaintenance) => RepairAction::Healed,
(true, true, RepairClassification::Suspicious | RepairClassification::Unverifiable) => {
any_forced = true;
RepairAction::Forced
}
(true, _, RepairClassification::Suspicious | RepairClassification::Unverifiable) => {
RepairAction::Refused
}
(true, _, RepairClassification::NoDrift) => RepairAction::NoOp,
};
if matches!(action, RepairAction::Healed | RepairAction::Forced) {
// Queue a new pin taken from the dataset's current state, and record
// the old manifest version as the expected value for the commit.
let state = db.table_store.table_state(&full_path, &ds).await?;
updates.push(crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
expected.insert(table_key.clone(), manifest_version);
}
tables.push(TableRepairStats {
table_key,
manifest_version,
lance_head_version,
classification: classification.classification,
action,
operations: classification.operations,
error: classification.error,
});
}
let manifest_version = if updates.is_empty() {
None
} else {
// A distinct actor string marks commits that include forced publishes.
let actor = if any_forced {
Some("omnigraph:repair:force")
} else {
Some("omnigraph:repair")
};
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = db
.coordinator
.write()
.await
.commit_updates_with_actor_with_expected(&updates, &expected, actor)
.await?;
db.runtime_cache.invalidate_all().await;
// The graph index is invalidated only when an edge table pin moved.
if updates
.iter()
.any(|update| update.table_key.starts_with("edge:"))
{
db.invalidate_graph_index().await;
}
Some(manifest_version)
};
Ok(RepairStats {
tables,
manifest_version,
})
}
/// Fail with a manifest conflict if any recovery sidecar is still listed
/// under the graph root.
///
/// `operation` is interpolated at the start of the error message. Errors from
/// listing the sidecars are propagated unchanged.
async fn ensure_no_pending_recovery_sidecars(db: &Omnigraph, operation: &str) -> Result<()> {
    let pending =
        crate::db::manifest::list_sidecars(db.root_uri(), db.storage_adapter()).await?;
    if pending.is_empty() {
        return Ok(());
    }
    Err(OmniError::manifest_conflict(format!(
        "{operation} requires a clean recovery state; reopen the graph to run the \
         recovery sweep before repairing"
    )))
}
/// Classify the Lance versions in `manifest_version + 1 ..= lance_head_version`.
///
/// Each version's transaction is read in ascending order and its operation
/// name recorded. The verdict is:
/// - `Unverifiable` if any transaction is missing or fails to load (scanning
///   stops at that version, and `error` carries the reason);
/// - `Suspicious` if every transaction was readable but at least one is not a
///   `Rewrite` or `ReserveFragments`;
/// - `VerifiedMaintenance` otherwise.
///
/// This function itself never fails; read problems are folded into the result.
async fn classify_drift(
    ds: &Dataset,
    manifest_version: u64,
    lance_head_version: u64,
) -> ClassificationResult {
    let first_uncovered = manifest_version.saturating_add(1);
    let mut operations = Vec::new();
    let mut only_maintenance = true;
    let mut error = None;

    for version in first_uncovered..=lance_head_version {
        let transaction = match ds.read_transaction_by_version(version).await {
            Ok(Some(transaction)) => transaction,
            Ok(None) => {
                error = Some(format!("missing Lance transaction for version {version}"));
                break;
            }
            Err(err) => {
                error = Some(format!(
                    "failed to read Lance transaction for version {version}: {err}"
                ));
                break;
            }
        };

        let operation = transaction.operation;
        operations.push(operation.name().to_string());
        let is_maintenance = matches!(
            operation,
            Operation::Rewrite { .. } | Operation::ReserveFragments { .. }
        );
        only_maintenance = only_maintenance && is_maintenance;
    }

    // An unreadable transaction outranks anything seen before it.
    let classification = match (error.is_some(), only_maintenance) {
        (true, _) => RepairClassification::Unverifiable,
        (false, false) => RepairClassification::Suspicious,
        (false, true) => RepairClassification::VerifiedMaintenance,
    };

    ClassificationResult {
        classification,
        operations,
        error,
    }
}

View file

@ -697,7 +697,7 @@ fn update_unique_constraints(
if any_null {
continue;
}
let value = parts.join("|");
let value = crate::loader::composite_unique_key(&parts);
let row_id = row_id_at(batch, row)?;
if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
conflicts.push(MergeConflict {

View file

@ -569,7 +569,8 @@ use super::staging::{MutationStaging, PendingMode};
/// via `open_for_mutation_on_branch`, which compares Lance HEAD against
/// the manifest's pinned version — that fence is the engine's
/// publisher-style OCC catching cross-writer drift before we make any
/// changes.
/// changes. For delete-only queries, this strict open is also the uncovered
/// drift guard that runs before `delete_where` can inline-commit.
///
/// On subsequent touches *within the same query*, behavior depends on
/// whether the table has already been inline-committed by a delete op:
@ -904,12 +905,12 @@ impl Omnigraph {
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
crate::loader::validate_value_constraints(&batch, node_type)?;
crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
if !unique_groups.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&batch,
type_name,
&unique_props,
&unique_groups,
)?;
}
let has_key = node_type.key_property().is_some();
@ -945,12 +946,12 @@ impl Omnigraph {
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?;
crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_edge(edge_type);
if !unique_props.is_empty() {
let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type);
if !unique_groups.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&batch,
type_name,
&unique_props,
&unique_groups,
)?;
}
let table_key = format!("edge:{}", type_name);
@ -1093,12 +1094,12 @@ impl Omnigraph {
let node_type = &self.catalog().node_types[type_name];
crate::loader::validate_value_constraints(&updated, node_type)?;
crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
if !unique_groups.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&updated,
type_name,
&unique_props,
&unique_groups,
)?;
}

View file

@ -495,25 +495,21 @@ impl StagedMutation {
// until `ensure_path` learns how to bump expected_version on
// op-kind upgrade.
//
// Why per-branch (and not the bound-branch `db.snapshot()`):
// when the caller mutates a branch other than the engine's
// bound branch (e.g., feature-branch ingest from a server
// handle bound to main), `db.snapshot()` returns the bound
// branch's view of each table — which is the wrong pin for
// the publisher's CAS on a different branch. Using
// `snapshot_for_branch(branch)` resolves the per-branch
// entries correctly. The cost is one fresh manifest read per
// mutation; PR 1b's regression came from this same read, but
// that read is now strictly necessary for cross-branch
// correctness. Single-table same-branch mutations could still
// skip this read (queue exclusivity makes the publisher CAS a
// no-op), but the conditional adds complexity for marginal
// gain — left as a follow-up perf optimization.
// Why a fresh per-branch snapshot (and not the bound-branch
// `db.snapshot()` / `snapshot_for_branch()` fast path): a stale
// engine handle may be bound to the same branch it is writing. For
// non-strict Insert/Merge, that stale local view is allowed to rebase
// to the live manifest pin under the queue; only uncovered Lance
// HEAD>manifest drift is refused. For writes targeting a branch other
// than the engine's bound branch (e.g., feature-branch ingest from a
// server handle bound to main), the same helper also resolves the
// correct branch pin. The cost is one fresh manifest read per mutation
// plus one Lance HEAD open per staged table for the drift guard below.
//
// Multi-coordinator deployments (§VI.27 aspirational) get
// genuine cross-process drift detection from this read for
// free.
let snapshot = db.snapshot_for_branch(branch).await?;
let snapshot = db.fresh_snapshot_for_branch(branch).await?;
for entry in staged.iter_mut() {
let current = snapshot
.entry(&entry.table_key)
@ -541,6 +537,35 @@ impl StagedMutation {
));
}
// Separate manifest-visible concurrency from uncovered Lance drift.
// Non-strict inserts/merges are allowed to rebase from their staged
// read version to the fresh manifest pin above, but only if the
// live Lance HEAD still equals that manifest pin. If an external
// raw Lance write or a pre-fix maintenance path moved HEAD without
// publishing `__manifest`, this write must not silently fold it.
let head = db
.table_store()
.open_dataset_head_for_write(
&entry.table_key,
&entry.path.full_path,
entry.path.table_branch.as_deref(),
)
.await?
.version()
.version;
if head < current {
return Err(OmniError::manifest_internal(format!(
"table '{}' Lance HEAD version {} is behind manifest version {}",
entry.table_key, head, current
)));
}
if head > current {
return Err(OmniError::manifest_conflict(format!(
"table '{}' has Lance HEAD version {} ahead of manifest version {}; run `omnigraph repair` before writing",
entry.table_key, head, current
)));
}
entry.expected_version = current;
expected_versions.insert(entry.table_key.clone(), current);
}

View file

@ -399,9 +399,9 @@ async fn load_jsonl_reader<R: BufRead>(
let batch = build_node_batch(node_type, rows)?;
validate_value_constraints(&batch, node_type)?;
validate_enum_constraints(&batch, &node_type.properties, type_name)?;
let unique_props = unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
let unique_groups = unique_constraint_groups_for_node(node_type);
if !unique_groups.is_empty() {
enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?;
}
let loaded_count = batch.num_rows();
let table_key = format!("node:{}", type_name);
@ -510,9 +510,9 @@ async fn load_jsonl_reader<R: BufRead>(
let edge_type = &catalog.edge_types[edge_name];
let batch = build_edge_batch(edge_type, rows)?;
validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
let unique_props = unique_property_names_for_edge(edge_type);
if !unique_props.is_empty() {
enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
let unique_groups = unique_constraint_groups_for_edge(edge_type);
if !unique_groups.is_empty() {
enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?;
}
let loaded_count = batch.num_rows();
let table_key = format!("edge:{}", edge_name);
@ -1425,8 +1425,16 @@ pub(crate) fn validate_enum_constraints(
Ok(())
}
/// Detect duplicate values within a single `RecordBatch` for any of the named
/// `unique_properties`. Returns an error on the first duplicate found.
/// Detect duplicate values within a single `RecordBatch` for any of the
/// `unique_constraints` groups. Each group is a list of one or more columns
/// that together form a uniqueness key: a violation occurs when two rows share
/// the same tuple of values across *all* columns in a group, so a composite
/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an
/// error on the first duplicate found.
///
/// Rows where any column in a group is null are exempt (standard SQL semantics
/// for uniqueness over nullable columns), as is any group whose columns are
/// not all present in the batch (e.g. a partial-schema load).
///
/// Note: this only catches duplicates *within* the batch. Cross-batch
/// uniqueness against already-committed rows is not enforced here — that
@ -1434,22 +1442,39 @@ pub(crate) fn validate_enum_constraints(
pub(crate) fn enforce_unique_constraints_intra_batch(
batch: &RecordBatch,
type_name: &str,
unique_properties: &[String],
unique_constraints: &[Vec<String>],
) -> Result<()> {
for property in unique_properties {
let Some(col_idx) = batch.schema().index_of(property).ok() else {
for columns in unique_constraints {
let Some(col_indices) = columns
.iter()
.map(|name| batch.schema().index_of(name).ok())
.collect::<Option<Vec<usize>>>()
else {
continue;
};
let arr = batch.column(col_idx);
let mut seen: HashMap<String, usize> = HashMap::new();
for row in 0..batch.num_rows() {
let Some(value) = scalar_to_string(arr, row) else {
let mut parts = Vec::with_capacity(col_indices.len());
let mut any_null = false;
for &col_idx in &col_indices {
let Some(value) = scalar_to_string(batch.column(col_idx), row) else {
any_null = true;
break;
};
parts.push(value);
}
if any_null {
continue;
};
}
let value = composite_unique_key(&parts);
if let Some(prev_row) = seen.insert(value.clone(), row) {
return Err(OmniError::manifest(format!(
"@unique violation on {}.{}: value '{}' appears in rows {} and {}",
type_name, property, value, prev_row, row
type_name,
format_unique_columns(columns),
value,
prev_row,
row
)));
}
}
@ -1457,6 +1482,27 @@ pub(crate) fn enforce_unique_constraints_intra_batch(
Ok(())
}
/// Build the composite uniqueness key for one row from its rendered,
/// non-null column values.
///
/// The values are concatenated with the unit separator (U+001F) between
/// them. That control character is highly unlikely to appear in real data,
/// so tuples such as `("a|b", "c")` and `("a", "b|c")` produce different
/// keys instead of colliding.
///
/// Both the intake path (`enforce_unique_constraints_intra_batch`) and the
/// branch-merge path (`exec/merge.rs::update_unique_constraints`) go through
/// this function so their keyings cannot silently diverge.
pub(crate) fn composite_unique_key(parts: &[String]) -> String {
    // U+001F UNIT SEPARATOR.
    const SEPARATOR: &str = "\u{1f}";
    parts.join(SEPARATOR)
}
/// Render the columns of a unique constraint for an error message.
///
/// A single column is shown bare (`col`); any other column count is shown as
/// a parenthesized, comma-separated list (`(a, b)`).
fn format_unique_columns(columns: &[String]) -> String {
    if let [only] = columns {
        return only.to_owned();
    }
    let joined = columns.join(", ");
    format!("({joined})")
}
/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
/// uniqueness comparison. Returns `None` for null values (nulls are exempt
/// from uniqueness in standard SQL semantics).
@ -1498,39 +1544,30 @@ fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
None
}
/// Build the flat list of property names that must be checked for uniqueness
/// on a node type. Includes both `@unique` properties (from
/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
pub(crate) fn unique_property_names_for_node(
/// Build the list of uniqueness constraint groups to enforce on a node type.
/// Each group is the column tuple of one constraint. Includes every
/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the
/// `@key` (which implies uniqueness over its column tuple). Grouping is
/// preserved so a composite `@unique(a, b)` is enforced as a composite key
/// rather than degraded into independent single-field checks.
pub(crate) fn unique_constraint_groups_for_node(
node_type: &omnigraph_compiler::catalog::NodeType,
) -> Vec<String> {
let mut props: Vec<String> = node_type
.unique_constraints
.iter()
.flatten()
.cloned()
.collect();
if let Some(key) = &node_type.key {
props.extend(key.iter().cloned());
) -> Vec<Vec<String>> {
let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone();
if let Some(key) = &node_type.key
&& !groups.contains(key)
{
groups.push(key.clone());
}
props.sort();
props.dedup();
props
groups
}
/// Same as [`unique_property_names_for_node`] but for an edge type.
pub(crate) fn unique_property_names_for_edge(
/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges
/// have no `@key`).
pub(crate) fn unique_constraint_groups_for_edge(
edge_type: &omnigraph_compiler::catalog::EdgeType,
) -> Vec<String> {
let mut props: Vec<String> = edge_type
.unique_constraints
.iter()
.flatten()
.cloned()
.collect();
props.sort();
props.dedup();
props
) -> Vec<Vec<String>> {
edge_type.unique_constraints.clone()
}
fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {

View file

@ -856,7 +856,7 @@ impl TableStore {
// before the FirstSeen setter has a chance to silently collapse
// anything):
// - Load path: `enforce_unique_constraints_intra_batch`
// (`loader/mod.rs:1453`) errors on intra-batch `@key` dups.
// (`loader/mod.rs:1471`) errors on intra-batch `@key` dups.
// - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`)
// accumulates and dedupes by `id`.
// - Branch-merge path: `compute_source_delta` /

View file

@ -39,6 +39,26 @@ query insert_user($name: String, $email: String) {
}
"#;
const EDGE_UNIQUE_SCHEMA: &str = r#"
node Person {
name: String @key
}
edge Knows: Person -> Person {
@unique(src, dst)
}
"#;
const EDGE_UNIQUE_DATA: &str = r#"{"type":"Person","data":{"name":"Alice"}}
{"type":"Person","data":{"name":"Bob"}}
{"type":"Person","data":{"name":"Carol"}}"#;
const EDGE_UNIQUE_MUTATIONS: &str = r#"
query add_knows($from: String, $to: String) {
insert Knows { from: $from, to: $to }
}
"#;
const CARDINALITY_SCHEMA: &str = r#"
node Person {
name: String @key
@ -1119,6 +1139,87 @@ async fn branch_merge_reports_unique_violation_conflict() {
}
}
/// Regression for the MR-983 follow-up: the branch-merge path must enforce an
/// edge composite `@unique(src, dst)` as a true composite key, consistent with
/// the intake path. Two branches inserting the *same* (src, dst) pair must
/// conflict on merge.
#[tokio::test]
async fn branch_merge_reports_composite_unique_violation_conflict() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await;
main.branch_create("feature").await.unwrap();
// Second handle on the same graph URI, used for the `feature` branch writes.
let mut feature = Omnigraph::open(uri).await.unwrap();
// Insert the identical (Alice, Bob) edge on main...
mutate_main(
&mut main,
EDGE_UNIQUE_MUTATIONS,
"add_knows",
&params(&[("$from", "Alice"), ("$to", "Bob")]),
)
.await
.unwrap();
// ...and again on the feature branch.
mutate_branch(
&mut feature,
"feature",
EDGE_UNIQUE_MUTATIONS,
"add_knows",
&params(&[("$from", "Alice"), ("$to", "Bob")]),
)
.await
.unwrap();
// The merge must be rejected, and the rejection must include a
// unique-violation conflict on the edge table.
let err = main.branch_merge("feature", "main").await.unwrap_err();
match err {
OmniError::MergeConflicts(conflicts) => {
assert!(conflicts.iter().any(|conflict| {
conflict.table_key == "edge:Knows"
&& conflict.kind == MergeConflictKind::UniqueViolation
}));
}
other => panic!("expected merge conflicts, got {other:?}"),
}
}
/// Sibling to the above: pairs sharing `src` but differing on `dst` are unique
/// on the (src, dst) tuple and must merge cleanly. Guards against the composite
/// degrading back into a single-field `@unique(src)` on the merge path.
#[tokio::test]
async fn branch_merge_allows_distinct_composite_unique_pairs() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await;
main.branch_create("feature").await.unwrap();
// Second handle on the same graph URI, used for the `feature` branch writes.
let mut feature = Omnigraph::open(uri).await.unwrap();
// main gets (Alice, Bob)...
mutate_main(
&mut main,
EDGE_UNIQUE_MUTATIONS,
"add_knows",
&params(&[("$from", "Alice"), ("$to", "Bob")]),
)
.await
.unwrap();
// ...while feature gets (Alice, Carol): same `src`, different `dst`.
mutate_branch(
&mut feature,
"feature",
EDGE_UNIQUE_MUTATIONS,
"add_knows",
&params(&[("$from", "Alice"), ("$to", "Carol")]),
)
.await
.unwrap();
main.branch_merge("feature", "main")
.await
.expect("distinct (src, dst) pairs are unique on the composite and must merge cleanly");
// Both edges must be present on main after the merge.
assert_eq!(count_rows(&main, "edge:Knows").await, 2);
}
#[tokio::test]
async fn branch_merge_reports_cardinality_violation_conflict() {
let dir = tempfile::tempdir().unwrap();

View file

@ -188,7 +188,7 @@ node Thing {
///
/// Defense in depth:
/// 1. The loader's `enforce_unique_constraints_intra_batch`
/// (`loader/mod.rs:1453`), invoked unconditionally on any node type
/// (`loader/mod.rs:1471`), invoked unconditionally on any node type
/// with a `@key`, errors on intra-batch duplicate `@key` values at
/// intake — pinned by this test across every `LoadMode`.
/// 2. The `check_batch_unique_by_keys` precondition at the top of
@ -229,6 +229,57 @@ node Thing {
}
}
/// Regression for MR-983: a node-level composite `@unique(a, b)` must be
/// enforced as a true composite key, not degraded into independent
/// single-field checks. Pre-fix, `unique_property_names_for_node` flattened
/// every constraint group into one property list, so `@unique(source,
/// external_id)` was enforced as `@unique(source)` *and* `@unique(external_id)`
/// — rejecting rows that were unique on the composite key and naming only the
/// first field in the error.
///
/// The test has two phases against the same graph: an accepted load whose rows
/// differ on one composite column, then a rejected load whose rows match on
/// both.
#[tokio::test]
async fn loader_enforces_composite_unique_as_composite_key() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let schema = r#"
node ExternalID {
slug: String @key
source: String @index
external_id: String @index
@unique(source, external_id)
}
"#;
let mut db = Omnigraph::init(uri, schema).await.unwrap();
// Same `source`, different `external_id` → unique on the composite key.
// This is the exact repro from MR-983 and must be accepted.
let composite_ok = r#"{"type":"ExternalID","data":{"slug":"a","source":"whatsapp","external_id":"+E.164"}}
{"type":"ExternalID","data":{"slug":"b","source":"whatsapp","external_id":"pn:12345"}}
"#;
load_jsonl(&mut db, composite_ok, LoadMode::Overwrite)
.await
.expect("rows unique on the composite (source, external_id) must be accepted");
assert_eq!(count_rows(&db, "node:ExternalID").await, 2);
// Both composite columns equal → genuine violation. The error must name
// the whole composite, not just the first field. The `slug` keys differ,
// so the `@key` constraint is not what trips here.
let composite_dupe = r#"{"type":"ExternalID","data":{"slug":"c","source":"whatsapp","external_id":"dup"}}
{"type":"ExternalID","data":{"slug":"d","source":"whatsapp","external_id":"dup"}}
"#;
let err = load_jsonl(&mut db, composite_dupe, LoadMode::Overwrite)
.await
.unwrap_err();
let msg = err.to_string();
// Columns are canonicalized to sorted order in the catalog, so the
// message reads `(external_id, source)`; assert order-agnostically that
// both composite columns are named (not just the first, as pre-fix).
assert!(
msg.contains("@unique violation")
&& msg.contains("source")
&& msg.contains("external_id"),
"composite violation must name both columns (got: {msg})"
);
}
/// Canary for the upstream Lance gap that the `FirstSeen` workaround
/// in `table_store.rs` masks. The bug class is "Window 2": load →
/// indices built explicitly → merge → merge. Even with the engine

View file

@ -30,6 +30,7 @@ use arrow_schema::{DataType, Field, Schema};
use lance::Dataset;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::optimize::{CompactionOptions, compact_files};
use lance::dataset::transaction::Operation;
use lance::dataset::write::delete::DeleteResult;
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams};
use lance::index::DatasetIndexExt;
@ -225,6 +226,33 @@ async fn _compile_compact_files_signature() -> lance::Result<()> {
Ok(())
}
// --- Guard 7b: transaction history exposes repair's classification surface -
//
// `db/omnigraph/repair.rs` reads Lance transactions between manifest and HEAD
// and treats only `ReserveFragments` + `Rewrite` as safe maintenance drift.
// Compile-only.
//
// The lints below are silenced on purpose: the body starts with
// `unimplemented!()`, so everything after it is unreachable and unused by
// construction. The function is only meant to be type-checked.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_transaction_history_for_repair_signature() -> lance::Result<()> {
// Placeholder dataset: diverges if ever executed; it only gives the calls
// below a `Dataset` receiver to type-check against.
let ds: Dataset = unimplemented!();
// Pins the lookup-by-version call: awaited, fallible, and yielding an
// optional transaction (see the `if let Some` below).
let tx = ds.read_transaction_by_version(1u64).await?;
if let Some(tx) = tx {
// Pins the `operation` field and that `name()` is usable as a `&str`.
let operation = tx.operation;
let _name: &str = operation.name();
// Pins the two variant names repair classifies as maintenance; the
// catch-all arm stands in for every other operation.
match operation {
Operation::Rewrite { .. } | Operation::ReserveFragments { .. } => {}
_ => {}
}
}
Ok(())
}
// --- Guard 8: Dataset::delete returns DeleteResult { new_dataset, num_deleted_rows } ---
//
// `table_store.rs::delete_where` consumes both fields. When MR-A migrates
@ -332,7 +360,10 @@ async fn compact_files_still_fails_on_blob_columns() {
]));
RecordBatch::try_new(
schema,
vec![Arc::new(StringArray::from(ids)) as _, Arc::new(content) as _],
vec![
Arc::new(StringArray::from(ids)) as _,
Arc::new(content) as _,
],
)
.unwrap()
}

View file

@ -8,7 +8,11 @@ mod helpers;
use std::time::Duration;
use lance::Dataset;
use omnigraph::db::{CleanupPolicyOptions, Omnigraph, ReadTarget, SkipReason};
use lance::dataset::optimize::{CompactionOptions, compact_files};
use omnigraph::db::{
CleanupPolicyOptions, Omnigraph, ReadTarget, RepairAction, RepairClassification, RepairOptions,
SkipReason,
};
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::{
@ -27,11 +31,64 @@ fn node_table_uri(root: &str, type_name: &str) -> String {
format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
}
/// Looks up `node:Person` on `main` and returns, in order: the table version
/// recorded in the snapshot entry, the version reported by opening the Lance
/// dataset directly, and the full dataset path built from `root`.
async fn person_manifest_and_head(db: &Omnigraph, root: &str) -> (u64, u64, String) {
    let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
    let person_entry = snapshot.entry("node:Person").unwrap();

    let base = root.trim_end_matches('/');
    let dataset_path = format!("{base}/{}", person_entry.table_path);

    let lance_head = Dataset::open(&dataset_path)
        .await
        .unwrap()
        .version()
        .version;
    (person_entry.table_version, lance_head, dataset_path)
}
/// Inserts four extra `Person` rows on `main`, one `insert_person` mutation
/// per row (callers use this to create additional fragments — presumably one
/// per mutation; confirm against the engine's write path).
async fn add_person_fragments(db: &mut Omnigraph) {
    const EXTRA_PEOPLE: [(&str, i64); 4] =
        [("Eve", 40), ("Frank", 41), ("Grace", 42), ("Heidi", 43)];

    for (person_name, person_age) in EXTRA_PEOPLE {
        mutate_main(
            db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", person_name)], &[("$age", person_age)]),
        )
        .await
        .expect("insert");
    }
}
/// Creates manifest/HEAD drift on `node:Person` by compacting the Lance
/// dataset directly, bypassing the graph API. Returns the manifest version
/// captured before compaction, the dataset version after it, and the dataset
/// path. Panics if compaction did not advance the version or touched no
/// fragments.
async fn forge_person_compaction_drift(db: &mut Omnigraph, root: &str) -> (u64, u64, String) {
    add_person_fragments(db).await;
    let (pinned_version, _, dataset_path) = person_manifest_and_head(db, root).await;

    let mut dataset = Dataset::open(&dataset_path).await.unwrap();
    let compaction = compact_files(&mut dataset, CompactionOptions::default(), None)
        .await
        .expect("raw Lance compaction");

    let head_after = dataset.version().version;
    assert!(
        head_after > pinned_version,
        "raw Lance compaction should advance HEAD beyond manifest"
    );

    let rewrote_fragments = compaction.fragments_removed > 0 || compaction.fragments_added > 0;
    assert!(
        rewrote_fragments,
        "test precondition: raw compaction should rewrite fragments"
    );

    (pinned_version, head_after, dataset_path)
}
/// Creates manifest/HEAD drift on `node:Person` by deleting the `Alice` row
/// through the Lance dataset directly, bypassing the graph API. Returns the
/// manifest version captured before the delete, the version of the dataset
/// produced by the delete, and the dataset path.
async fn forge_person_delete_drift(db: &Omnigraph, root: &str) -> (u64, u64, String) {
    let (pinned_version, _, dataset_path) = person_manifest_and_head(db, root).await;

    let mut dataset = Dataset::open(&dataset_path).await.unwrap();
    let outcome = dataset
        .delete("name = 'Alice'")
        .await
        .expect("raw Lance delete");
    assert_eq!(outcome.num_deleted_rows, 1, "fixture should delete Alice");

    let head_after = outcome.new_dataset.version().version;
    assert!(
        head_after > pinned_version,
        "raw Lance delete should advance HEAD beyond manifest"
    );

    (pinned_version, head_after, dataset_path)
}
#[tokio::test]
async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let stats = db.optimize().await.unwrap();
@ -47,7 +104,7 @@ async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() {
#[tokio::test]
async fn optimize_after_load_then_again_is_idempotent() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let db = init_and_load(&dir).await;
// First pass may compact (load wrote real fragments).
let _first = db.optimize().await.unwrap();
@ -180,7 +237,12 @@ node Tag {\n slug: String @key\n}\n";
#[tokio::test]
async fn optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path().to_str().unwrap().trim_end_matches('/').to_string();
let root = dir
.path()
.to_str()
.unwrap()
.trim_end_matches('/')
.to_string();
let mut db = init_and_load(&dir).await;
// Several separate inserts → multiple Person fragments, so `compact_files`
@ -234,6 +296,281 @@ async fn optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds() {
assert!(result.applied, "schema apply should report applied=true");
}
/// When `node:Person` already has HEAD ahead of its manifest pin, `optimize`
/// must report the table as skipped with `DriftNeedsRepair`, commit nothing,
/// and leave both the manifest pin and the Lance HEAD where they were.
#[tokio::test]
async fn optimize_skips_preexisting_manifest_head_drift() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut db = init_and_load(&dir).await;
    let (pinned_before, lance_head_before, _) =
        forge_person_compaction_drift(&mut db, &root).await;

    let table_stats = db.optimize().await.unwrap();
    let person_stat = table_stats
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person stat present");

    // The stat must describe the drift rather than any work done.
    assert_eq!(person_stat.skipped, Some(SkipReason::DriftNeedsRepair));
    assert!(!person_stat.committed);
    assert_eq!(person_stat.manifest_version, Some(pinned_before));
    assert_eq!(person_stat.lance_head_version, Some(lance_head_before));

    // And nothing on disk may have moved.
    let (pinned_after, lance_head_after, _) = person_manifest_and_head(&db, &root).await;
    assert_eq!(
        pinned_after, pinned_before,
        "optimize must not publish uncovered drift"
    );
    assert_eq!(
        lance_head_after, lance_head_before,
        "optimize must not move drifted HEAD"
    );
}
/// A `repair` call with `confirm: false` is a preview: it classifies raw
/// compaction drift as `VerifiedMaintenance`, reports the `Preview` action
/// with the observed versions, publishes no manifest commit, and leaves the
/// manifest pin and Lance HEAD untouched.
#[tokio::test]
async fn repair_preview_reports_verified_maintenance_drift_without_healing() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut db = init_and_load(&dir).await;
    let (pinned_before, lance_head_before, _) =
        forge_person_compaction_drift(&mut db, &root).await;

    let preview_only = RepairOptions {
        confirm: false,
        force: false,
    };
    let report = db.repair(preview_only).await.unwrap();
    assert_eq!(report.manifest_version, None);

    let person_stat = report
        .tables
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person repair stat present");
    assert_eq!(
        person_stat.classification,
        RepairClassification::VerifiedMaintenance
    );
    assert_eq!(person_stat.action, RepairAction::Preview);
    assert_eq!(person_stat.manifest_version, pinned_before);
    assert_eq!(person_stat.lance_head_version, lance_head_before);

    // Every reported operation must be one of the two maintenance kinds.
    let only_maintenance_ops = person_stat
        .operations
        .iter()
        .all(|op| op == "ReserveFragments" || op == "Rewrite");
    assert!(
        only_maintenance_ops,
        "maintenance drift should only include Lance maintenance operations: {:?}",
        person_stat.operations
    );

    let (pinned_after, lance_head_after, _) = person_manifest_and_head(&db, &root).await;
    assert_eq!(pinned_after, pinned_before);
    assert_eq!(lance_head_after, lance_head_before);
}
/// A confirmed, non-forced `repair` heals raw compaction drift: it publishes
/// a manifest commit, reports `Healed`, brings the manifest pin up to the
/// drifted HEAD without moving HEAD, and a subsequent schema apply succeeds.
#[tokio::test]
async fn repair_confirm_heals_verified_maintenance_drift() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut db = init_and_load(&dir).await;
    let (_, lance_head_before, _) = forge_person_compaction_drift(&mut db, &root).await;

    let confirmed = RepairOptions {
        confirm: true,
        force: false,
    };
    let report = db.repair(confirmed).await.unwrap();
    assert!(
        report.manifest_version.is_some(),
        "confirmed repair should publish one manifest commit"
    );

    let person_stat = report
        .tables
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person repair stat present");
    assert_eq!(
        person_stat.classification,
        RepairClassification::VerifiedMaintenance
    );
    assert_eq!(person_stat.action, RepairAction::Healed);

    // The pin catches up to HEAD; HEAD itself stays put.
    let (pinned_after, lance_head_after, _) = person_manifest_and_head(&db, &root).await;
    assert_eq!(pinned_after, lance_head_before);
    assert_eq!(lance_head_after, lance_head_before);

    // With the drift healed, a schema change must go through.
    let schema_with_nickname = TEST_SCHEMA.replace(
        " age: I32?\n}",
        " age: I32?\n nickname: String?\n}",
    );
    let apply_result = db
        .apply_schema(&schema_with_nickname)
        .await
        .expect("strict schema apply should succeed after repair");
    assert!(apply_result.applied);
}
/// Drift caused by a raw Lance delete is classified `Suspicious`; a confirmed
/// repair without `force` must refuse it, publish nothing, leave both
/// versions untouched, and reads must still return the four pre-delete rows.
#[tokio::test]
async fn repair_refuses_raw_delete_without_force() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let db = init_and_load(&dir).await;
    let (pinned_before, lance_head_before, _) = forge_person_delete_drift(&db, &root).await;

    let confirmed_unforced = RepairOptions {
        confirm: true,
        force: false,
    };
    let report = db.repair(confirmed_unforced).await.unwrap();
    assert_eq!(report.manifest_version, None);

    let person_stat = report
        .tables
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person repair stat present");
    assert_eq!(person_stat.classification, RepairClassification::Suspicious);
    assert_eq!(person_stat.action, RepairAction::Refused);

    let reports_delete = person_stat.operations.iter().any(|op| op == "Delete");
    assert!(
        reports_delete,
        "raw Lance delete should be reported as a suspicious operation: {:?}",
        person_stat.operations
    );

    let (pinned_after, lance_head_after, _) = person_manifest_and_head(&db, &root).await;
    assert_eq!(pinned_after, pinned_before);
    assert_eq!(lance_head_after, lance_head_before);

    let visible_people = count_rows(&db, "node:Person").await;
    assert_eq!(
        visible_people,
        4,
        "manifest-pinned reads should still see the pre-delete version"
    );
}
/// With both `confirm` and `force` set, `repair` accepts `Suspicious` drift
/// from a raw Lance delete: the action is `Forced`, the manifest pin moves to
/// the drifted HEAD, and reads then see three rows.
#[tokio::test]
async fn repair_force_heals_suspicious_drift() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let db = init_and_load(&dir).await;
    let (_, lance_head_before, _) = forge_person_delete_drift(&db, &root).await;

    let forced = RepairOptions {
        confirm: true,
        force: true,
    };
    let report = db.repair(forced).await.unwrap();

    let person_stat = report
        .tables
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person repair stat present");
    assert_eq!(person_stat.classification, RepairClassification::Suspicious);
    assert_eq!(person_stat.action, RepairAction::Forced);

    // The pin now equals the HEAD the raw delete produced.
    let (pinned_after, lance_head_after, _) = person_manifest_and_head(&db, &root).await;
    assert_eq!(pinned_after, lance_head_before);
    assert_eq!(lance_head_after, lance_head_before);

    let visible_people = count_rows(&db, "node:Person").await;
    assert_eq!(
        visible_people,
        3,
        "forced repair publishes the raw delete's HEAD"
    );
}
/// A `LoadMode::Merge` load against a table with raw compaction drift must
/// fail with an error that mentions `omnigraph repair`, and must leave the
/// manifest pin and Lance HEAD exactly as they were.
#[tokio::test]
async fn non_strict_load_refuses_uncovered_drift_before_folding_it() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut db = init_and_load(&dir).await;
    let (pinned_before, lance_head_before, _) =
        forge_person_compaction_drift(&mut db, &root).await;

    let load_outcome = load_jsonl(
        &mut db,
        "{\"type\":\"Person\",\"data\":{\"name\":\"Ivan\",\"age\":44}}",
        LoadMode::Merge,
    )
    .await;
    let err = load_outcome.expect_err("merge load must not silently fold uncovered drift");

    let points_at_repair = err.to_string().contains("omnigraph repair");
    assert!(
        points_at_repair,
        "error should point at explicit repair; got: {err}"
    );

    let (pinned_after, lance_head_after, _) = person_manifest_and_head(&db, &root).await;
    assert_eq!(pinned_after, pinned_before);
    assert_eq!(lance_head_after, lance_head_before);
}
/// A delete-only mutation against a table with raw compaction drift must fail
/// with an error containing "expected", must not move the manifest pin or the
/// Lance HEAD, and must leave all eight rows visible.
#[tokio::test]
async fn delete_only_mutation_refuses_uncovered_drift_before_inline_commit() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut db = init_and_load(&dir).await;
    let (pinned_before, lance_head_before, _) =
        forge_person_compaction_drift(&mut db, &root).await;

    let delete_outcome = mutate_main(
        &mut db,
        MUTATION_QUERIES,
        "remove_person",
        &mixed_params(&[("$name", "Alice")], &[]),
    )
    .await;
    let err = delete_outcome
        .expect_err("strict delete must reject uncovered drift before delete_where");

    let is_stale_version_error = err.to_string().contains("expected");
    assert!(
        is_stale_version_error,
        "delete should fail as a strict stale-version write; got: {err}"
    );

    let (pinned_after, lance_head_after, _) = person_manifest_and_head(&db, &root).await;
    assert_eq!(pinned_after, pinned_before);
    assert_eq!(
        lance_head_after, lance_head_before,
        "delete_where must not run after the strict drift guard fails"
    );

    let visible_people = count_rows(&db, "node:Person").await;
    assert_eq!(
        visible_people,
        8,
        "manifest-pinned reads should still see all rows present before the failed delete"
    );
}
// Regression: `optimize` must REFUSE when an unresolved recovery sidecar is
// pending. Operating on an unrecovered graph could publish a partial write that
// the all-or-nothing recovery sweep would roll back; the operator must reopen

View file

@ -6,8 +6,8 @@
//! What this file covers:
//! - No `__run__*` branches are created by load or mutate.
//! - Cancellation of a mutation future leaves no graph-level state.
//! - Concurrent writers to the same table land exactly one publish; the
//! loser surfaces `ManifestConflictDetails::ExpectedVersionMismatch`.
//! - Concurrent non-strict inserts/merges rebase under the per-table queue;
//! strict updates/deletes surface `ExpectedVersionMismatch` on stale state.
//! - Failed mutations and loads leave the target unchanged.
//! - Multi-statement mutations are atomic (one commit per query).
//! - actor_id propagates through to the commit graph.
@ -17,7 +17,7 @@ mod helpers;
use arrow_array::Array;
use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
use omnigraph::error::OmniError;
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::*;
@ -241,18 +241,11 @@ async fn partial_failure_leaves_target_queryable_and_unblocks_next_mutation() {
assert_eq!(frank.num_rows(), 1, "Frank must be visible after publish");
}
/// Concurrent writers to the same `(table, branch)` produce exactly one
/// success and one `ExpectedVersionMismatch`. The replacement for the old
/// `concurrent_conflicting_run_publish_fails_cleanly` test — the OCC fence
/// has moved from a graph-level run-publish merge into the publisher's
/// per-table CAS.
///
/// Drives the race by interleaving two handles that captured the same
/// pre-write manifest snapshot: A commits first; B's commit then sees
/// `expected_versions[node:Person] = pre` while the manifest is at
/// `pre + 1`, and the publisher rejects.
/// Stale non-strict writers rebase to the live manifest pin under the
/// per-table queue instead of folding raw drift or returning a false 409.
/// Strict update/delete semantics are covered by the consistency/server tests.
#[tokio::test]
async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() {
async fn stale_non_strict_insert_rebases_to_live_manifest_pin() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_string_lossy().into_owned();
@ -281,40 +274,30 @@ async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() {
.unwrap();
}
// Writer B's coordinator is still at the pre-A snapshot. Its mutation
// captures expected_versions[node:Person] = pre (stale), then publishes
// — the publisher's CAS pre-check sees the manifest is now at post and
// rejects with ExpectedVersionMismatch.
let result_b = db_b
.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "WriterB")], &[("$age", 42)]),
)
.await;
// Writer B's coordinator is still at the pre-A snapshot, but Insert is
// non-strict: commit_all re-reads the live manifest pin under the queue,
// verifies Lance HEAD equals that pin, and then lets Lance rebase the
// staged append.
db_b.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "WriterB")], &[("$age", 42)]),
)
.await
.unwrap();
let err = result_b.expect_err("stale writer must hit ExpectedVersionMismatch");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert_eq!(manifest_err.kind, ManifestErrorKind::Conflict);
let Some(ManifestConflictDetails::ExpectedVersionMismatch {
ref table_key,
expected,
actual,
}) = manifest_err.details
else {
panic!(
"expected ExpectedVersionMismatch, got {:?}",
manifest_err.details,
);
};
assert_eq!(table_key, "node:Person");
assert!(
actual > expected,
"actual ({actual}) should be ahead of expected ({expected})",
);
for name in ["WriterA", "WriterB"] {
let person = query_main(
&mut db_b,
TEST_QUERIES,
"get_person",
&params(&[("$name", name)]),
)
.await
.unwrap();
assert_eq!(person.num_rows(), 1, "{name} should be visible");
}
}
/// The cancellation hole that motivated removing the Run state machine: dropping a mutation future