From 92fa3189f768ab71463fc3e75b7c49574abcf606 Mon Sep 17 00:00:00 2001 From: andrew Date: Sun, 12 Apr 2026 04:01:14 +0300 Subject: [PATCH] Add schema apply command and policy support --- Cargo.lock | 2 + README.md | 3 +- crates/omnigraph-cli/Cargo.toml | 1 + crates/omnigraph-cli/src/main.rs | 65 +- crates/omnigraph-cli/tests/cli.rs | 238 +++++++ crates/omnigraph-cli/tests/support/mod.rs | 10 + crates/omnigraph-cli/tests/system_remote.rs | 101 +++ crates/omnigraph-server/Cargo.toml | 1 + crates/omnigraph-server/src/api.rs | 31 +- crates/omnigraph-server/src/lib.rs | 30 +- crates/omnigraph-server/src/policy.rs | 45 ++ crates/omnigraph-server/tests/server.rs | 357 +++++++++- crates/omnigraph/src/db/graph_coordinator.rs | 28 +- crates/omnigraph/src/db/manifest.rs | 48 +- crates/omnigraph/src/db/manifest/layout.rs | 4 + crates/omnigraph/src/db/manifest/publisher.rs | 256 +++++-- crates/omnigraph/src/db/manifest/state.rs | 74 +- crates/omnigraph/src/db/manifest/tests.rs | 70 +- crates/omnigraph/src/db/mod.rs | 2 +- crates/omnigraph/src/db/omnigraph.rs | 665 ++++++++++++++++-- crates/omnigraph/src/table_store.rs | 14 + docs/cli.md | 4 + 22 files changed, 1903 insertions(+), 146 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b1bd7b..6ba77b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4547,6 +4547,7 @@ dependencies = [ "assert_cmd", "clap", "color-eyre", + "lance-index", "omnigraph", "omnigraph-compiler", "omnigraph-server", @@ -4589,6 +4590,7 @@ dependencies = [ "clap", "color-eyre", "futures", + "lance-index", "omnigraph", "omnigraph-compiler", "serde", diff --git a/README.md b/README.md index d2b7477..dad10c0 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,7 @@ cargo run -p omnigraph-cli -- read \ ``` Server routes include `/healthz`, `/snapshot`, `/export`, `/read`, `/change`, -`/ingest`, `/branches`, `/runs`, and `/commits`. +`/schema/apply`, `/ingest`, `/branches`, `/runs`, and `/commits`. To require auth, set `OMNIGRAPH_SERVER_BEARER_TOKEN` on the server and set the matching bearer token env var in your CLI target config. @@ -110,6 +110,7 @@ Core repo flow: ```bash omnigraph init --schema ./schema.pg ./repo.omni omnigraph load --data ./data.jsonl --mode overwrite ./repo.omni +omnigraph schema apply --schema ./next.pg ./repo.omni --json omnigraph snapshot ./repo.omni --branch main --json omnigraph read --uri ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}' omnigraph change --uri ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}' diff --git a/crates/omnigraph-cli/Cargo.toml b/crates/omnigraph-cli/Cargo.toml index 254ea79..df60889 100644 --- a/crates/omnigraph-cli/Cargo.toml +++ b/crates/omnigraph-cli/Cargo.toml @@ -26,3 +26,4 @@ assert_cmd = "2" predicates = "3" serde_json = { workspace = true } tempfile = { workspace = true } +lance-index = { workspace = true } diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 707cf9d..3bbe4a0 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -14,8 +14,9 @@ use omnigraph_server::api::{ BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput, CommitOutput, ErrorOutput, ExportRequest, IngestOutput, IngestRequest, ReadOutput, ReadRequest, - RunListOutput, RunOutput, SnapshotOutput, SnapshotTableOutput, commit_output, ingest_output, - read_output, run_output, snapshot_payload, + RunListOutput, RunOutput, SchemaApplyOutput, SchemaApplyRequest, SnapshotOutput, + SnapshotTableOutput, commit_output, ingest_output, read_output, run_output, + schema_apply_output, snapshot_payload, }; use omnigraph_server::{ AliasCommand, OmnigraphConfig, PolicyAction, PolicyDecision, PolicyEngine, PolicyRequest, @@ -280,6 +281,19 @@ enum SchemaCommand { #[arg(long)] json: bool, }, + /// Apply a supported schema migration + Apply { + /// Repo URI + uri: Option, + #[arg(long)] + target: Option, + #[arg(long)] + config: Option, + #[arg(long)] + schema: PathBuf, + #[arg(long)] + json: bool, + }, } #[derive(Debug, Subcommand)] @@ -448,6 +462,20 @@ struct SchemaPlanOutput<'a> { steps: &'a [SchemaMigrationStep], } +fn print_schema_apply_human(output: &SchemaApplyOutput) { + println!("schema apply for {}", output.uri); + println!("supported: {}", if output.supported { "yes" } else { "no" }); + println!("applied: {}", if output.applied { "yes" } else { "no" }); + println!("manifest_version: {}", output.manifest_version); + if output.steps.is_empty() { + println!("no schema changes"); + return; + } + for step in &output.steps { + println!("- {}", render_schema_plan_step(step)); + } +} + fn ensure_local_repo_parent(uri: &str) -> Result<()> { if !uri.contains("://") { fs::create_dir_all(uri)?; @@ -1796,6 +1824,39 @@ async fn main() -> Result<()> { print_schema_plan_human(&uri, &plan); } } + SchemaCommand::Apply { + uri, + target, + config, + schema, + json, + } => { + let config = load_cli_config(config.as_ref())?; + let bearer_token = + resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; + let uri = resolve_uri(&config, uri, target.as_deref())?; + let schema_source = fs::read_to_string(&schema)?; + let output = if is_remote_uri(&uri) { + remote_json::( + &http_client, + Method::POST, + remote_url(&uri, "/schema/apply"), + Some(serde_json::to_value(SchemaApplyRequest { + schema_source: schema_source.clone(), + })?), + bearer_token.as_deref(), + ) + .await? + } else { + let mut db = Omnigraph::open(&uri).await?; + schema_apply_output(&uri, db.apply_schema(&schema_source).await?) + }; + if json { + print_json(&output)?; + } else { + print_schema_apply_human(&output); + } + } }, Command::Snapshot { uri, diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index d2d20d2..b6925e5 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1,5 +1,6 @@ use std::fs; +use lance_index::traits::DatasetIndexExt; use omnigraph::db::{Omnigraph, ReadTarget}; use serde_json::Value; use tempfile::tempdir; @@ -299,6 +300,243 @@ fn schema_plan_json_reports_unsupported_type_change() { })); } +#[test] +fn schema_apply_json_applies_supported_migration() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + let schema_path = temp.path().join("next.pg"); + init_repo(&repo); + + let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + fs::write(&schema_path, next_schema).unwrap(); + + let output = output_success( + cli() + .arg("schema") + .arg("apply") + .arg("--schema") + .arg(&schema_path) + .arg("--json") + .arg(&repo), + ); + let payload: Value = serde_json::from_slice(&output.stdout).unwrap(); + + assert_eq!(payload["supported"], true); + assert_eq!(payload["applied"], true); + assert_eq!(payload["step_count"], 1); + + let db = tokio::runtime::Runtime::new() + .unwrap() + .block_on(Omnigraph::open(repo.to_string_lossy().as_ref())) + .unwrap(); + assert!( + db.catalog().node_types["Person"] + .properties + .contains_key("nickname") + ); +} + +#[test] +fn schema_apply_human_reports_noop() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + let schema_path = fixture("test.pg"); + init_repo(&repo); + + let output = output_success( + cli() + .arg("schema") + .arg("apply") + .arg("--schema") + .arg(&schema_path) + .arg(&repo), + ); + let stdout = stdout_string(&output); + + assert!(stdout.contains("applied: no")); + assert!(stdout.contains("no schema changes")); +} + +#[test] +fn schema_apply_json_renames_type_and_updates_snapshot() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + let schema_path = temp.path().join("rename.pg"); + init_repo(&repo); + + let renamed_schema = fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n") + .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human") + .replace( + "edge WorksAt: Person -> Company", + "edge WorksAt: Human -> Company", + ); + fs::write(&schema_path, renamed_schema).unwrap(); + + let output = output_success( + cli() + .arg("schema") + .arg("apply") + .arg("--schema") + .arg(&schema_path) + .arg("--json") + .arg(&repo), + ); + let payload: Value = serde_json::from_slice(&output.stdout).unwrap(); + assert_eq!(payload["applied"], true); + + let db = tokio::runtime::Runtime::new() + .unwrap() + .block_on(Omnigraph::open(repo.to_string_lossy().as_ref())) + .unwrap(); + let snapshot = tokio::runtime::Runtime::new() + .unwrap() + .block_on(db.snapshot_of(ReadTarget::branch("main"))) + .unwrap(); + assert!(snapshot.entry("node:Human").is_some()); + assert!(snapshot.entry("node:Person").is_none()); +} + +#[test] +fn schema_apply_json_renames_property_and_updates_catalog() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + let schema_path = temp.path().join("rename-property.pg"); + init_repo(&repo); + + let renamed_schema = fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("age: I32?", "years: I32? @rename_from(\"age\")"); + fs::write(&schema_path, renamed_schema).unwrap(); + + let output = output_success( + cli() + .arg("schema") + .arg("apply") + .arg("--schema") + .arg(&schema_path) + .arg("--json") + .arg(&repo), + ); + let payload: Value = serde_json::from_slice(&output.stdout).unwrap(); + assert_eq!(payload["applied"], true); + + let db = tokio::runtime::Runtime::new() + .unwrap() + .block_on(Omnigraph::open(repo.to_string_lossy().as_ref())) + .unwrap(); + let person = &db.catalog().node_types["Person"]; + assert!(person.properties.contains_key("years")); + assert!(!person.properties.contains_key("age")); +} + +#[test] +fn schema_apply_json_adds_index_for_existing_property() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + let schema_path = temp.path().join("index.pg"); + init_repo(&repo); + + let before_index_count = tokio::runtime::Runtime::new().unwrap().block_on(async { + let db = Omnigraph::open(repo.to_string_lossy().as_ref()) + .await + .unwrap(); + let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let dataset = snapshot.open("node:Person").await.unwrap(); + dataset.load_indices().await.unwrap().len() + }); + + let indexed_schema = fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("name: String @key", "name: String @key @index"); + fs::write(&schema_path, indexed_schema).unwrap(); + + let output = output_success( + cli() + .arg("schema") + .arg("apply") + .arg("--schema") + .arg(&schema_path) + .arg("--json") + .arg(&repo), + ); + let payload: Value = serde_json::from_slice(&output.stdout).unwrap(); + assert_eq!(payload["applied"], true); + + let after_index_count = tokio::runtime::Runtime::new().unwrap().block_on(async { + let db = Omnigraph::open(repo.to_string_lossy().as_ref()) + .await + .unwrap(); + let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let dataset = snapshot.open("node:Person").await.unwrap(); + dataset.load_indices().await.unwrap().len() + }); + assert!(after_index_count > before_index_count); +} + +#[test] +fn schema_apply_rejects_unsupported_plan() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + let schema_path = temp.path().join("breaking.pg"); + init_repo(&repo); + + let breaking_schema = fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("age: I32?", "age: I64?"); + fs::write(&schema_path, breaking_schema).unwrap(); + + let output = output_failure( + cli() + .arg("schema") + .arg("apply") + .arg("--schema") + .arg(&schema_path) + .arg(&repo), + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("changing property type")); +} + +#[test] +fn schema_apply_rejects_when_non_main_branch_exists() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + let schema_path = temp.path().join("next.pg"); + init_repo(&repo); + output_success( + cli() + .arg("branch") + .arg("create") + .arg("--from") + .arg("main") + .arg("--uri") + .arg(&repo) + .arg("feature"), + ); + + let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + fs::write(&schema_path, next_schema).unwrap(); + + let output = output_failure( + cli() + .arg("schema") + .arg("apply") + .arg("--schema") + .arg(&schema_path) + .arg(&repo), + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("schema apply requires a repo with only main")); +} + #[test] fn load_json_outputs_summary_for_main_branch() { let temp = tempdir().unwrap(); diff --git a/crates/omnigraph-cli/tests/support/mod.rs b/crates/omnigraph-cli/tests/support/mod.rs index 8e38ee4..2c05f0e 100644 --- a/crates/omnigraph-cli/tests/support/mod.rs +++ b/crates/omnigraph-cli/tests/support/mod.rs @@ -110,6 +110,10 @@ pub fn write_config(path: &Path, source: &str) { fs::write(path, source).unwrap(); } +pub fn write_file(path: &Path, source: &str) { + fs::write(path, source).unwrap(); +} + fn yaml_string(value: &str) -> String { format!("'{}'", value.replace('\'', "''")) } @@ -278,6 +282,12 @@ impl SystemRepo { path } + pub fn write_file(&self, name: &str, source: &str) -> PathBuf { + let path = self.repo.parent().unwrap().join(name); + write_file(&path, source); + path + } + pub fn spawn_server(&self) -> TestServer { spawn_server(&self.repo) } diff --git a/crates/omnigraph-cli/tests/system_remote.rs b/crates/omnigraph-cli/tests/system_remote.rs index dc7af37..f602d0e 100644 --- a/crates/omnigraph-cli/tests/system_remote.rs +++ b/crates/omnigraph-cli/tests/system_remote.rs @@ -2,6 +2,7 @@ mod support; use std::fs; +use omnigraph::db::Omnigraph; use reqwest::blocking::Client; use serde_json::json; @@ -217,6 +218,106 @@ query insert_person($name: String, $age: I32) { assert!(runs_payload["runs"].as_array().unwrap().len() >= 2); } +#[test] +#[ignore = "requires loopback socket permissions in sandboxed runners"] +fn remote_schema_apply_via_cli_updates_repo() { + let repo = SystemRepo::initialized(); + let server = repo.spawn_server(); + let config = repo.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url)); + let next_schema = repo.write_file( + "next.pg", + &fs::read_to_string(fixture("test.pg")).unwrap().replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ), + ); + + let payload = parse_stdout_json(&output_success( + cli() + .arg("schema") + .arg("apply") + .arg("--config") + .arg(&config) + .arg("--schema") + .arg(&next_schema) + .arg("--json"), + )); + assert_eq!(payload["applied"], true); + + let db = tokio::runtime::Runtime::new() + .unwrap() + .block_on(Omnigraph::open(repo.path().to_string_lossy().as_ref())) + .unwrap(); + assert!( + db.catalog().node_types["Person"] + .properties + .contains_key("nickname") + ); +} + +#[test] +#[ignore = "requires loopback socket permissions in sandboxed runners"] +fn remote_schema_apply_rejects_unsupported_plan() { + let repo = SystemRepo::initialized(); + let server = repo.spawn_server(); + let config = repo.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url)); + let breaking_schema = repo.write_file( + "breaking.pg", + &fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("age: I32?", "age: I64?"), + ); + + let output = output_failure( + cli() + .arg("schema") + .arg("apply") + .arg("--config") + .arg(&config) + .arg("--schema") + .arg(&breaking_schema), + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("changing property type")); +} + +#[test] +#[ignore = "requires loopback socket permissions in sandboxed runners"] +fn remote_schema_apply_rejects_when_non_main_branch_exists() { + let repo = SystemRepo::initialized(); + output_success( + cli() + .arg("branch") + .arg("create") + .arg("--from") + .arg("main") + .arg("--uri") + .arg(repo.path()) + .arg("feature"), + ); + let server = repo.spawn_server(); + let config = repo.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url)); + let next_schema = repo.write_file( + "next.pg", + &fs::read_to_string(fixture("test.pg")).unwrap().replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ), + ); + + let output = output_failure( + cli() + .arg("schema") + .arg("apply") + .arg("--config") + .arg(&config) + .arg("--schema") + .arg(&next_schema), + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("schema apply requires a repo with only main")); +} + #[test] #[ignore = "requires loopback socket permissions in sandboxed runners"] fn remote_read_preserves_projection_order_in_json_and_csv() { diff --git a/crates/omnigraph-server/Cargo.toml b/crates/omnigraph-server/Cargo.toml index be17c79..d649a89 100644 --- a/crates/omnigraph-server/Cargo.toml +++ b/crates/omnigraph-server/Cargo.toml @@ -29,3 +29,4 @@ futures = { workspace = true } tempfile = { workspace = true } tower = { workspace = true } serial_test = "3" +lance-index = { workspace = true } diff --git a/crates/omnigraph-server/src/api.rs b/crates/omnigraph-server/src/api.rs index 9411c60..b9e8bea 100644 --- a/crates/omnigraph-server/src/api.rs +++ b/crates/omnigraph-server/src/api.rs @@ -1,6 +1,9 @@ -use omnigraph::db::{GraphCommit, MergeOutcome, ReadTarget, RunRecord, Snapshot}; +use omnigraph::db::{ + GraphCommit, MergeOutcome, ReadTarget, RunRecord, SchemaApplyResult, Snapshot, +}; use omnigraph::error::{MergeConflict, MergeConflictKind}; use omnigraph::loader::{IngestResult, LoadMode}; +use omnigraph_compiler::SchemaMigrationStep; use omnigraph_compiler::result::QueryResult; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -243,6 +246,21 @@ pub struct ChangeRequest { pub branch: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SchemaApplyRequest { + pub schema_source: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SchemaApplyOutput { + pub uri: String, + pub supported: bool, + pub applied: bool, + pub step_count: usize, + pub manifest_version: u64, + pub steps: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct IngestRequest { pub branch: Option, @@ -318,6 +336,17 @@ pub fn snapshot_payload(branch: &str, snapshot: &Snapshot) -> SnapshotOutput { } } +pub fn schema_apply_output(uri: &str, result: SchemaApplyResult) -> SchemaApplyOutput { + SchemaApplyOutput { + uri: uri.to_string(), + supported: result.supported, + applied: result.applied, + step_count: result.steps.len(), + manifest_version: result.manifest_version, + steps: result.steps, + } +} + pub fn run_output(run: &RunRecord) -> RunOutput { RunOutput { run_id: run.run_id.as_str().to_string(), diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 4bcfe4f..0ca4d40 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -13,8 +13,8 @@ use api::{ BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput, CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, HealthOutput, IngestOutput, - IngestRequest, ReadOutput, ReadRequest, RunListOutput, SnapshotQuery, ingest_output, - snapshot_payload, + IngestRequest, ReadOutput, ReadRequest, RunListOutput, SchemaApplyOutput, SchemaApplyRequest, + SnapshotQuery, ingest_output, schema_apply_output, snapshot_payload, }; use axum::body::{Body, Bytes}; use axum::extract::DefaultBodyLimit; @@ -362,6 +362,7 @@ pub fn build_app(state: AppState) -> Router { .route("/export", post(server_export)) .route("/read", post(server_read)) .route("/change", post(server_change)) + .route("/schema/apply", post(server_schema_apply)) .route( "/ingest", post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)), @@ -658,6 +659,31 @@ async fn server_change( })) } +async fn server_schema_apply( + State(state): State, + actor: Option>, + Json(request): Json, +) -> std::result::Result, ApiError> { + let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); + authorize_request( + &state, + actor.as_ref().map(|Extension(actor)| actor), + PolicyRequest { + actor_id: actor_id.map(str::to_string).unwrap_or_default(), + action: PolicyAction::SchemaApply, + branch: None, + target_branch: Some("main".to_string()), + }, + )?; + let result = { + let mut db = Arc::clone(&state.db).write_owned().await; + db.apply_schema(&request.schema_source) + .await + .map_err(ApiError::from_omni)? + }; + Ok(Json(schema_apply_output(state.uri(), result))) +} + async fn server_ingest( State(state): State, actor: Option>, diff --git a/crates/omnigraph-server/src/policy.rs b/crates/omnigraph-server/src/policy.rs index 21b6ea6..e6b43bc 100644 --- a/crates/omnigraph-server/src/policy.rs +++ b/crates/omnigraph-server/src/policy.rs @@ -19,6 +19,7 @@ pub enum PolicyAction { Read, Export, Change, + SchemaApply, BranchCreate, BranchDelete, BranchMerge, @@ -33,6 +34,7 @@ impl PolicyAction { Self::Read => "read", Self::Export => "export", Self::Change => "change", + Self::SchemaApply => "schema_apply", Self::BranchCreate => "branch_create", Self::BranchDelete => "branch_delete", Self::BranchMerge => "branch_merge", @@ -50,6 +52,7 @@ impl PolicyAction { matches!( self, Self::BranchCreate + | Self::SchemaApply | Self::BranchDelete | Self::BranchMerge | Self::RunPublish @@ -72,6 +75,7 @@ impl FromStr for PolicyAction { "read" => Ok(Self::Read), "export" => Ok(Self::Export), "change" => Ok(Self::Change), + "schema_apply" => Ok(Self::SchemaApply), "branch_create" => Ok(Self::BranchCreate), "branch_delete" => Ok(Self::BranchDelete), "branch_merge" => Ok(Self::BranchMerge), @@ -591,6 +595,7 @@ namespace Omnigraph { action "read" appliesTo { principal: Actor, resource: Repo, context: RequestContext }; action "export" appliesTo { principal: Actor, resource: Repo, context: RequestContext }; action "change" appliesTo { principal: Actor, resource: Repo, context: RequestContext }; + action "schema_apply" appliesTo { principal: Actor, resource: Repo, context: RequestContext }; action "branch_create" appliesTo { principal: Actor, resource: Repo, context: RequestContext }; action "branch_delete" appliesTo { principal: Actor, resource: Repo, context: RequestContext }; action "branch_merge" appliesTo { principal: Actor, resource: Repo, context: RequestContext }; @@ -809,4 +814,44 @@ rules: engine.run_tests(&tests).unwrap(); } + + #[test] + fn schema_apply_uses_target_branch_scope() { + let policy: PolicyConfig = serde_yaml::from_str( + r#" +version: 1 +groups: + admins: [act-ragnor] +protected_branches: [main] +rules: + - id: admins-schema-apply + allow: + actors: { group: admins } + actions: [schema_apply] + target_branch_scope: protected +"#, + ) + .unwrap(); + + let engine = PolicyCompiler::compile(&policy, "repo").unwrap(); + let allow = engine + .authorize(&PolicyRequest { + actor_id: "act-ragnor".to_string(), + action: PolicyAction::SchemaApply, + branch: None, + target_branch: Some("main".to_string()), + }) + .unwrap(); + assert!(allow.allowed); + + let deny = engine + .authorize(&PolicyRequest { + actor_id: "act-ragnor".to_string(), + action: PolicyAction::SchemaApply, + branch: None, + target_branch: Some("feature".to_string()), + }) + .unwrap(); + assert!(!deny.allowed); + } } diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 69fa6c8..feebdc6 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -5,11 +5,12 @@ use std::path::{Path, PathBuf}; use axum::Router; use axum::body::{Body, to_bytes}; use axum::http::{Method, Request, StatusCode}; +use lance_index::traits::DatasetIndexExt; use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph_server::api::{ BranchCreateRequest, BranchMergeRequest, ChangeRequest, ErrorOutput, ExportRequest, - IngestRequest, ReadRequest, + IngestRequest, ReadRequest, SchemaApplyRequest, }; use omnigraph_server::{AppState, build_app}; use serde_json::{Value, json}; @@ -86,6 +87,19 @@ rules: target_branch_scope: unprotected "#; +const SCHEMA_APPLY_POLICY_YAML: &str = r#" +version: 1 +groups: + admins: [act-ragnor] +protected_branches: [main] +rules: + - id: admins-schema-apply + allow: + actors: { group: admins } + actions: [schema_apply] + target_branch_scope: protected +"#; + fn fixture(name: &str) -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")) .join("../omnigraph/tests/fixtures") @@ -114,6 +128,16 @@ async fn init_repo_with_schema_and_data(schema: &str, data: &str) -> tempfile::T temp } +async fn init_repo_with_schema(schema: &str) -> tempfile::TempDir { + let temp = tempfile::tempdir().unwrap(); + let repo = repo_path(temp.path()); + fs::create_dir_all(&repo).unwrap(); + Omnigraph::init(repo.to_str().unwrap(), schema) + .await + .unwrap(); + temp +} + fn repo_path(root: &Path) -> PathBuf { root.join("server.omni") } @@ -206,6 +230,64 @@ async fn app_for_loaded_repo_with_auth_tokens_and_policy( (temp, build_app(state)) } +async fn app_for_repo_with_auth_tokens_and_policy( + schema: &str, + tokens: &[(&str, &str)], + policy: &str, +) -> (tempfile::TempDir, Router) { + let temp = init_repo_with_schema(schema).await; + let repo = repo_path(temp.path()); + let policy_path = temp.path().join("policy.yaml"); + fs::write(&policy_path, policy).unwrap(); + let state = AppState::open_with_bearer_tokens_and_policy( + repo.to_string_lossy().to_string(), + tokens + .iter() + .map(|(actor, token)| ((*actor).to_string(), (*token).to_string())) + .collect(), + Some(&policy_path), + ) + .await + .unwrap(); + (temp, build_app(state)) +} + +fn additive_schema_with_nickname() -> String { + fs::read_to_string(fixture("test.pg")).unwrap().replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ) +} + +fn renamed_person_schema() -> String { + fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n") + .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human") + .replace( + "edge WorksAt: Person -> Company", + "edge WorksAt: Human -> Company", + ) +} + +fn renamed_age_schema() -> String { + fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("age: I32?", "years: I32? @rename_from(\"age\")") +} + +fn indexed_name_schema() -> String { + fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("name: String @key", "name: String @key @index") +} + +fn unsupported_schema_change() -> String { + fs::read_to_string(fixture("test.pg")) + .unwrap() + .replace("age: I32?", "age: I64?") +} + async fn json_response(app: &Router, request: Request) -> (StatusCode, Value) { let response = app.clone().oneshot(request).await.unwrap(); let status = response.status(); @@ -214,6 +296,279 @@ async fn json_response(app: &Router, request: Request) -> (StatusCode, Val (status, value) } +#[tokio::test] +async fn schema_apply_route_updates_repo_for_authorized_admin() { + let (temp, app) = app_for_repo_with_auth_tokens_and_policy( + &fs::read_to_string(fixture("test.pg")).unwrap(), + &[("act-ragnor", "admin-token")], + SCHEMA_APPLY_POLICY_YAML, + ) + .await; + let schema = additive_schema_with_nickname(); + + let request = Request::builder() + .method(Method::POST) + .uri("/schema/apply") + .header("content-type", "application/json") + .header("authorization", "Bearer admin-token") + .body(Body::from( + serde_json::to_vec(&SchemaApplyRequest { + schema_source: schema, + }) + .unwrap(), + )) + .unwrap(); + let (status, payload) = json_response(&app, request).await; + + assert_eq!(status, StatusCode::OK); + assert_eq!(payload["applied"], true); + let repo = repo_path(temp.path()); + let reopened = Omnigraph::open(repo.to_str().unwrap()).await.unwrap(); + assert!( + reopened.catalog().node_types["Person"] + .properties + .contains_key("nickname") + ); +} + +#[tokio::test] +async fn schema_apply_route_requires_schema_apply_policy_permission() { + let (_temp, app) = app_for_repo_with_auth_tokens_and_policy( + &fs::read_to_string(fixture("test.pg")).unwrap(), + &[("act-ragnor", "admin-token")], + POLICY_YAML, + ) + .await; + + let request = Request::builder() + .method(Method::POST) + .uri("/schema/apply") + .header("content-type", "application/json") + .header("authorization", "Bearer admin-token") + .body(Body::from( + serde_json::to_vec(&SchemaApplyRequest { + schema_source: additive_schema_with_nickname(), + }) + .unwrap(), + )) + .unwrap(); + let (status, payload) = json_response(&app, request).await; + + assert_eq!(status, StatusCode::FORBIDDEN); + assert_eq!( + payload["code"], + serde_json::to_value(omnigraph_server::api::ErrorCode::Forbidden).unwrap() + ); +} + +#[tokio::test] +async fn schema_apply_route_requires_bearer_token_when_policy_enabled() { + let (_temp, app) = app_for_repo_with_auth_tokens_and_policy( + &fs::read_to_string(fixture("test.pg")).unwrap(), + &[("act-ragnor", "admin-token")], + SCHEMA_APPLY_POLICY_YAML, + ) + .await; + + let request = Request::builder() + .method(Method::POST) + .uri("/schema/apply") + .header("content-type", "application/json") + .body(Body::from( + serde_json::to_vec(&SchemaApplyRequest { + schema_source: additive_schema_with_nickname(), + }) + .unwrap(), + )) + .unwrap(); + let (status, payload) = json_response(&app, request).await; + + assert_eq!(status, StatusCode::UNAUTHORIZED); + assert_eq!( + payload["code"], + serde_json::to_value(omnigraph_server::api::ErrorCode::Unauthorized).unwrap() + ); +} + +#[tokio::test] +async fn schema_apply_route_can_rename_type() { + let (temp, app) = app_for_repo_with_auth_tokens_and_policy( + &fs::read_to_string(fixture("test.pg")).unwrap(), + &[("act-ragnor", "admin-token")], + SCHEMA_APPLY_POLICY_YAML, + ) + .await; + + let request = Request::builder() + .method(Method::POST) + .uri("/schema/apply") + .header("content-type", "application/json") + .header("authorization", "Bearer admin-token") + .body(Body::from( + serde_json::to_vec(&SchemaApplyRequest { + schema_source: renamed_person_schema(), + }) + .unwrap(), + )) + .unwrap(); + let (status, payload) = json_response(&app, request).await; + + assert_eq!(status, StatusCode::OK); + assert_eq!(payload["applied"], true); + let repo = repo_path(temp.path()); + let reopened = Omnigraph::open(repo.to_str().unwrap()).await.unwrap(); + let snapshot = reopened + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap(); + assert!(snapshot.entry("node:Human").is_some()); + assert!(snapshot.entry("node:Person").is_none()); +} + +#[tokio::test] +async fn schema_apply_route_can_rename_property() { + let (temp, app) = app_for_repo_with_auth_tokens_and_policy( + &fs::read_to_string(fixture("test.pg")).unwrap(), + &[("act-ragnor", "admin-token")], + SCHEMA_APPLY_POLICY_YAML, + ) + .await; + + let request = Request::builder() + .method(Method::POST) + .uri("/schema/apply") + .header("content-type", "application/json") + .header("authorization", "Bearer admin-token") + .body(Body::from( + serde_json::to_vec(&SchemaApplyRequest { + schema_source: renamed_age_schema(), + }) + .unwrap(), + )) + .unwrap(); + let (status, payload) = json_response(&app, request).await; + + assert_eq!(status, StatusCode::OK); + assert_eq!(payload["applied"], true); + let repo = repo_path(temp.path()); + let reopened = Omnigraph::open(repo.to_str().unwrap()).await.unwrap(); + let person = &reopened.catalog().node_types["Person"]; + assert!(person.properties.contains_key("years")); + assert!(!person.properties.contains_key("age")); +} + +#[tokio::test] +async fn schema_apply_route_can_add_index() { + let (temp, app) = app_for_repo_with_auth_tokens_and_policy( + &fs::read_to_string(fixture("test.pg")).unwrap(), + &[("act-ragnor", "admin-token")], + SCHEMA_APPLY_POLICY_YAML, + ) + .await; + let repo = repo_path(temp.path()); + let before_index_count = { + let db = Omnigraph::open(repo.to_str().unwrap()).await.unwrap(); + let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let dataset = snapshot.open("node:Person").await.unwrap(); + dataset.load_indices().await.unwrap().len() + }; + + let request = Request::builder() + .method(Method::POST) + .uri("/schema/apply") + .header("content-type", "application/json") + .header("authorization", "Bearer admin-token") + .body(Body::from( + serde_json::to_vec(&SchemaApplyRequest { + schema_source: indexed_name_schema(), + }) + .unwrap(), + )) + .unwrap(); + let (status, payload) = json_response(&app, request).await; + + assert_eq!(status, StatusCode::OK); + assert_eq!(payload["applied"], true); + let reopened = Omnigraph::open(repo.to_str().unwrap()).await.unwrap(); + let snapshot = reopened + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap(); + let dataset = snapshot.open("node:Person").await.unwrap(); + let after_index_count = dataset.load_indices().await.unwrap().len(); + assert!(after_index_count > before_index_count); +} + +#[tokio::test] +async fn schema_apply_route_rejects_unsupported_plan() { + let (_temp, app) = app_for_repo_with_auth_tokens_and_policy( + &fs::read_to_string(fixture("test.pg")).unwrap(), + &[("act-ragnor", "admin-token")], + SCHEMA_APPLY_POLICY_YAML, + ) + .await; + + let request = Request::builder() + .method(Method::POST) + .uri("/schema/apply") + .header("content-type", "application/json") + .header("authorization", "Bearer admin-token") + .body(Body::from( + serde_json::to_vec(&SchemaApplyRequest { + schema_source: unsupported_schema_change(), + }) + .unwrap(), + )) + .unwrap(); + let (status, payload) = json_response(&app, request).await; + + assert_eq!(status, StatusCode::BAD_REQUEST); + assert_eq!( + payload["code"], + serde_json::to_value(omnigraph_server::api::ErrorCode::BadRequest).unwrap() + ); +} + +#[tokio::test] +async fn schema_apply_route_rejects_when_non_main_branch_exists() { + let temp = init_repo_with_schema(&fs::read_to_string(fixture("test.pg")).unwrap()).await; + let repo = repo_path(temp.path()); + let mut db = Omnigraph::open(repo.to_str().unwrap()).await.unwrap(); + db.branch_create("feature").await.unwrap(); + drop(db); + + let policy_path = temp.path().join("policy.yaml"); + fs::write(&policy_path, SCHEMA_APPLY_POLICY_YAML).unwrap(); + let state = AppState::open_with_bearer_tokens_and_policy( + repo.to_string_lossy().to_string(), + vec![("act-ragnor".to_string(), "admin-token".to_string())], + Some(&policy_path), + ) + .await + .unwrap(); + let app = build_app(state); + + let request = Request::builder() + .method(Method::POST) + .uri("/schema/apply") + .header("content-type", "application/json") + .header("authorization", "Bearer admin-token") + .body(Body::from( + serde_json::to_vec(&SchemaApplyRequest { + schema_source: additive_schema_with_nickname(), + }) + .unwrap(), + )) + .unwrap(); + let (status, payload) = json_response(&app, request).await; + + assert_eq!(status, StatusCode::CONFLICT); + assert_eq!( + payload["code"], + serde_json::to_value(omnigraph_server::api::ErrorCode::Conflict).unwrap() + ); +} + struct EnvGuard { saved: Vec<(&'static str, Option)>, } diff --git a/crates/omnigraph/src/db/graph_coordinator.rs b/crates/omnigraph/src/db/graph_coordinator.rs index 4de6d5d..c342db1 100644 --- a/crates/omnigraph/src/db/graph_coordinator.rs +++ b/crates/omnigraph/src/db/graph_coordinator.rs @@ -8,7 +8,7 @@ use crate::failpoints; use crate::storage::{StorageAdapter, join_uri, normalize_root_uri}; use super::commit_graph::{CommitGraph, GraphCommit}; -use super::manifest::{ManifestCoordinator, Snapshot, SubTableUpdate}; +use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate}; use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri, is_internal_run_branch}; const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance"; @@ -208,6 +208,10 @@ impl GraphCoordinator { }) } + pub(crate) async fn all_branches(&self) -> Result> { + self.manifest.list_branches().await + } + pub async fn branch_descendants(&self, name: &str) -> Result> { self.manifest .descendant_branches(name) @@ -414,6 +418,28 @@ impl GraphCoordinator { Ok(manifest_version) } + pub(crate) async fn commit_manifest_changes( + &mut self, + changes: &[ManifestChange], + ) -> Result { + let manifest_version = self.manifest.commit_changes(changes).await?; + failpoints::maybe_fail("graph_publish.after_manifest_commit")?; + Ok(manifest_version) + } + + pub(crate) async fn commit_changes_with_actor( + &mut self, + changes: &[ManifestChange], + actor_id: Option<&str>, + ) -> Result { + let manifest_version = self.commit_manifest_changes(changes).await?; + let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?; + Ok(PublishedSnapshot { + manifest_version, + _snapshot_id: snapshot_id, + }) + } + pub(crate) async fn record_graph_commit( &mut self, manifest_version: u64, diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 7d7dd45..4a67130 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -19,7 +19,7 @@ mod repo; #[path = "manifest/state.rs"] mod state; -use layout::{manifest_uri, open_manifest_dataset}; +use layout::{manifest_uri, open_manifest_dataset, type_name_hash}; pub(crate) use metadata::TableVersionMetadata; #[cfg(test)] use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state}; @@ -36,6 +36,7 @@ use state::{ManifestState, read_manifest_state}; const OBJECT_TYPE_TABLE: &str = "table"; const OBJECT_TYPE_TABLE_VERSION: &str = "table_version"; +const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone"; const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management"; /// Immutable point-in-time view of the database. @@ -85,6 +86,25 @@ impl SubTableUpdate { } } +#[derive(Debug, Clone)] +pub(crate) struct TableRegistration { + pub(crate) table_key: String, + pub(crate) table_path: String, +} + +#[derive(Debug, Clone)] +pub(crate) struct TableTombstone { + pub(crate) table_key: String, + pub(crate) tombstone_version: u64, +} + +#[derive(Debug, Clone)] +pub(crate) enum ManifestChange { + Update(SubTableUpdate), + RegisterTable(TableRegistration), + Tombstone(TableTombstone), +} + impl SubTableEntry { pub(crate) async fn open(&self, root_uri: &str) -> Result { open_table_at_version_from_manifest( @@ -97,6 +117,19 @@ impl SubTableEntry { } } +pub(crate) fn table_path_for_table_key(table_key: &str) -> Result { + if let Some(type_name) = table_key.strip_prefix("node:") { + return Ok(format!("nodes/{}", type_name_hash(type_name))); + } + if let Some(type_name) = table_key.strip_prefix("edge:") { + return Ok(format!("edges/{}", type_name_hash(type_name))); + } + Err(OmniError::manifest(format!( + "invalid table key '{}'", + table_key + ))) +} + /// An update to apply to the manifest via `commit`. #[derive(Debug, Clone)] pub struct SubTableUpdate { @@ -245,11 +278,20 @@ impl ManifestCoordinator { /// Atomically inserts one immutable `table_version` row per updated table. /// The merge-insert commit on `__manifest` is the graph-level publish point. pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result { - if updates.is_empty() { + let changes = updates + .iter() + .cloned() + .map(ManifestChange::Update) + .collect::>(); + self.commit_changes(&changes).await + } + + pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result { + if changes.is_empty() { return Ok(self.version()); } - self.dataset = self.publisher.publish(updates).await?; + self.dataset = self.publisher.publish(changes).await?; self.known_state = read_manifest_state(&self.dataset).await?; Ok(self.version()) diff --git a/crates/omnigraph/src/db/manifest/layout.rs b/crates/omnigraph/src/db/manifest/layout.rs index 9a4fca3..9cfde9a 100644 --- a/crates/omnigraph/src/db/manifest/layout.rs +++ b/crates/omnigraph/src/db/manifest/layout.rs @@ -40,6 +40,10 @@ pub(super) fn version_object_id(table_key: &str, version: u64) -> String { format!("{}${}", table_key, format_table_version(version)) } +pub(super) fn tombstone_object_id(table_key: &str, version: u64) -> String { + format!("{}$tombstone${}", table_key, format_table_version(version)) +} + pub(super) fn table_id_to_key(request_id: Option<&Vec>) -> lance_namespace::Result { match request_id { Some(request_id) if request_id.len() == 1 && !request_id[0].is_empty() => { diff --git a/crates/omnigraph/src/db/manifest/publisher.rs b/crates/omnigraph/src/db/manifest/publisher.rs index efdbd1d..f200ed4 100644 --- a/crates/omnigraph/src/db/manifest/publisher.rs +++ b/crates/omnigraph/src/db/manifest/publisher.rs @@ -15,11 +15,11 @@ //! This module should disappear once Lance Rust can do branch-aware batch table //! version publication against a managed namespace manifest. -use async_trait::async_trait; use std::collections::HashMap; use std::sync::Arc; use arrow_array::RecordBatchIterator; +use async_trait::async_trait; use lance::Dataset; use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched}; use lance_namespace::NamespaceError; @@ -27,16 +27,20 @@ use lance_namespace::models::CreateTableVersionRequest; use crate::error::{OmniError, Result}; -use super::layout::{open_manifest_dataset, version_object_id}; +use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id}; use super::metadata::parse_namespace_version_request; use super::state::{ - manifest_rows_batch, manifest_schema, read_manifest_entries, read_manifest_state, + manifest_rows_batch, manifest_schema, read_manifest_entries, read_registered_table_locations, + read_tombstone_versions, +}; +use super::{ + ManifestChange, OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION, + SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone, }; -use super::{OBJECT_TYPE_TABLE_VERSION, SubTableEntry, SubTableUpdate}; #[async_trait] pub(super) trait ManifestBatchPublisher: Send + Sync { - async fn publish(&self, updates: &[SubTableUpdate]) -> Result; + async fn publish(&self, changes: &[ManifestChange]) -> Result; } pub(super) struct GraphNamespacePublisher { @@ -47,6 +51,8 @@ pub(super) struct GraphNamespacePublisher { #[derive(Debug)] struct PendingVersionRow { object_id: String, + object_type: String, + location: Option, metadata: Option, table_key: String, table_version: Option, @@ -72,17 +78,13 @@ impl GraphNamespacePublisher { &self, ) -> Result<( Dataset, - HashMap, + HashMap, HashMap<(String, u64), SubTableEntry>, + HashMap<(String, u64), ()>, )> { let dataset = self.dataset().await?; - let current = read_manifest_state(&dataset).await?; + let registered_tables = read_registered_table_locations(&dataset).await?; let existing_entries = read_manifest_entries(&dataset).await?; - let known_tables = current - .entries - .iter() - .map(|entry| (entry.table_key.clone(), ())) - .collect(); let existing_versions = existing_entries .iter() .map(|entry| { @@ -92,67 +94,153 @@ impl GraphNamespacePublisher { ) }) .collect(); - Ok((dataset, known_tables, existing_versions)) + let existing_tombstones = read_tombstone_versions(&dataset).await?; + Ok(( + dataset, + registered_tables, + existing_versions, + existing_tombstones, + )) } fn build_pending_rows( - requests: &[CreateTableVersionRequest], - known_tables: &HashMap, + changes: &[ManifestChange], + known_tables: &HashMap, existing_versions: &HashMap<(String, u64), SubTableEntry>, + existing_tombstones: &HashMap<(String, u64), ()>, ) -> Result> { let mut request_versions = HashMap::<(String, u64), ()>::new(); - let mut rows = Vec::with_capacity(requests.len()); + let mut known_tables = known_tables.clone(); + let mut rows = Vec::with_capacity(changes.len()); - for request in requests { - let (table_key, table_version, row_count, table_branch, version_metadata) = - parse_namespace_version_request(request) - .map_err(|e| OmniError::Lance(e.to_string()))?; - if !known_tables.contains_key(table_key.as_str()) { - return Err(OmniError::Lance( - NamespaceError::TableNotFound { - message: format!("table {} not found", table_key), - } - .to_string(), - )); - } - if request_versions - .insert((table_key.clone(), table_version), ()) - .is_some() + for change in changes { + if let ManifestChange::RegisterTable(TableRegistration { + table_key, + table_path, + }) = change { - return Err(OmniError::Lance( - NamespaceError::ConcurrentModification { - message: format!( - "table version {} already exists for {}", - table_version, table_key - ), + if let Some(existing_path) = known_tables.get(table_key) { + if existing_path != table_path { + return Err(OmniError::Lance( + NamespaceError::ConcurrentModification { + message: format!( + "table {} already exists with different path {}", + table_key, existing_path + ), + } + .to_string(), + )); } - .to_string(), - )); + } else { + known_tables.insert(table_key.clone(), table_path.clone()); + } + rows.push(PendingVersionRow { + object_id: table_key.clone(), + object_type: OBJECT_TYPE_TABLE.to_string(), + location: Some(table_path.clone()), + metadata: None, + table_key: table_key.clone(), + table_version: None, + table_branch: None, + row_count: None, + }); } - if let Some(existing) = existing_versions.get(&(table_key.clone(), table_version)) { - let is_owner_branch_handoff = - existing.row_count == row_count && existing.table_branch != table_branch; - if !is_owner_branch_handoff { - return Err(OmniError::Lance( - NamespaceError::ConcurrentModification { - message: format!( - "table version {} already exists for {}", - table_version, table_key - ), + } + + for change in changes { + match change { + ManifestChange::RegisterTable(_) => {} + ManifestChange::Update(update) => { + let request = update.to_create_table_version_request(); + let (table_key, table_version, row_count, table_branch, version_metadata) = + parse_namespace_version_request(&request) + .map_err(|e| OmniError::Lance(e.to_string()))?; + if !known_tables.contains_key(table_key.as_str()) { + return Err(OmniError::Lance( + NamespaceError::TableNotFound { + message: format!("table {} not found", table_key), + } + .to_string(), + )); + } + if request_versions + .insert((table_key.clone(), table_version), ()) + .is_some() + { + return Err(OmniError::Lance( + NamespaceError::ConcurrentModification { + message: format!( + "table version {} already exists for {}", + table_version, table_key + ), + } + .to_string(), + )); + } + if let Some(existing) = + existing_versions.get(&(table_key.clone(), table_version)) + { + let is_owner_branch_handoff = existing.row_count == row_count + && existing.table_branch != table_branch; + if !is_owner_branch_handoff { + return Err(OmniError::Lance( + NamespaceError::ConcurrentModification { + message: format!( + "table version {} already exists for {}", + table_version, table_key + ), + } + .to_string(), + )); } - .to_string(), - )); + } + + rows.push(PendingVersionRow { + object_id: version_object_id(&table_key, table_version), + object_type: OBJECT_TYPE_TABLE_VERSION.to_string(), + location: None, + metadata: Some(version_metadata.to_json_string()?), + table_key, + table_version: Some(table_version), + table_branch, + row_count: Some(row_count), + }); + } + ManifestChange::Tombstone(TableTombstone { + table_key, + tombstone_version, + }) => { + if !known_tables.contains_key(table_key.as_str()) { + return Err(OmniError::Lance( + NamespaceError::TableNotFound { + message: format!("table {} not found", table_key), + } + .to_string(), + )); + } + if existing_tombstones.contains_key(&(table_key.clone(), *tombstone_version)) { + return Err(OmniError::Lance( + NamespaceError::ConcurrentModification { + message: format!( + "table tombstone {} already exists for {}", + tombstone_version, table_key + ), + } + .to_string(), + )); + } + rows.push(PendingVersionRow { + object_id: tombstone_object_id(table_key, *tombstone_version), + object_type: OBJECT_TYPE_TABLE_TOMBSTONE.to_string(), + location: None, + metadata: None, + table_key: table_key.clone(), + table_version: Some(*tombstone_version), + table_branch: None, + row_count: None, + }); } } - - rows.push(PendingVersionRow { - object_id: version_object_id(&table_key, table_version), - metadata: Some(version_metadata.to_json_string()?), - table_key, - table_version: Some(table_version), - table_branch, - row_count: Some(row_count), - }); } Ok(rows) @@ -170,8 +258,8 @@ impl GraphNamespacePublisher { for row in rows { object_ids.push(row.object_id); - object_types.push(OBJECT_TYPE_TABLE_VERSION.to_string()); - locations.push(None); + object_types.push(row.object_type); + locations.push(row.location); metadata.push(row.metadata); table_keys.push(row.table_key); table_versions.push(row.table_version); @@ -214,23 +302,41 @@ impl GraphNamespacePublisher { &self, requests: &[CreateTableVersionRequest], ) -> Result { - if requests.is_empty() { - return self.dataset().await; - } - - let (dataset, known_tables, existing_versions) = self.load_publish_state().await?; - let rows = Self::build_pending_rows(requests, &known_tables, &existing_versions)?; - self.merge_rows(dataset, rows).await + let changes = requests + .iter() + .cloned() + .map(|request| { + let (table_key, table_version, row_count, table_branch, version_metadata) = + parse_namespace_version_request(&request) + .map_err(|e| OmniError::Lance(e.to_string()))?; + Ok(ManifestChange::Update(SubTableUpdate { + table_key, + table_version, + table_branch, + row_count, + version_metadata, + })) + }) + .collect::>>()?; + self.publish(&changes).await } } #[async_trait] impl ManifestBatchPublisher for GraphNamespacePublisher { - async fn publish(&self, updates: &[SubTableUpdate]) -> Result { - let requests: Vec = updates - .iter() - .map(SubTableUpdate::to_create_table_version_request) - .collect(); - self.publish_requests(&requests).await + async fn publish(&self, changes: &[ManifestChange]) -> Result { + if changes.is_empty() { + return self.dataset().await; + } + + let (dataset, known_tables, existing_versions, existing_tombstones) = + self.load_publish_state().await?; + let rows = Self::build_pending_rows( + changes, + &known_tables, + &existing_versions, + &existing_tombstones, + )?; + self.merge_rows(dataset, rows).await } } diff --git a/crates/omnigraph/src/db/manifest/state.rs b/crates/omnigraph/src/db/manifest/state.rs index 418615b..eb36518 100644 --- a/crates/omnigraph/src/db/manifest/state.rs +++ b/crates/omnigraph/src/db/manifest/state.rs @@ -10,7 +10,7 @@ use crate::error::{OmniError, Result}; use super::layout::version_object_id; use super::metadata::TableVersionMetadata; -use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_VERSION}; +use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION}; #[derive(Debug, Clone)] pub struct SubTableEntry { @@ -28,6 +28,19 @@ pub(super) struct ManifestState { pub(super) entries: Vec, } +#[derive(Debug, Clone)] +struct TableTombstoneEntry { + table_key: String, + tombstone_version: u64, +} + +#[derive(Debug, Clone)] +struct ManifestScan { + table_locations: HashMap, + version_entries: Vec, + tombstones: Vec, +} + pub(super) fn manifest_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("object_id", DataType::Utf8, false), @@ -48,10 +61,10 @@ pub(super) fn manifest_schema() -> SchemaRef { pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result { let version = dataset.version().version; - let entries = read_manifest_entries(dataset).await?; + let scan = read_manifest_scan(dataset).await?; let mut latest_versions = HashMap::::new(); - for entry in entries { + for entry in scan.version_entries { match latest_versions.get(&entry.table_key) { Some(existing) if existing.table_version >= entry.table_version => {} _ => { @@ -60,13 +73,52 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result = latest_versions.into_values().collect(); + let mut tombstones = HashMap::::new(); + for tombstone in scan.tombstones { + match tombstones.get(&tombstone.table_key) { + Some(existing) if *existing >= tombstone.tombstone_version => {} + _ => { + tombstones.insert(tombstone.table_key, tombstone.tombstone_version); + } + } + } + + let mut entries: Vec = latest_versions + .into_values() + .filter(|entry| { + tombstones + .get(&entry.table_key) + .map(|tombstone_version| *tombstone_version < entry.table_version) + .unwrap_or(true) + }) + .collect(); entries.sort_by(|a, b| a.table_key.cmp(&b.table_key)); Ok(ManifestState { version, entries }) } pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result> { + Ok(read_manifest_scan(dataset).await?.version_entries) +} + +pub(super) async fn read_registered_table_locations( + dataset: &Dataset, +) -> Result> { + Ok(read_manifest_scan(dataset).await?.table_locations) +} + +pub(super) async fn read_tombstone_versions( + dataset: &Dataset, +) -> Result> { + Ok(read_manifest_scan(dataset) + .await? + .tombstones + .into_iter() + .map(|tombstone| ((tombstone.table_key, tombstone.tombstone_version), ())) + .collect()) +} + +async fn read_manifest_scan(dataset: &Dataset) -> Result { let batches: Vec = dataset .scan() .try_into_stream() @@ -78,6 +130,7 @@ pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result Result { + let tombstone_version = required_u64(versions, row, "table_version")?; + tombstones.push(TableTombstoneEntry { + table_key, + tombstone_version, + }); + } _ => {} } } @@ -149,7 +209,11 @@ pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result Result { - let requests: Vec = updates + async fn publish(&self, changes: &[ManifestChange]) -> Result { + let requests: Vec = changes .iter() - .map(SubTableUpdate::to_create_table_version_request) + .filter_map(|change| match change { + ManifestChange::Update(update) => Some(update.to_create_table_version_request()), + ManifestChange::RegisterTable(_) | ManifestChange::Tombstone(_) => None, + }) .collect(); self.requests.lock().await.extend_from_slice(&requests); - self.inner.publish_requests(&requests).await + self.inner.publish(changes).await } } @@ -903,7 +963,7 @@ struct FailingPublisher; #[async_trait] impl ManifestBatchPublisher for FailingPublisher { - async fn publish(&self, _updates: &[SubTableUpdate]) -> Result { + async fn publish(&self, _changes: &[ManifestChange]) -> Result { Err(OmniError::manifest( "injected batch publisher failure".to_string(), )) diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index 7e5245f..263cc4b 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -8,6 +8,6 @@ mod schema_state; pub use commit_graph::GraphCommit; pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId}; pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate}; -pub use omnigraph::{MergeOutcome, Omnigraph}; +pub use omnigraph::{MergeOutcome, Omnigraph, SchemaApplyResult}; pub(crate) use run_registry::is_internal_run_branch; pub use run_registry::{RunId, RunRecord, RunStatus}; diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index c26118e..138140c 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use arrow_array::{ Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, - RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, + RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array, }; use arrow_schema::{DataType, Field, Schema}; use lance::Dataset; @@ -17,7 +17,8 @@ use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType}; use omnigraph_compiler::schema::parser::parse_schema; use omnigraph_compiler::types::ScalarType; use omnigraph_compiler::{ - SchemaIR, SchemaMigrationPlan, build_catalog_from_ir, build_schema_ir, plan_schema_migration, + SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir, + build_schema_ir, plan_schema_migration, }; use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot}; @@ -28,7 +29,9 @@ use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_u use crate::table_store::TableStore; use super::commit_graph::GraphCommit; -use super::manifest::Snapshot; +use super::manifest::{ + ManifestChange, Snapshot, TableRegistration, TableTombstone, table_path_for_table_key, +}; use super::schema_state::{ SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir, validate_schema_contract, write_schema_contract, @@ -42,6 +45,14 @@ pub enum MergeOutcome { Merged, } +#[derive(Debug, Clone)] +pub struct SchemaApplyResult { + pub supported: bool, + pub applied: bool, + pub manifest_version: u64, + pub steps: Vec, +} + /// Top-level handle to an Omnigraph database. /// /// An Omnigraph is a Lance-native graph database with git-style branching. @@ -160,6 +171,324 @@ impl Omnigraph { .map_err(|err| OmniError::manifest(err.to_string())) } + pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result { + self.ensure_schema_state_valid().await?; + let branches = self.coordinator.all_branches().await?; + let public_non_main = branches + .iter() + .filter(|branch| branch.as_str() != "main") + .cloned() + .collect::>(); + if !public_non_main.is_empty() { + return Err(OmniError::manifest_conflict(format!( + "schema apply requires a repo with only main; found non-main branches: {}", + public_non_main.join(", ") + ))); + } + + let accepted_ir = read_accepted_schema_ir(self.uri(), Arc::clone(&self.storage)).await?; + let desired_ir = read_schema_ir_from_source(desired_schema_source)?; + let plan = plan_schema_migration(&accepted_ir, &desired_ir) + .map_err(|err| OmniError::manifest(err.to_string()))?; + if !plan.supported { + let reason = plan + .steps + .iter() + .find_map(|step| match step { + SchemaMigrationStep::UnsupportedChange { reason, .. } => Some(reason.as_str()), + _ => None, + }) + .unwrap_or("unsupported schema migration plan"); + return Err(OmniError::manifest(reason.to_string())); + } + if plan.steps.is_empty() { + return Ok(SchemaApplyResult { + supported: true, + applied: false, + manifest_version: self.version(), + steps: plan.steps, + }); + } + + let mut desired_catalog = build_catalog_from_ir(&desired_ir)?; + fixup_blob_schemas(&mut desired_catalog); + + let snapshot = self.snapshot(); + let mut added_tables = BTreeSet::new(); + let mut renamed_tables = HashMap::new(); + let mut rewritten_tables = BTreeSet::new(); + let mut indexed_tables = BTreeSet::new(); + let mut property_renames = HashMap::>::new(); + let mut changed_edge_tables = false; + + for step in &plan.steps { + match step { + SchemaMigrationStep::AddType { type_kind, name } => { + let table_key = schema_table_key(*type_kind, name); + if table_key.starts_with("edge:") { + changed_edge_tables = true; + } + added_tables.insert(table_key); + } + SchemaMigrationStep::RenameType { + type_kind, + from, + to, + } => { + let source_key = schema_table_key(*type_kind, from); + let target_key = schema_table_key(*type_kind, to); + if source_key.starts_with("edge:") { + changed_edge_tables = true; + } + renamed_tables.insert(target_key, source_key); + } + SchemaMigrationStep::AddProperty { + type_kind, + type_name, + .. + } => { + let table_key = schema_table_key(*type_kind, type_name); + if table_key.starts_with("edge:") { + changed_edge_tables = true; + } + rewritten_tables.insert(table_key); + } + SchemaMigrationStep::RenameProperty { + type_kind, + type_name, + from, + to, + } => { + let table_key = schema_table_key(*type_kind, type_name); + if table_key.starts_with("edge:") { + changed_edge_tables = true; + } + rewritten_tables.insert(table_key.clone()); + property_renames + .entry(table_key) + .or_default() + .insert(to.clone(), from.clone()); + } + SchemaMigrationStep::AddConstraint { + type_kind, + type_name, + .. + } => { + indexed_tables.insert(schema_table_key(*type_kind, type_name)); + } + SchemaMigrationStep::UpdateTypeMetadata { .. } + | SchemaMigrationStep::UpdatePropertyMetadata { .. } => {} + SchemaMigrationStep::UnsupportedChange { reason, .. } => { + return Err(OmniError::manifest(reason.clone())); + } + } + } + + let mut table_registrations = HashMap::::new(); + let mut table_updates = HashMap::::new(); + let mut table_tombstones = HashMap::::new(); + + for table_key in &added_tables { + let table_path = table_path_for_table_key(table_key)?; + let dataset_uri = self.table_store.dataset_uri(&table_path); + let schema = schema_for_table_key(&desired_catalog, table_key)?; + let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?; + self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) + .await?; + let state = self.table_store.table_state(&dataset_uri, &ds).await?; + table_registrations.insert(table_key.clone(), table_path); + table_updates.insert( + table_key.clone(), + crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }, + ); + } + + for (target_table_key, source_table_key) in &renamed_tables { + let source_entry = snapshot.entry(source_table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing source table '{}' for schema rename", + source_table_key + )) + })?; + let source_ds = snapshot.open(source_table_key).await?; + let batch = self + .batch_for_schema_apply_rewrite( + &source_ds, + source_table_key, + &self.catalog, + target_table_key, + &desired_catalog, + property_renames.get(target_table_key), + ) + .await?; + let table_path = table_path_for_table_key(target_table_key)?; + let dataset_uri = self.table_store.dataset_uri(&table_path); + let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?; + self.build_indices_on_dataset_for_catalog( + &desired_catalog, + target_table_key, + &mut target_ds, + ) + .await?; + let state = self + .table_store + .table_state(&dataset_uri, &target_ds) + .await?; + table_registrations.insert(target_table_key.clone(), table_path); + table_updates.insert( + target_table_key.clone(), + crate::db::SubTableUpdate { + table_key: target_table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }, + ); + table_tombstones.insert( + source_table_key.clone(), + source_entry.table_version.saturating_add(1), + ); + } + + for table_key in &rewritten_tables { + if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) { + continue; + } + let entry = snapshot.entry(table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing source table '{}' for schema apply", + table_key + )) + })?; + let source_ds = snapshot.open(table_key).await?; + let batch = self + .batch_for_schema_apply_rewrite( + &source_ds, + table_key, + &self.catalog, + table_key, + &desired_catalog, + property_renames.get(table_key), + ) + .await?; + let dataset_uri = self.table_store.dataset_uri(&entry.table_path); + let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?; + let mut state = self + .table_store + .table_state(&dataset_uri, &target_ds) + .await?; + if indexed_tables.contains(table_key) { + self.build_indices_on_dataset_for_catalog( + &desired_catalog, + table_key, + &mut target_ds, + ) + .await?; + state = self + .table_store + .table_state(&dataset_uri, &target_ds) + .await?; + } + table_updates.insert( + table_key.clone(), + crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }, + ); + } + + for table_key in &indexed_tables { + if added_tables.contains(table_key) + || renamed_tables.contains_key(table_key) + || rewritten_tables.contains(table_key) + { + continue; + } + let entry = snapshot.entry(table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing table '{}' for schema index apply", + table_key + )) + })?; + let dataset_uri = self.table_store.dataset_uri(&entry.table_path); + let mut ds = self + .table_store + .open_dataset_head_for_write(table_key, &dataset_uri, None) + .await?; + self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) + .await?; + let state = self.table_store.table_state(&dataset_uri, &ds).await?; + table_updates.insert( + table_key.clone(), + crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }, + ); + } + + let mut manifest_changes = Vec::new(); + for (table_key, table_path) in table_registrations { + manifest_changes.push(ManifestChange::RegisterTable(TableRegistration { + table_key, + table_path, + })); + } + for update in table_updates.into_values() { + manifest_changes.push(ManifestChange::Update(update)); + } + for (table_key, tombstone_version) in table_tombstones { + manifest_changes.push(ManifestChange::Tombstone(TableTombstone { + table_key, + tombstone_version, + })); + } + + let actor_id = self.current_audit_actor().map(str::to_string); + let PublishedSnapshot { + manifest_version, + _snapshot_id: _, + } = self + .coordinator + .commit_changes_with_actor(&manifest_changes, actor_id.as_deref()) + .await?; + + let schema_path = join_uri(&self.root_uri, SCHEMA_SOURCE_FILENAME); + self.storage + .write_text(&schema_path, desired_schema_source) + .await?; + write_schema_contract(&self.root_uri, self.storage.as_ref(), &desired_ir).await?; + + self.catalog = desired_catalog; + self.schema_source = desired_schema_source.to_string(); + self.coordinator.refresh().await?; + self.runtime_cache.invalidate_all().await; + if changed_edge_tables { + self.invalidate_graph_index().await; + } + + Ok(SchemaApplyResult { + supported: true, + applied: true, + manifest_version, + steps: plan.steps, + }) + } + pub(crate) fn table_store(&self) -> &TableStore { &self.table_store } @@ -1368,6 +1697,16 @@ impl Omnigraph { &self, table_key: &str, ds: &mut Dataset, + ) -> Result<()> { + self.build_indices_on_dataset_for_catalog(&self.catalog, table_key, ds) + .await + } + + pub(crate) async fn build_indices_on_dataset_for_catalog( + &self, + catalog: &Catalog, + table_key: &str, + ds: &mut Dataset, ) -> Result<()> { if let Some(type_name) = table_key.strip_prefix("node:") { if !self.table_store.has_btree_index(ds, "id").await? { @@ -1379,7 +1718,7 @@ impl Omnigraph { })?; } - if let Some(node_type) = self.catalog.node_types.get(type_name) { + if let Some(node_type) = catalog.node_types.get(type_name) { for index_cols in &node_type.indices { if index_cols.len() != 1 { continue; @@ -1600,58 +1939,100 @@ impl Omnigraph { source_ds: &Dataset, table_key: &str, ) -> Result { - let target_schema = schema_for_table_key(self.catalog(), table_key)?; - let blob_properties = blob_properties_for_table_key(self.catalog(), table_key)?; - if blob_properties.is_empty() { - let batches = self.table_store().scan_batches(source_ds).await?; - return concat_or_empty_batches(target_schema, batches); - } + self.batch_for_schema_apply_rewrite( + source_ds, + table_key, + &self.catalog, + table_key, + &self.catalog, + None, + ) + .await + } - let batches = self - .table_store() - .scan_with(source_ds, None, None, None, true, |_| Ok(())) - .await?; - let batch = concat_or_empty_batches(target_schema.clone(), batches)?; - if batch.num_rows() == 0 { - return Ok(batch); + async fn batch_for_schema_apply_rewrite( + &self, + source_ds: &Dataset, + source_table_key: &str, + source_catalog: &Catalog, + target_table_key: &str, + target_catalog: &Catalog, + property_renames: Option<&HashMap>, + ) -> Result { + let target_schema = schema_for_table_key(target_catalog, target_table_key)?; + let source_blob_properties = + blob_properties_for_table_key(source_catalog, source_table_key)?; + let target_blob_properties = + blob_properties_for_table_key(target_catalog, target_table_key)?; + let needs_row_ids = + !source_blob_properties.is_empty() || !target_blob_properties.is_empty(); + let batches = if needs_row_ids { + self.table_store() + .scan_with(source_ds, None, None, None, true, |_| Ok(())) + .await? + } else { + self.table_store().scan_batches(source_ds).await? + }; + if batches.is_empty() { + return Ok(RecordBatch::new_empty(target_schema)); } + let source_schema = batches[0].schema(); + let batch = concat_or_empty_batches(source_schema, batches)?; - let row_ids = batch - .column_by_name("_rowid") - .and_then(|col| col.as_any().downcast_ref::()) - .ok_or_else(|| { - OmniError::Lance(format!( - "expected _rowid column when rewriting '{}'", - table_key - )) - })?; - let row_ids: Vec = row_ids.values().iter().copied().collect(); + let row_ids = if needs_row_ids { + Some( + batch + .column_by_name("_rowid") + .and_then(|col| col.as_any().downcast_ref::()) + .ok_or_else(|| { + OmniError::Lance(format!( + "expected _rowid column when rewriting '{}'", + source_table_key + )) + })? + .values() + .iter() + .copied() + .collect::>(), + ) + } else { + None + }; let mut columns = Vec::with_capacity(target_schema.fields().len()); for field in target_schema.fields() { - if blob_properties.contains(field.name()) { - let descriptions = batch - .column_by_name(field.name()) - .and_then(|col| col.as_any().downcast_ref::()) - .ok_or_else(|| { - OmniError::Lance(format!( - "expected blob descriptions for '{}.{}'", - table_key, - field.name() - )) - })?; - columns.push( - self.rebuild_blob_column(source_ds, field.name(), descriptions, &row_ids) - .await?, - ); + let source_name = property_renames + .and_then(|renames| renames.get(field.name())) + .map(String::as_str) + .unwrap_or_else(|| field.name().as_str()); + if let Some(column) = batch.column_by_name(source_name) { + if target_blob_properties.contains(field.name()) + && source_blob_properties.contains(source_name) + { + let descriptions = + column + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OmniError::Lance(format!( + "expected blob descriptions for '{}.{}'", + source_table_key, source_name + )) + })?; + let rebuilt = self + .rebuild_blob_column( + source_ds, + source_name, + descriptions, + row_ids.as_deref().unwrap_or(&[]), + ) + .await?; + columns.push(rebuilt); + } else { + columns.push(column.clone()); + } } else { - columns.push(batch.column_by_name(field.name()).cloned().ok_or_else(|| { - OmniError::Lance(format!( - "missing column '{}.{}' in rewrite batch", - table_key, - field.name() - )) - })?); + columns.push(new_null_array(field.data_type(), batch.num_rows())); } } @@ -2130,6 +2511,14 @@ fn read_schema_ir_from_source(schema_source: &str) -> Result { build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string())) } +fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String { + match type_kind { + SchemaTypeKind::Node => format!("node:{}", name), + SchemaTypeKind::Edge => format!("edge:{}", name), + SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"), + } +} + fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result> { if let Some(type_name) = table_key.strip_prefix("node:") { let node_type: &NodeType = catalog @@ -2327,8 +2716,10 @@ fn json_value_from_array(array: &dyn Array, row: usize) -> Result Company ); } + async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec { + let snapshot = db.snapshot(); + let ds = snapshot.open(table_key).await.unwrap(); + let batches = db.table_store().scan_batches(&ds).await.unwrap(); + batches + .into_iter() + .flat_map(|batch| { + (0..batch.num_rows()) + .map(|row| record_batch_row_to_json(&batch, row).unwrap()) + .collect::>() + }) + .collect() + } + + async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option) { + let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap(); + let schema: Arc = Arc::new(ds.schema().into()); + let columns: Vec> = schema + .fields() + .iter() + .map(|field| match field.name().as_str() { + "id" => Arc::new(StringArray::from(vec![name])) as Arc, + "name" => Arc::new(StringArray::from(vec![name])) as Arc, + "age" => Arc::new(Int32Array::from(vec![age])) as Arc, + _ => new_null_array(field.data_type(), 1), + }) + .collect(); + let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap(); + let state = db + .table_store() + .append_batch(&full_path, &mut ds, batch) + .await + .unwrap(); + db.commit_updates(&[crate::db::SubTableUpdate { + table_key: "node:Person".to_string(), + table_version: state.version, + table_branch, + row_count: state.row_count, + version_metadata: state.version_metadata, + }]) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_apply_schema_noop_returns_not_applied() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + let result = db.apply_schema(TEST_SCHEMA).await.unwrap(); + assert!(result.supported); + assert!(!result.applied); + assert!(result.steps.is_empty()); + } + + #[tokio::test] + async fn test_apply_schema_adds_nullable_property_and_preserves_rows() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + seed_person_row(&mut db, "Alice", Some(30)).await; + + let desired = TEST_SCHEMA.replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + let result = db.apply_schema(&desired).await.unwrap(); + assert!(result.applied); + + let reopened = Omnigraph::open(uri).await.unwrap(); + let rows = table_rows_json(&reopened, "node:Person").await; + assert_eq!(rows.len(), 1); + assert_eq!(rows[0]["name"], "Alice"); + assert_eq!(rows[0]["age"], 30); + assert!(rows[0]["nickname"].is_null()); + assert!( + reopened.catalog().node_types["Person"] + .properties + .contains_key("nickname") + ); + assert!(dir.path().join("_schema.pg").exists()); + } + + #[tokio::test] + async fn test_apply_schema_renames_property_and_preserves_values() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + seed_person_row(&mut db, "Alice", Some(30)).await; + + let desired = TEST_SCHEMA.replace( + " age: I32?\n}", + " years: I32? @rename_from(\"age\")\n}", + ); + db.apply_schema(&desired).await.unwrap(); + + let reopened = Omnigraph::open(uri).await.unwrap(); + let rows = table_rows_json(&reopened, "node:Person").await; + assert_eq!(rows[0]["name"], "Alice"); + assert_eq!(rows[0]["years"], 30); + assert!(rows[0].get("age").is_none()); + } + + #[tokio::test] + async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + seed_person_row(&mut db, "Alice", Some(30)).await; + let before_version = db.snapshot().version(); + + let desired = TEST_SCHEMA + .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n") + .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human") + .replace( + "edge WorksAt: Person -> Company", + "edge WorksAt: Human -> Company", + ); + db.apply_schema(&desired).await.unwrap(); + + let head = db.snapshot(); + assert!(head.entry("node:Person").is_none()); + assert!(head.entry("node:Human").is_some()); + let historical = ManifestCoordinator::snapshot_at(uri, None, before_version) + .await + .unwrap(); + assert!(historical.entry("node:Person").is_some()); + assert!(historical.entry("node:Human").is_none()); + } + + #[tokio::test] + async fn test_apply_schema_rejects_when_non_main_branch_exists() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + db.branch_create("feature").await.unwrap(); + + let desired = TEST_SCHEMA.replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + let err = db.apply_schema(&desired).await.unwrap_err(); + assert!( + err.to_string() + .contains("schema apply requires a repo with only main") + ); + } + + #[tokio::test] + async fn test_apply_schema_adds_index_for_existing_property() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index"); + db.apply_schema(&desired).await.unwrap(); + + let snapshot = db.snapshot(); + let ds = snapshot.open("node:Person").await.unwrap(); + assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap()); + } + + #[tokio::test] + async fn test_apply_schema_unsupported_plan_does_not_advance_manifest() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + let before_version = db.snapshot().version(); + + let desired = TEST_SCHEMA.replace("age: I32?", "age: I64?"); + let err = db.apply_schema(&desired).await.unwrap_err(); + assert!(err.to_string().contains("changing property type")); + assert_eq!(db.snapshot().version(), before_version); + } + #[tokio::test] async fn test_open_nonexistent_fails() { let result = Omnigraph::open("/tmp/nonexistent_omnigraph_test_xyz").await; diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index e9403f6..981bdc8 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -399,6 +399,20 @@ impl TableStore { self.append_batch(dataset_uri, ds, batch).await } + pub async fn overwrite_dataset(dataset_uri: &str, batch: RecordBatch) -> Result { + let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()); + let params = WriteParams { + mode: WriteMode::Overwrite, + enable_stable_row_ids: true, + data_storage_version: Some(LanceFileVersion::V2_2), + allow_external_blob_outside_bases: true, + ..Default::default() + }; + Dataset::write(reader, dataset_uri, Some(params)) + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + pub async fn merge_insert_batch( &self, dataset_uri: &str, diff --git a/docs/cli.md b/docs/cli.md index 835ff3d..afeae98 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -48,6 +48,7 @@ and configure the matching `bearer_token_env` in `omnigraph.yaml`. ```bash omnigraph schema plan --schema ./next.pg ./repo.omni --json +omnigraph schema apply --schema ./next.pg ./repo.omni --json omnigraph policy validate --config ./omnigraph.yaml omnigraph policy test --config ./omnigraph.yaml omnigraph policy explain --config ./omnigraph.yaml --actor act-alice --action read --branch main @@ -87,3 +88,6 @@ The config file can also define: - auth env files - query aliases for common read and change commands - `policy.file` for Cedar authorization rules + +When policy is enabled, `schema apply` is authorized through the +`schema_apply` action and is typically limited to admins on protected `main`.