mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-21 02:28:07 +02:00
fix(maintenance): route uncovered drift through repair (#156)
* docs(invariants): note the non-atomic manifest->commit-graph publish gap Every graph publish commits __manifest then appends _graph_commits as two separate writes; a crash between them leaves the manifest ahead of the commit DAG. Live reads + durability are unaffected (reads resolve via the manifest) and recovery does not repair it; impact is bounded to commit history / time-travel by commit id / merge-base completeness. Pre-existing across all publishes, not the optimize reconcile specifically. Documented as a Known Gap; the fix is a commit-graph reconcilable from the manifest, not a recovery sidecar. * fix(maintenance): route uncovered drift through repair * fix(maintenance): harden repair review feedback
This commit is contained in:
parent
5eead8d29e
commit
d0e39e677e
16 changed files with 1108 additions and 93 deletions
|
|
@ -283,6 +283,25 @@ enum Command {
|
|||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Classify and explicitly repair manifest/head drift
|
||||
Repair {
|
||||
/// Graph URI
|
||||
uri: Option<String>,
|
||||
#[arg(long)]
|
||||
target: Option<String>,
|
||||
#[arg(long)]
|
||||
config: Option<PathBuf>,
|
||||
/// Publish verified maintenance drift. Without this flag, repair only
|
||||
/// previews what it would do.
|
||||
#[arg(long)]
|
||||
confirm: bool,
|
||||
/// Also publish suspicious or unverifiable drift. Requires
|
||||
/// `--confirm`; use only after operator review.
|
||||
#[arg(long, requires = "confirm")]
|
||||
force: bool,
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Remove old Lance versions from every table of the graph (destructive)
|
||||
Cleanup {
|
||||
/// Graph URI
|
||||
|
|
@ -3012,6 +3031,8 @@ async fn main() -> Result<()> {
|
|||
"fragments_added": s.fragments_added,
|
||||
"committed": s.committed,
|
||||
"skipped": s.skipped.map(|r| r.as_str()),
|
||||
"manifest_version": s.manifest_version,
|
||||
"lance_head_version": s.lance_head_version,
|
||||
})).collect::<Vec<_>>(),
|
||||
});
|
||||
print_json(&value)?;
|
||||
|
|
@ -3031,6 +3052,89 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
}
|
||||
}
|
||||
Command::Repair {
|
||||
uri,
|
||||
target,
|
||||
config,
|
||||
confirm,
|
||||
force,
|
||||
json,
|
||||
} => {
|
||||
let config = load_cli_config(config.as_ref())?;
|
||||
let uri = resolve_uri(&config, uri, target.as_deref())?;
|
||||
let db = Omnigraph::open(&uri).await?;
|
||||
let stats = db
|
||||
.repair(omnigraph::db::RepairOptions { confirm, force })
|
||||
.await?;
|
||||
let refused_count = stats
|
||||
.tables
|
||||
.iter()
|
||||
.filter(|s| matches!(s.action, omnigraph::db::RepairAction::Refused))
|
||||
.count();
|
||||
if json {
|
||||
let value = serde_json::json!({
|
||||
"uri": uri,
|
||||
"confirm": confirm,
|
||||
"force": force,
|
||||
"manifest_version": stats.manifest_version,
|
||||
"tables": stats.tables.iter().map(|s| serde_json::json!({
|
||||
"table_key": s.table_key,
|
||||
"manifest_version": s.manifest_version,
|
||||
"lance_head_version": s.lance_head_version,
|
||||
"classification": s.classification.as_str(),
|
||||
"action": s.action.as_str(),
|
||||
"operations": s.operations,
|
||||
"error": s.error,
|
||||
})).collect::<Vec<_>>(),
|
||||
});
|
||||
print_json(&value)?;
|
||||
} else {
|
||||
let mode = if confirm { "confirm" } else { "preview" };
|
||||
println!(
|
||||
"repair {} — {} mode, {} tables",
|
||||
uri,
|
||||
mode,
|
||||
stats.tables.len()
|
||||
);
|
||||
for s in &stats.tables {
|
||||
let drift = if s.manifest_version == s.lance_head_version {
|
||||
format!("{}", s.manifest_version)
|
||||
} else {
|
||||
format!("{} → {}", s.manifest_version, s.lance_head_version)
|
||||
};
|
||||
let ops = if s.operations.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
format!(" [{}]", s.operations.join(", "))
|
||||
};
|
||||
let err = s
|
||||
.error
|
||||
.as_ref()
|
||||
.map(|err| format!(" ({err})"))
|
||||
.unwrap_or_default();
|
||||
println!(
|
||||
" {:<40} {:<12} {:<22} {}{}{}",
|
||||
s.table_key,
|
||||
s.action.as_str(),
|
||||
s.classification.as_str(),
|
||||
drift,
|
||||
ops,
|
||||
err
|
||||
);
|
||||
}
|
||||
if !confirm {
|
||||
println!("rerun with --confirm to publish verified maintenance drift");
|
||||
}
|
||||
}
|
||||
if refused_count > 0 {
|
||||
bail!(
|
||||
"repair refused {} suspicious or unverifiable table(s); review the preview \
|
||||
output and rerun with --force --confirm only if publishing that drift is \
|
||||
intentional",
|
||||
refused_count
|
||||
);
|
||||
}
|
||||
}
|
||||
Command::Cleanup {
|
||||
uri,
|
||||
target,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use std::fs;
|
||||
|
||||
use lance::Dataset;
|
||||
use lance::index::DatasetIndexExt;
|
||||
use omnigraph::db::{Omnigraph, ReadTarget};
|
||||
use serde_json::Value;
|
||||
|
|
@ -60,6 +61,25 @@ fn manifest_dataset_version(graph: &std::path::Path) -> u64 {
|
|||
})
|
||||
}
|
||||
|
||||
fn forge_person_delete_drift(graph: &std::path::Path) -> (u64, u64) {
|
||||
tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
let uri = graph.to_string_lossy();
|
||||
let db = Omnigraph::open(uri.as_ref()).await.unwrap();
|
||||
let snap = db
|
||||
.snapshot_of(ReadTarget::branch("main"))
|
||||
.await
|
||||
.unwrap();
|
||||
let entry = snap.entry("node:Person").unwrap();
|
||||
let full_path = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path);
|
||||
let mut ds = Dataset::open(&full_path).await.unwrap();
|
||||
let deleted = ds.delete("name = 'Alice'").await.unwrap();
|
||||
assert_eq!(deleted.num_deleted_rows, 1);
|
||||
let head = deleted.new_dataset.version().version;
|
||||
assert!(head > entry.table_version);
|
||||
(entry.table_version, head)
|
||||
})
|
||||
}
|
||||
|
||||
fn write_policy_config_fixture(root: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) {
|
||||
let config = root.join("omnigraph.yaml");
|
||||
let policy = root.join("policy.yaml");
|
||||
|
|
@ -235,6 +255,83 @@ fn init_creates_graph_successfully_on_missing_local_directory() {
|
|||
assert!(temp.path().join("omnigraph.yaml").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn repair_json_reports_noop_on_clean_graph() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
init_graph(&graph);
|
||||
load_fixture(&graph);
|
||||
|
||||
let output = output_success(cli().arg("repair").arg("--json").arg(&graph));
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
|
||||
assert_eq!(payload["confirm"], false);
|
||||
assert_eq!(payload["force"], false);
|
||||
assert_eq!(payload["manifest_version"], Value::Null);
|
||||
let tables = payload["tables"].as_array().unwrap();
|
||||
assert_eq!(tables.len(), 4);
|
||||
assert!(tables.iter().all(|table| {
|
||||
table["classification"] == "no_drift" && table["action"] == "no_op"
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn repair_confirm_json_refuses_suspicious_drift_with_nonzero_exit_then_force_succeeds() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
init_graph(&graph);
|
||||
load_fixture(&graph);
|
||||
let graph_manifest_before = manifest_dataset_version(&graph);
|
||||
let (table_manifest_before, table_head_before) = forge_person_delete_drift(&graph);
|
||||
|
||||
let refused = output_failure(
|
||||
cli()
|
||||
.arg("repair")
|
||||
.arg("--confirm")
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let refused_payload: Value = serde_json::from_slice(&refused.stdout).unwrap();
|
||||
assert_eq!(refused_payload["manifest_version"], Value::Null);
|
||||
let person = refused_payload["tables"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.find(|table| table["table_key"] == "node:Person")
|
||||
.unwrap();
|
||||
assert_eq!(person["classification"], "suspicious");
|
||||
assert_eq!(person["action"], "refused");
|
||||
assert!(
|
||||
String::from_utf8_lossy(&refused.stderr).contains("repair refused"),
|
||||
"stderr should explain the non-zero exit; got: {}",
|
||||
String::from_utf8_lossy(&refused.stderr)
|
||||
);
|
||||
assert_eq!(manifest_dataset_version(&graph), graph_manifest_before);
|
||||
|
||||
let forced = output_success(
|
||||
cli()
|
||||
.arg("repair")
|
||||
.arg("--force")
|
||||
.arg("--confirm")
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let forced_payload: Value = serde_json::from_slice(&forced.stdout).unwrap();
|
||||
let forced_manifest = forced_payload["manifest_version"].as_u64().unwrap();
|
||||
assert!(forced_manifest > graph_manifest_before);
|
||||
let person = forced_payload["tables"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.find(|table| table["table_key"] == "node:Person")
|
||||
.unwrap();
|
||||
assert_eq!(person["classification"], "suspicious");
|
||||
assert_eq!(person["action"], "forced");
|
||||
assert_eq!(person["manifest_version"], table_manifest_before);
|
||||
assert_eq!(person["lance_head_version"], table_head_before);
|
||||
assert_eq!(manifest_dataset_version(&graph), forced_manifest);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_plan_json_reports_supported_additive_change() {
|
||||
let temp = tempdir().unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue