mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Merge pull request #7 from ModernRelay/codex-schema-apply-concurrency
Harden schema apply against write races
This commit is contained in:
commit
75070c4632
4 changed files with 221 additions and 28 deletions
|
|
@ -8,8 +8,9 @@ use crate::failpoints;
|
|||
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
|
||||
|
||||
use super::commit_graph::{CommitGraph, GraphCommit};
|
||||
use super::is_internal_system_branch;
|
||||
use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
|
||||
use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri, is_internal_run_branch};
|
||||
use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri};
|
||||
|
||||
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
|
||||
|
||||
|
|
@ -203,7 +204,7 @@ impl GraphCoordinator {
|
|||
self.manifest.list_branches().await.map(|branches| {
|
||||
branches
|
||||
.into_iter()
|
||||
.filter(|branch| !is_internal_run_branch(branch))
|
||||
.filter(|branch| !is_internal_system_branch(branch))
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
|
@ -219,7 +220,7 @@ impl GraphCoordinator {
|
|||
.map(|branches| {
|
||||
branches
|
||||
.into_iter()
|
||||
.filter(|branch| !is_internal_run_branch(branch))
|
||||
.filter(|branch| !is_internal_system_branch(branch))
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,3 +11,13 @@ pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
|
|||
pub use omnigraph::{MergeOutcome, Omnigraph, SchemaApplyResult};
|
||||
pub(crate) use run_registry::is_internal_run_branch;
|
||||
pub use run_registry::{RunId, RunRecord, RunStatus};
|
||||
|
||||
pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__";
|
||||
|
||||
pub(crate) fn is_schema_apply_lock_branch(name: &str) -> bool {
|
||||
name.trim_start_matches('/') == SCHEMA_APPLY_LOCK_BRANCH
|
||||
}
|
||||
|
||||
pub(crate) fn is_internal_system_branch(name: &str) -> bool {
|
||||
is_internal_run_branch(name) || is_schema_apply_lock_branch(name)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ use omnigraph_compiler::{
|
|||
};
|
||||
|
||||
use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
|
||||
use crate::db::run_registry::{RunRecord, RunStatus, is_internal_run_branch};
|
||||
use crate::db::run_registry::{RunRecord, RunStatus};
|
||||
use crate::error::{ManifestErrorKind, OmniError, Result};
|
||||
use crate::runtime_cache::RuntimeCache;
|
||||
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
|
||||
|
|
@ -30,13 +30,17 @@ use crate::table_store::TableStore;
|
|||
|
||||
use super::commit_graph::GraphCommit;
|
||||
use super::manifest::{
|
||||
ManifestChange, Snapshot, TableRegistration, TableTombstone, table_path_for_table_key,
|
||||
ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
|
||||
table_path_for_table_key,
|
||||
};
|
||||
use super::schema_state::{
|
||||
SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
|
||||
validate_schema_contract, write_schema_contract,
|
||||
};
|
||||
use super::{ReadTarget, ResolvedTarget, RunId, SnapshotId};
|
||||
use super::{
|
||||
ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId,
|
||||
is_internal_system_branch, is_schema_apply_lock_branch,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum MergeOutcome {
|
||||
|
|
@ -172,17 +176,31 @@ impl Omnigraph {
|
|||
}
|
||||
|
||||
pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
|
||||
self.acquire_schema_apply_lock().await?;
|
||||
let result = self.apply_schema_with_lock(desired_schema_source).await;
|
||||
let release_result = self.release_schema_apply_lock().await;
|
||||
match (result, release_result) {
|
||||
(Ok(result), Ok(())) => Ok(result),
|
||||
(Ok(_), Err(err)) => Err(err),
|
||||
(Err(err), Ok(())) => Err(err),
|
||||
(Err(err), Err(_)) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
async fn apply_schema_with_lock(
|
||||
&mut self,
|
||||
desired_schema_source: &str,
|
||||
) -> Result<SchemaApplyResult> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
let branches = self.coordinator.all_branches().await?;
|
||||
let public_non_main = branches
|
||||
.iter()
|
||||
.filter(|branch| branch.as_str() != "main")
|
||||
.cloned()
|
||||
let blocking_branches = branches
|
||||
.into_iter()
|
||||
.filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch))
|
||||
.collect::<Vec<_>>();
|
||||
if !public_non_main.is_empty() {
|
||||
if !blocking_branches.is_empty() {
|
||||
return Err(OmniError::manifest_conflict(format!(
|
||||
"schema apply requires a repo with only main; found non-main branches: {}",
|
||||
public_non_main.join(", ")
|
||||
blocking_branches.join(", ")
|
||||
)));
|
||||
}
|
||||
|
||||
|
|
@ -214,6 +232,7 @@ impl Omnigraph {
|
|||
fixup_blob_schemas(&mut desired_catalog);
|
||||
|
||||
let snapshot = self.snapshot();
|
||||
let base_manifest_version = snapshot.version();
|
||||
let mut added_tables = BTreeSet::new();
|
||||
let mut renamed_tables = HashMap::new();
|
||||
let mut rewritten_tables = BTreeSet::new();
|
||||
|
|
@ -316,6 +335,8 @@ impl Omnigraph {
|
|||
source_table_key
|
||||
))
|
||||
})?;
|
||||
self.ensure_snapshot_entry_head_matches(source_entry)
|
||||
.await?;
|
||||
let source_ds = snapshot.open(source_table_key).await?;
|
||||
let batch = self
|
||||
.batch_for_schema_apply_rewrite(
|
||||
|
|
@ -367,6 +388,7 @@ impl Omnigraph {
|
|||
table_key
|
||||
))
|
||||
})?;
|
||||
self.ensure_snapshot_entry_head_matches(entry).await?;
|
||||
let source_ds = snapshot.open(table_key).await?;
|
||||
let batch = self
|
||||
.batch_for_schema_apply_rewrite(
|
||||
|
|
@ -380,22 +402,12 @@ impl Omnigraph {
|
|||
.await?;
|
||||
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
|
||||
let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?;
|
||||
let mut state = self
|
||||
self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
|
||||
.await?;
|
||||
let state = self
|
||||
.table_store
|
||||
.table_state(&dataset_uri, &target_ds)
|
||||
.await?;
|
||||
if indexed_tables.contains(table_key) {
|
||||
self.build_indices_on_dataset_for_catalog(
|
||||
&desired_catalog,
|
||||
table_key,
|
||||
&mut target_ds,
|
||||
)
|
||||
.await?;
|
||||
state = self
|
||||
.table_store
|
||||
.table_state(&dataset_uri, &target_ds)
|
||||
.await?;
|
||||
}
|
||||
table_updates.insert(
|
||||
table_key.clone(),
|
||||
crate::db::SubTableUpdate {
|
||||
|
|
@ -421,11 +433,14 @@ impl Omnigraph {
|
|||
table_key
|
||||
))
|
||||
})?;
|
||||
self.ensure_snapshot_entry_head_matches(entry).await?;
|
||||
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
|
||||
let mut ds = self
|
||||
.table_store
|
||||
.open_dataset_head_for_write(table_key, &dataset_uri, None)
|
||||
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
|
||||
.await?;
|
||||
self.table_store
|
||||
.ensure_expected_version(&ds, table_key, entry.table_version)?;
|
||||
self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
|
||||
.await?;
|
||||
let state = self.table_store.table_state(&dataset_uri, &ds).await?;
|
||||
|
|
@ -458,6 +473,15 @@ impl Omnigraph {
|
|||
}));
|
||||
}
|
||||
|
||||
self.refresh().await?;
|
||||
if self.version() != base_manifest_version {
|
||||
return Err(OmniError::manifest_conflict(format!(
|
||||
"schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
|
||||
base_manifest_version,
|
||||
self.version()
|
||||
)));
|
||||
}
|
||||
|
||||
let actor_id = self.current_audit_actor().map(str::to_string);
|
||||
let PublishedSnapshot {
|
||||
manifest_version,
|
||||
|
|
@ -489,6 +513,84 @@ impl Omnigraph {
|
|||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
|
||||
self.refresh().await?;
|
||||
self.ensure_schema_apply_not_locked(operation).await
|
||||
}
|
||||
|
||||
async fn acquire_schema_apply_lock(&mut self) -> Result<()> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
self.refresh().await?;
|
||||
let branches = self.coordinator.all_branches().await?;
|
||||
if branches
|
||||
.iter()
|
||||
.any(|branch| is_schema_apply_lock_branch(branch))
|
||||
{
|
||||
return Err(OmniError::manifest_conflict(
|
||||
"schema apply is already in progress".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
self.coordinator
|
||||
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
|
||||
.await?;
|
||||
self.refresh().await?;
|
||||
|
||||
let blocking_branches = self
|
||||
.coordinator
|
||||
.all_branches()
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch))
|
||||
.collect::<Vec<_>>();
|
||||
if !blocking_branches.is_empty() {
|
||||
let _ = self.release_schema_apply_lock().await;
|
||||
return Err(OmniError::manifest_conflict(format!(
|
||||
"schema apply requires a repo with only main; found non-main branches: {}",
|
||||
blocking_branches.join(", ")
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn release_schema_apply_lock(&mut self) -> Result<()> {
|
||||
self.coordinator
|
||||
.branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
|
||||
.await?;
|
||||
self.refresh().await
|
||||
}
|
||||
|
||||
async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
|
||||
if self
|
||||
.coordinator
|
||||
.all_branches()
|
||||
.await?
|
||||
.iter()
|
||||
.any(|branch| is_schema_apply_lock_branch(branch))
|
||||
{
|
||||
return Err(OmniError::manifest_conflict(format!(
|
||||
"{} is unavailable while schema apply is in progress",
|
||||
operation
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn ensure_snapshot_entry_head_matches(&self, entry: &SubTableEntry) -> Result<()> {
|
||||
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
|
||||
let ds = self
|
||||
.table_store
|
||||
.open_dataset_head_for_write(
|
||||
&entry.table_key,
|
||||
&dataset_uri,
|
||||
entry.table_branch.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
self.table_store
|
||||
.ensure_expected_version(&ds, &entry.table_key, entry.table_version)
|
||||
}
|
||||
|
||||
pub(crate) fn table_store(&self) -> &TableStore {
|
||||
&self.table_store
|
||||
}
|
||||
|
|
@ -906,6 +1008,7 @@ impl Omnigraph {
|
|||
|
||||
pub(crate) async fn ensure_indices_for_branch(&mut self, branch: Option<&str>) -> Result<()> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
self.ensure_schema_apply_idle("ensure_indices").await?;
|
||||
let resolved = self.resolved_branch_target(branch).await?;
|
||||
let snapshot = resolved.snapshot;
|
||||
let mut updates = Vec::new();
|
||||
|
|
@ -1196,6 +1299,7 @@ impl Omnigraph {
|
|||
|
||||
pub async fn branch_create(&mut self, name: &str) -> Result<()> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
self.ensure_schema_apply_idle("branch_create").await?;
|
||||
ensure_public_branch_ref(name, "branch_create")?;
|
||||
self.coordinator.branch_create(name).await
|
||||
}
|
||||
|
|
@ -1209,6 +1313,7 @@ impl Omnigraph {
|
|||
from: impl Into<ReadTarget>,
|
||||
name: &str,
|
||||
) -> Result<()> {
|
||||
self.ensure_schema_apply_idle("branch_create_from").await?;
|
||||
self.branch_create_from_impl(from, name, false).await
|
||||
}
|
||||
|
||||
|
|
@ -1242,6 +1347,7 @@ impl Omnigraph {
|
|||
|
||||
pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
self.ensure_schema_apply_idle("branch_delete").await?;
|
||||
ensure_public_branch_ref(name, "branch_delete")?;
|
||||
self.refresh().await?;
|
||||
let branch = normalize_branch_name(name)?
|
||||
|
|
@ -1283,6 +1389,7 @@ impl Omnigraph {
|
|||
actor_id: Option<&str>,
|
||||
) -> Result<RunRecord> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
self.ensure_schema_apply_idle("begin_run").await?;
|
||||
ensure_public_branch_ref(target_branch, "begin_run")?;
|
||||
let target_branch =
|
||||
normalize_branch_name(target_branch)?.unwrap_or_else(|| "main".to_string());
|
||||
|
|
@ -1389,6 +1496,7 @@ impl Omnigraph {
|
|||
actor_id: Option<&str>,
|
||||
) -> Result<SnapshotId> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
self.ensure_schema_apply_idle("publish_run").await?;
|
||||
let run = self.get_run(run_id).await?;
|
||||
match run.status {
|
||||
RunStatus::Running => {}
|
||||
|
|
@ -1577,6 +1685,7 @@ impl Omnigraph {
|
|||
branch: Option<&str>,
|
||||
table_key: &str,
|
||||
) -> Result<(Dataset, String, Option<String>)> {
|
||||
self.ensure_schema_apply_not_locked("write").await?;
|
||||
let resolved = self.resolved_branch_target(branch).await?;
|
||||
let entry = resolved
|
||||
.snapshot
|
||||
|
|
@ -1677,6 +1786,7 @@ impl Omnigraph {
|
|||
table_branch: Option<&str>,
|
||||
expected_version: u64,
|
||||
) -> Result<Dataset> {
|
||||
self.ensure_schema_apply_not_locked("write").await?;
|
||||
self.table_store
|
||||
.reopen_for_mutation(full_path, table_branch, table_key, expected_version)
|
||||
.await
|
||||
|
|
@ -1883,6 +1993,7 @@ impl Omnigraph {
|
|||
&mut self,
|
||||
updates: &[crate::db::SubTableUpdate],
|
||||
) -> Result<u64> {
|
||||
self.ensure_schema_apply_not_locked("write commit").await?;
|
||||
let current_branch = self.coordinator.current_branch().map(str::to_string);
|
||||
let prepared = self
|
||||
.prepare_updates_for_commit(current_branch.as_deref(), updates)
|
||||
|
|
@ -1920,6 +2031,7 @@ impl Omnigraph {
|
|||
branch: Option<&str>,
|
||||
updates: &[crate::db::SubTableUpdate],
|
||||
) -> Result<u64> {
|
||||
self.ensure_schema_apply_not_locked("write commit").await?;
|
||||
let prepared = self.prepare_updates_for_commit(branch, updates).await?;
|
||||
self.commit_prepared_updates_on_branch(branch, &prepared)
|
||||
.await
|
||||
|
|
@ -2362,9 +2474,9 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
|
|||
}
|
||||
|
||||
fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
|
||||
if is_internal_run_branch(branch) {
|
||||
if is_internal_system_branch(branch) {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"{} does not allow internal run ref '{}'",
|
||||
"{} does not allow internal system ref '{}'",
|
||||
operation, branch
|
||||
)));
|
||||
}
|
||||
|
|
@ -3175,6 +3287,75 @@ edge WorksAt: Person -> Company
|
|||
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_rewrite_preserves_existing_indices() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
|
||||
let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
|
||||
seed_person_row(&mut db, "Alice", Some(30)).await;
|
||||
|
||||
let desired = initial_schema.replace(
|
||||
" age: I32?\n}",
|
||||
" age: I32?\n nickname: String?\n}",
|
||||
);
|
||||
db.apply_schema(&desired).await.unwrap();
|
||||
|
||||
let snapshot = db.snapshot();
|
||||
let ds = snapshot.open("node:Person").await.unwrap();
|
||||
assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
|
||||
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
let mut db = db;
|
||||
db.coordinator
|
||||
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let err = db.open_for_mutation("node:Person").await.unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("write is unavailable while schema apply is in progress")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_updates_rejects_while_schema_apply_locked() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
db.coordinator
|
||||
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let err = db.commit_updates(&[]).await.unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("write commit is unavailable while schema apply is in progress")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_list_hides_schema_apply_lock_branch() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
db.coordinator
|
||||
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let branches = db.branch_list().await.unwrap();
|
||||
assert_eq!(branches, vec!["main".to_string()]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_unsupported_plan_does_not_advance_manifest() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
|
|
|||
|
|
@ -3371,6 +3371,7 @@ impl Omnigraph {
|
|||
target: &str,
|
||||
actor_id: Option<&str>,
|
||||
) -> Result<MergeOutcome> {
|
||||
self.ensure_schema_apply_idle("branch_merge").await?;
|
||||
let previous_actor = self.audit_actor_id.clone();
|
||||
self.audit_actor_id = actor_id.map(str::to_string);
|
||||
let result = self.branch_merge_impl(source, target, false).await;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue