Harden schema apply against write races

This commit is contained in:
andrew 2026-04-12 15:19:48 +03:00
parent e5528047a9
commit 6655cd65d5
4 changed files with 221 additions and 28 deletions

View file

@ -8,8 +8,9 @@ use crate::failpoints;
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
use super::commit_graph::{CommitGraph, GraphCommit};
use super::is_internal_system_branch;
use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri, is_internal_run_branch};
use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri};
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
@ -203,7 +204,7 @@ impl GraphCoordinator {
self.manifest.list_branches().await.map(|branches| {
branches
.into_iter()
.filter(|branch| !is_internal_run_branch(branch))
.filter(|branch| !is_internal_system_branch(branch))
.collect()
})
}
@ -219,7 +220,7 @@ impl GraphCoordinator {
.map(|branches| {
branches
.into_iter()
.filter(|branch| !is_internal_run_branch(branch))
.filter(|branch| !is_internal_system_branch(branch))
.collect()
})
}

View file

@ -11,3 +11,13 @@ pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
pub use omnigraph::{MergeOutcome, Omnigraph, SchemaApplyResult};
pub(crate) use run_registry::is_internal_run_branch;
pub use run_registry::{RunId, RunRecord, RunStatus};
pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__";
pub(crate) fn is_schema_apply_lock_branch(name: &str) -> bool {
name.trim_start_matches('/') == SCHEMA_APPLY_LOCK_BRANCH
}
pub(crate) fn is_internal_system_branch(name: &str) -> bool {
is_internal_run_branch(name) || is_schema_apply_lock_branch(name)
}

View file

@ -22,7 +22,7 @@ use omnigraph_compiler::{
};
use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
use crate::db::run_registry::{RunRecord, RunStatus, is_internal_run_branch};
use crate::db::run_registry::{RunRecord, RunStatus};
use crate::error::{ManifestErrorKind, OmniError, Result};
use crate::runtime_cache::RuntimeCache;
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
@ -30,13 +30,17 @@ use crate::table_store::TableStore;
use super::commit_graph::GraphCommit;
use super::manifest::{
ManifestChange, Snapshot, TableRegistration, TableTombstone, table_path_for_table_key,
ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
table_path_for_table_key,
};
use super::schema_state::{
SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
validate_schema_contract, write_schema_contract,
};
use super::{ReadTarget, ResolvedTarget, RunId, SnapshotId};
use super::{
ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId,
is_internal_system_branch, is_schema_apply_lock_branch,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
@ -172,17 +176,31 @@ impl Omnigraph {
}
pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
self.acquire_schema_apply_lock().await?;
let result = self.apply_schema_with_lock(desired_schema_source).await;
let release_result = self.release_schema_apply_lock().await;
match (result, release_result) {
(Ok(result), Ok(())) => Ok(result),
(Ok(_), Err(err)) => Err(err),
(Err(err), Ok(())) => Err(err),
(Err(err), Err(_)) => Err(err),
}
}
async fn apply_schema_with_lock(
&mut self,
desired_schema_source: &str,
) -> Result<SchemaApplyResult> {
self.ensure_schema_state_valid().await?;
let branches = self.coordinator.all_branches().await?;
let public_non_main = branches
.iter()
.filter(|branch| branch.as_str() != "main")
.cloned()
let blocking_branches = branches
.into_iter()
.filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch))
.collect::<Vec<_>>();
if !public_non_main.is_empty() {
if !blocking_branches.is_empty() {
return Err(OmniError::manifest_conflict(format!(
"schema apply requires a repo with only main; found non-main branches: {}",
public_non_main.join(", ")
blocking_branches.join(", ")
)));
}
@ -214,6 +232,7 @@ impl Omnigraph {
fixup_blob_schemas(&mut desired_catalog);
let snapshot = self.snapshot();
let base_manifest_version = snapshot.version();
let mut added_tables = BTreeSet::new();
let mut renamed_tables = HashMap::new();
let mut rewritten_tables = BTreeSet::new();
@ -316,6 +335,8 @@ impl Omnigraph {
source_table_key
))
})?;
self.ensure_snapshot_entry_head_matches(source_entry)
.await?;
let source_ds = snapshot.open(source_table_key).await?;
let batch = self
.batch_for_schema_apply_rewrite(
@ -367,6 +388,7 @@ impl Omnigraph {
table_key
))
})?;
self.ensure_snapshot_entry_head_matches(entry).await?;
let source_ds = snapshot.open(table_key).await?;
let batch = self
.batch_for_schema_apply_rewrite(
@ -380,22 +402,12 @@ impl Omnigraph {
.await?;
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?;
let mut state = self
self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
.await?;
let state = self
.table_store
.table_state(&dataset_uri, &target_ds)
.await?;
if indexed_tables.contains(table_key) {
self.build_indices_on_dataset_for_catalog(
&desired_catalog,
table_key,
&mut target_ds,
)
.await?;
state = self
.table_store
.table_state(&dataset_uri, &target_ds)
.await?;
}
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
@ -421,11 +433,14 @@ impl Omnigraph {
table_key
))
})?;
self.ensure_snapshot_entry_head_matches(entry).await?;
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
let mut ds = self
.table_store
.open_dataset_head_for_write(table_key, &dataset_uri, None)
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
.await?;
self.table_store
.ensure_expected_version(&ds, table_key, entry.table_version)?;
self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
.await?;
let state = self.table_store.table_state(&dataset_uri, &ds).await?;
@ -458,6 +473,15 @@ impl Omnigraph {
}));
}
self.refresh().await?;
if self.version() != base_manifest_version {
return Err(OmniError::manifest_conflict(format!(
"schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
base_manifest_version,
self.version()
)));
}
let actor_id = self.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
@ -489,6 +513,84 @@ impl Omnigraph {
})
}
pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
self.refresh().await?;
self.ensure_schema_apply_not_locked(operation).await
}
async fn acquire_schema_apply_lock(&mut self) -> Result<()> {
self.ensure_schema_state_valid().await?;
self.refresh().await?;
let branches = self.coordinator.all_branches().await?;
if branches
.iter()
.any(|branch| is_schema_apply_lock_branch(branch))
{
return Err(OmniError::manifest_conflict(
"schema apply is already in progress".to_string(),
));
}
self.coordinator
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
self.refresh().await?;
let blocking_branches = self
.coordinator
.all_branches()
.await?
.into_iter()
.filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch))
.collect::<Vec<_>>();
if !blocking_branches.is_empty() {
let _ = self.release_schema_apply_lock().await;
return Err(OmniError::manifest_conflict(format!(
"schema apply requires a repo with only main; found non-main branches: {}",
blocking_branches.join(", ")
)));
}
Ok(())
}
async fn release_schema_apply_lock(&mut self) -> Result<()> {
self.coordinator
.branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
self.refresh().await
}
async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
if self
.coordinator
.all_branches()
.await?
.iter()
.any(|branch| is_schema_apply_lock_branch(branch))
{
return Err(OmniError::manifest_conflict(format!(
"{} is unavailable while schema apply is in progress",
operation
)));
}
Ok(())
}
async fn ensure_snapshot_entry_head_matches(&self, entry: &SubTableEntry) -> Result<()> {
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
let ds = self
.table_store
.open_dataset_head_for_write(
&entry.table_key,
&dataset_uri,
entry.table_branch.as_deref(),
)
.await?;
self.table_store
.ensure_expected_version(&ds, &entry.table_key, entry.table_version)
}
pub(crate) fn table_store(&self) -> &TableStore {
&self.table_store
}
@ -906,6 +1008,7 @@ impl Omnigraph {
pub(crate) async fn ensure_indices_for_branch(&mut self, branch: Option<&str>) -> Result<()> {
self.ensure_schema_state_valid().await?;
self.ensure_schema_apply_idle("ensure_indices").await?;
let resolved = self.resolved_branch_target(branch).await?;
let snapshot = resolved.snapshot;
let mut updates = Vec::new();
@ -1196,6 +1299,7 @@ impl Omnigraph {
pub async fn branch_create(&mut self, name: &str) -> Result<()> {
self.ensure_schema_state_valid().await?;
self.ensure_schema_apply_idle("branch_create").await?;
ensure_public_branch_ref(name, "branch_create")?;
self.coordinator.branch_create(name).await
}
@ -1209,6 +1313,7 @@ impl Omnigraph {
from: impl Into<ReadTarget>,
name: &str,
) -> Result<()> {
self.ensure_schema_apply_idle("branch_create_from").await?;
self.branch_create_from_impl(from, name, false).await
}
@ -1242,6 +1347,7 @@ impl Omnigraph {
pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
self.ensure_schema_state_valid().await?;
self.ensure_schema_apply_idle("branch_delete").await?;
ensure_public_branch_ref(name, "branch_delete")?;
self.refresh().await?;
let branch = normalize_branch_name(name)?
@ -1283,6 +1389,7 @@ impl Omnigraph {
actor_id: Option<&str>,
) -> Result<RunRecord> {
self.ensure_schema_state_valid().await?;
self.ensure_schema_apply_idle("begin_run").await?;
ensure_public_branch_ref(target_branch, "begin_run")?;
let target_branch =
normalize_branch_name(target_branch)?.unwrap_or_else(|| "main".to_string());
@ -1389,6 +1496,7 @@ impl Omnigraph {
actor_id: Option<&str>,
) -> Result<SnapshotId> {
self.ensure_schema_state_valid().await?;
self.ensure_schema_apply_idle("publish_run").await?;
let run = self.get_run(run_id).await?;
match run.status {
RunStatus::Running => {}
@ -1577,6 +1685,7 @@ impl Omnigraph {
branch: Option<&str>,
table_key: &str,
) -> Result<(Dataset, String, Option<String>)> {
self.ensure_schema_apply_not_locked("write").await?;
let resolved = self.resolved_branch_target(branch).await?;
let entry = resolved
.snapshot
@ -1677,6 +1786,7 @@ impl Omnigraph {
table_branch: Option<&str>,
expected_version: u64,
) -> Result<Dataset> {
self.ensure_schema_apply_not_locked("write").await?;
self.table_store
.reopen_for_mutation(full_path, table_branch, table_key, expected_version)
.await
@ -1883,6 +1993,7 @@ impl Omnigraph {
&mut self,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
self.ensure_schema_apply_not_locked("write commit").await?;
let current_branch = self.coordinator.current_branch().map(str::to_string);
let prepared = self
.prepare_updates_for_commit(current_branch.as_deref(), updates)
@ -1920,6 +2031,7 @@ impl Omnigraph {
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
self.ensure_schema_apply_not_locked("write commit").await?;
let prepared = self.prepare_updates_for_commit(branch, updates).await?;
self.commit_prepared_updates_on_branch(branch, &prepared)
.await
@ -2362,9 +2474,9 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
}
fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
if is_internal_run_branch(branch) {
if is_internal_system_branch(branch) {
return Err(OmniError::manifest(format!(
"{} does not allow internal run ref '{}'",
"{} does not allow internal system ref '{}'",
operation, branch
)));
}
@ -3175,6 +3287,75 @@ edge WorksAt: Person -> Company
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
}
#[tokio::test]
async fn test_apply_schema_rewrite_preserves_existing_indices() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
seed_person_row(&mut db, "Alice", Some(30)).await;
let desired = initial_schema.replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
db.apply_schema(&desired).await.unwrap();
let snapshot = db.snapshot();
let ds = snapshot.open("node:Person").await.unwrap();
assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
}
#[tokio::test]
async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let mut db = db;
db.coordinator
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await
.unwrap();
let err = db.open_for_mutation("node:Person").await.unwrap_err();
assert!(
err.to_string()
.contains("write is unavailable while schema apply is in progress")
);
}
#[tokio::test]
async fn test_commit_updates_rejects_while_schema_apply_locked() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
db.coordinator
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await
.unwrap();
let err = db.commit_updates(&[]).await.unwrap_err();
assert!(
err.to_string()
.contains("write commit is unavailable while schema apply is in progress")
);
}
#[tokio::test]
async fn test_branch_list_hides_schema_apply_lock_branch() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
db.coordinator
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await
.unwrap();
let branches = db.branch_list().await.unwrap();
assert_eq!(branches, vec!["main".to_string()]);
}
#[tokio::test]
async fn test_apply_schema_unsupported_plan_does_not_advance_manifest() {
let dir = tempfile::tempdir().unwrap();

View file

@ -3371,6 +3371,7 @@ impl Omnigraph {
target: &str,
actor_id: Option<&str>,
) -> Result<MergeOutcome> {
self.ensure_schema_apply_idle("branch_merge").await?;
let previous_actor = self.audit_actor_id.clone();
self.audit_actor_id = actor_id.map(str::to_string);
let result = self.branch_merge_impl(source, target, false).await;