diff --git a/crates/omnigraph/src/db/graph_coordinator.rs b/crates/omnigraph/src/db/graph_coordinator.rs index c342db1..03422f0 100644 --- a/crates/omnigraph/src/db/graph_coordinator.rs +++ b/crates/omnigraph/src/db/graph_coordinator.rs @@ -8,8 +8,9 @@ use crate::failpoints; use crate::storage::{StorageAdapter, join_uri, normalize_root_uri}; use super::commit_graph::{CommitGraph, GraphCommit}; +use super::is_internal_system_branch; use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate}; -use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri, is_internal_run_branch}; +use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri}; const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance"; @@ -203,7 +204,7 @@ impl GraphCoordinator { self.manifest.list_branches().await.map(|branches| { branches .into_iter() - .filter(|branch| !is_internal_run_branch(branch)) + .filter(|branch| !is_internal_system_branch(branch)) .collect() }) } @@ -219,7 +220,7 @@ impl GraphCoordinator { .map(|branches| { branches .into_iter() - .filter(|branch| !is_internal_run_branch(branch)) + .filter(|branch| !is_internal_system_branch(branch)) .collect() }) } diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index 263cc4b..a4f90d1 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -11,3 +11,13 @@ pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate}; pub use omnigraph::{MergeOutcome, Omnigraph, SchemaApplyResult}; pub(crate) use run_registry::is_internal_run_branch; pub use run_registry::{RunId, RunRecord, RunStatus}; + +pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__"; + +pub(crate) fn is_schema_apply_lock_branch(name: &str) -> bool { + name.trim_start_matches('/') == SCHEMA_APPLY_LOCK_BRANCH +} + +pub(crate) fn is_internal_system_branch(name: &str) -> bool { + is_internal_run_branch(name) || is_schema_apply_lock_branch(name) +} diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 138140c..f63f9f0 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -22,7 +22,7 @@ use omnigraph_compiler::{ }; use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot}; -use crate::db::run_registry::{RunRecord, RunStatus, is_internal_run_branch}; +use crate::db::run_registry::{RunRecord, RunStatus}; use crate::error::{ManifestErrorKind, OmniError, Result}; use crate::runtime_cache::RuntimeCache; use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri}; @@ -30,13 +30,17 @@ use crate::table_store::TableStore; use super::commit_graph::GraphCommit; use super::manifest::{ - ManifestChange, Snapshot, TableRegistration, TableTombstone, table_path_for_table_key, + ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone, + table_path_for_table_key, }; use super::schema_state::{ SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir, validate_schema_contract, write_schema_contract, }; -use super::{ReadTarget, ResolvedTarget, RunId, SnapshotId}; +use super::{ + ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, + is_internal_system_branch, is_schema_apply_lock_branch, +}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum MergeOutcome { @@ -172,17 +176,31 @@ impl Omnigraph { } pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result { + self.acquire_schema_apply_lock().await?; + let result = self.apply_schema_with_lock(desired_schema_source).await; + let release_result = self.release_schema_apply_lock().await; + match (result, release_result) { + (Ok(result), Ok(())) => Ok(result), + (Ok(_), Err(err)) => Err(err), + (Err(err), Ok(())) => Err(err), + (Err(err), Err(_)) => Err(err), + } + } + + async fn apply_schema_with_lock( + &mut self, + desired_schema_source: &str, + ) -> Result { self.ensure_schema_state_valid().await?; let branches = self.coordinator.all_branches().await?; - let public_non_main = branches - .iter() - .filter(|branch| branch.as_str() != "main") - .cloned() + let blocking_branches = branches + .into_iter() + .filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch)) .collect::>(); - if !public_non_main.is_empty() { + if !blocking_branches.is_empty() { return Err(OmniError::manifest_conflict(format!( "schema apply requires a repo with only main; found non-main branches: {}", - public_non_main.join(", ") + blocking_branches.join(", ") ))); } @@ -214,6 +232,7 @@ impl Omnigraph { fixup_blob_schemas(&mut desired_catalog); let snapshot = self.snapshot(); + let base_manifest_version = snapshot.version(); let mut added_tables = BTreeSet::new(); let mut renamed_tables = HashMap::new(); let mut rewritten_tables = BTreeSet::new(); @@ -316,6 +335,8 @@ impl Omnigraph { source_table_key )) })?; + self.ensure_snapshot_entry_head_matches(source_entry) + .await?; let source_ds = snapshot.open(source_table_key).await?; let batch = self .batch_for_schema_apply_rewrite( @@ -367,6 +388,7 @@ impl Omnigraph { table_key )) })?; + self.ensure_snapshot_entry_head_matches(entry).await?; let source_ds = snapshot.open(table_key).await?; let batch = self .batch_for_schema_apply_rewrite( @@ -380,22 +402,12 @@ impl Omnigraph { .await?; let dataset_uri = self.table_store.dataset_uri(&entry.table_path); let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?; - let mut state = self + self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds) + .await?; + let state = self .table_store .table_state(&dataset_uri, &target_ds) .await?; - if indexed_tables.contains(table_key) { - self.build_indices_on_dataset_for_catalog( - &desired_catalog, - table_key, - &mut target_ds, - ) - .await?; - state = self - .table_store - .table_state(&dataset_uri, &target_ds) - .await?; - } table_updates.insert( table_key.clone(), crate::db::SubTableUpdate { @@ -421,11 +433,14 @@ impl Omnigraph { table_key )) })?; + self.ensure_snapshot_entry_head_matches(entry).await?; let dataset_uri = self.table_store.dataset_uri(&entry.table_path); let mut ds = self .table_store - .open_dataset_head_for_write(table_key, &dataset_uri, None) + .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref()) .await?; + self.table_store + .ensure_expected_version(&ds, table_key, entry.table_version)?; self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) .await?; let state = self.table_store.table_state(&dataset_uri, &ds).await?; @@ -458,6 +473,15 @@ impl Omnigraph { })); } + self.refresh().await?; + if self.version() != base_manifest_version { + return Err(OmniError::manifest_conflict(format!( + "schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress", + base_manifest_version, + self.version() + ))); + } + let actor_id = self.current_audit_actor().map(str::to_string); let PublishedSnapshot { manifest_version, @@ -489,6 +513,84 @@ impl Omnigraph { }) } + pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> { + self.refresh().await?; + self.ensure_schema_apply_not_locked(operation).await + } + + async fn acquire_schema_apply_lock(&mut self) -> Result<()> { + self.ensure_schema_state_valid().await?; + self.refresh().await?; + let branches = self.coordinator.all_branches().await?; + if branches + .iter() + .any(|branch| is_schema_apply_lock_branch(branch)) + { + return Err(OmniError::manifest_conflict( + "schema apply is already in progress".to_string(), + )); + } + + self.coordinator + .branch_create(SCHEMA_APPLY_LOCK_BRANCH) + .await?; + self.refresh().await?; + + let blocking_branches = self + .coordinator + .all_branches() + .await? + .into_iter() + .filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch)) + .collect::>(); + if !blocking_branches.is_empty() { + let _ = self.release_schema_apply_lock().await; + return Err(OmniError::manifest_conflict(format!( + "schema apply requires a repo with only main; found non-main branches: {}", + blocking_branches.join(", ") + ))); + } + + Ok(()) + } + + async fn release_schema_apply_lock(&mut self) -> Result<()> { + self.coordinator + .branch_delete(SCHEMA_APPLY_LOCK_BRANCH) + .await?; + self.refresh().await + } + + async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> { + if self + .coordinator + .all_branches() + .await? + .iter() + .any(|branch| is_schema_apply_lock_branch(branch)) + { + return Err(OmniError::manifest_conflict(format!( + "{} is unavailable while schema apply is in progress", + operation + ))); + } + Ok(()) + } + + async fn ensure_snapshot_entry_head_matches(&self, entry: &SubTableEntry) -> Result<()> { + let dataset_uri = self.table_store.dataset_uri(&entry.table_path); + let ds = self + .table_store + .open_dataset_head_for_write( + &entry.table_key, + &dataset_uri, + entry.table_branch.as_deref(), + ) + .await?; + self.table_store + .ensure_expected_version(&ds, &entry.table_key, entry.table_version) + } + pub(crate) fn table_store(&self) -> &TableStore { &self.table_store } @@ -906,6 +1008,7 @@ impl Omnigraph { pub(crate) async fn ensure_indices_for_branch(&mut self, branch: Option<&str>) -> Result<()> { self.ensure_schema_state_valid().await?; + self.ensure_schema_apply_idle("ensure_indices").await?; let resolved = self.resolved_branch_target(branch).await?; let snapshot = resolved.snapshot; let mut updates = Vec::new(); @@ -1196,6 +1299,7 @@ impl Omnigraph { pub async fn branch_create(&mut self, name: &str) -> Result<()> { self.ensure_schema_state_valid().await?; + self.ensure_schema_apply_idle("branch_create").await?; ensure_public_branch_ref(name, "branch_create")?; self.coordinator.branch_create(name).await } @@ -1209,6 +1313,7 @@ impl Omnigraph { from: impl Into, name: &str, ) -> Result<()> { + self.ensure_schema_apply_idle("branch_create_from").await?; self.branch_create_from_impl(from, name, false).await } @@ -1242,6 +1347,7 @@ impl Omnigraph { pub async fn branch_delete(&mut self, name: &str) -> Result<()> { self.ensure_schema_state_valid().await?; + self.ensure_schema_apply_idle("branch_delete").await?; ensure_public_branch_ref(name, "branch_delete")?; self.refresh().await?; let branch = normalize_branch_name(name)? @@ -1283,6 +1389,7 @@ impl Omnigraph { actor_id: Option<&str>, ) -> Result { self.ensure_schema_state_valid().await?; + self.ensure_schema_apply_idle("begin_run").await?; ensure_public_branch_ref(target_branch, "begin_run")?; let target_branch = normalize_branch_name(target_branch)?.unwrap_or_else(|| "main".to_string()); @@ -1389,6 +1496,7 @@ impl Omnigraph { actor_id: Option<&str>, ) -> Result { self.ensure_schema_state_valid().await?; + self.ensure_schema_apply_idle("publish_run").await?; let run = self.get_run(run_id).await?; match run.status { RunStatus::Running => {} @@ -1577,6 +1685,7 @@ impl Omnigraph { branch: Option<&str>, table_key: &str, ) -> Result<(Dataset, String, Option)> { + self.ensure_schema_apply_not_locked("write").await?; let resolved = self.resolved_branch_target(branch).await?; let entry = resolved .snapshot @@ -1677,6 +1786,7 @@ impl Omnigraph { table_branch: Option<&str>, expected_version: u64, ) -> Result { + self.ensure_schema_apply_not_locked("write").await?; self.table_store .reopen_for_mutation(full_path, table_branch, table_key, expected_version) .await @@ -1883,6 +1993,7 @@ impl Omnigraph { &mut self, updates: &[crate::db::SubTableUpdate], ) -> Result { + self.ensure_schema_apply_not_locked("write commit").await?; let current_branch = self.coordinator.current_branch().map(str::to_string); let prepared = self .prepare_updates_for_commit(current_branch.as_deref(), updates) @@ -1920,6 +2031,7 @@ impl Omnigraph { branch: Option<&str>, updates: &[crate::db::SubTableUpdate], ) -> Result { + self.ensure_schema_apply_not_locked("write commit").await?; let prepared = self.prepare_updates_for_commit(branch, updates).await?; self.commit_prepared_updates_on_branch(branch, &prepared) .await @@ -2362,9 +2474,9 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result> { } fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> { - if is_internal_run_branch(branch) { + if is_internal_system_branch(branch) { return Err(OmniError::manifest(format!( - "{} does not allow internal run ref '{}'", + "{} does not allow internal system ref '{}'", operation, branch ))); } @@ -3175,6 +3287,75 @@ edge WorksAt: Person -> Company assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap()); } + #[tokio::test] + async fn test_apply_schema_rewrite_preserves_existing_indices() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index"); + let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap(); + seed_person_row(&mut db, "Alice", Some(30)).await; + + let desired = initial_schema.replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + db.apply_schema(&desired).await.unwrap(); + + let snapshot = db.snapshot(); + let ds = snapshot.open("node:Person").await.unwrap(); + assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap()); + assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap()); + } + + #[tokio::test] + async fn test_open_for_mutation_rejects_while_schema_apply_locked() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + let mut db = db; + db.coordinator + .branch_create(SCHEMA_APPLY_LOCK_BRANCH) + .await + .unwrap(); + + let err = db.open_for_mutation("node:Person").await.unwrap_err(); + assert!( + err.to_string() + .contains("write is unavailable while schema apply is in progress") + ); + } + + #[tokio::test] + async fn test_commit_updates_rejects_while_schema_apply_locked() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + db.coordinator + .branch_create(SCHEMA_APPLY_LOCK_BRANCH) + .await + .unwrap(); + + let err = db.commit_updates(&[]).await.unwrap_err(); + assert!( + err.to_string() + .contains("write commit is unavailable while schema apply is in progress") + ); + } + + #[tokio::test] + async fn test_branch_list_hides_schema_apply_lock_branch() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + db.coordinator + .branch_create(SCHEMA_APPLY_LOCK_BRANCH) + .await + .unwrap(); + + let branches = db.branch_list().await.unwrap(); + assert_eq!(branches, vec!["main".to_string()]); + } + #[tokio::test] async fn test_apply_schema_unsupported_plan_does_not_advance_manifest() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/src/exec/mod.rs b/crates/omnigraph/src/exec/mod.rs index f8bf9e6..59a5ecc 100644 --- a/crates/omnigraph/src/exec/mod.rs +++ b/crates/omnigraph/src/exec/mod.rs @@ -3371,6 +3371,7 @@ impl Omnigraph { target: &str, actor_id: Option<&str>, ) -> Result { + self.ensure_schema_apply_idle("branch_merge").await?; let previous_actor = self.audit_actor_id.clone(); self.audit_actor_id = actor_id.map(str::to_string); let result = self.branch_merge_impl(source, target, false).await;