diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index f63f9f0..66a510e 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -28,6 +28,10 @@ use crate::runtime_cache::RuntimeCache; use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri}; use crate::table_store::TableStore; +mod export; +mod schema_apply; +mod table_ops; + use super::commit_graph::GraphCommit; use super::manifest::{ ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone, @@ -168,427 +172,19 @@ impl Omnigraph { } pub async fn plan_schema(&self, desired_schema_source: &str) -> Result { - self.ensure_schema_state_valid().await?; - let accepted_ir = read_accepted_schema_ir(self.uri(), Arc::clone(&self.storage)).await?; - let desired_ir = read_schema_ir_from_source(desired_schema_source)?; - plan_schema_migration(&accepted_ir, &desired_ir) - .map_err(|err| OmniError::manifest(err.to_string())) + schema_apply::plan_schema(self, desired_schema_source).await } pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result { - self.acquire_schema_apply_lock().await?; - let result = self.apply_schema_with_lock(desired_schema_source).await; - let release_result = self.release_schema_apply_lock().await; - match (result, release_result) { - (Ok(result), Ok(())) => Ok(result), - (Ok(_), Err(err)) => Err(err), - (Err(err), Ok(())) => Err(err), - (Err(err), Err(_)) => Err(err), - } - } - - async fn apply_schema_with_lock( - &mut self, - desired_schema_source: &str, - ) -> Result { - self.ensure_schema_state_valid().await?; - let branches = self.coordinator.all_branches().await?; - let blocking_branches = branches - .into_iter() - .filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch)) - .collect::>(); - if !blocking_branches.is_empty() { - return Err(OmniError::manifest_conflict(format!( - "schema apply requires a repo with only main; found non-main branches: {}", - blocking_branches.join(", ") - ))); - } - - let accepted_ir = read_accepted_schema_ir(self.uri(), Arc::clone(&self.storage)).await?; - let desired_ir = read_schema_ir_from_source(desired_schema_source)?; - let plan = plan_schema_migration(&accepted_ir, &desired_ir) - .map_err(|err| OmniError::manifest(err.to_string()))?; - if !plan.supported { - let reason = plan - .steps - .iter() - .find_map(|step| match step { - SchemaMigrationStep::UnsupportedChange { reason, .. } => Some(reason.as_str()), - _ => None, - }) - .unwrap_or("unsupported schema migration plan"); - return Err(OmniError::manifest(reason.to_string())); - } - if plan.steps.is_empty() { - return Ok(SchemaApplyResult { - supported: true, - applied: false, - manifest_version: self.version(), - steps: plan.steps, - }); - } - - let mut desired_catalog = build_catalog_from_ir(&desired_ir)?; - fixup_blob_schemas(&mut desired_catalog); - - let snapshot = self.snapshot(); - let base_manifest_version = snapshot.version(); - let mut added_tables = BTreeSet::new(); - let mut renamed_tables = HashMap::new(); - let mut rewritten_tables = BTreeSet::new(); - let mut indexed_tables = BTreeSet::new(); - let mut property_renames = HashMap::>::new(); - let mut changed_edge_tables = false; - - for step in &plan.steps { - match step { - SchemaMigrationStep::AddType { type_kind, name } => { - let table_key = schema_table_key(*type_kind, name); - if table_key.starts_with("edge:") { - changed_edge_tables = true; - } - added_tables.insert(table_key); - } - SchemaMigrationStep::RenameType { - type_kind, - from, - to, - } => { - let source_key = schema_table_key(*type_kind, from); - let target_key = schema_table_key(*type_kind, to); - if source_key.starts_with("edge:") { - changed_edge_tables = true; - } - renamed_tables.insert(target_key, source_key); - } - SchemaMigrationStep::AddProperty { - type_kind, - type_name, - .. - } => { - let table_key = schema_table_key(*type_kind, type_name); - if table_key.starts_with("edge:") { - changed_edge_tables = true; - } - rewritten_tables.insert(table_key); - } - SchemaMigrationStep::RenameProperty { - type_kind, - type_name, - from, - to, - } => { - let table_key = schema_table_key(*type_kind, type_name); - if table_key.starts_with("edge:") { - changed_edge_tables = true; - } - rewritten_tables.insert(table_key.clone()); - property_renames - .entry(table_key) - .or_default() - .insert(to.clone(), from.clone()); - } - SchemaMigrationStep::AddConstraint { - type_kind, - type_name, - .. - } => { - indexed_tables.insert(schema_table_key(*type_kind, type_name)); - } - SchemaMigrationStep::UpdateTypeMetadata { .. } - | SchemaMigrationStep::UpdatePropertyMetadata { .. } => {} - SchemaMigrationStep::UnsupportedChange { reason, .. } => { - return Err(OmniError::manifest(reason.clone())); - } - } - } - - let mut table_registrations = HashMap::::new(); - let mut table_updates = HashMap::::new(); - let mut table_tombstones = HashMap::::new(); - - for table_key in &added_tables { - let table_path = table_path_for_table_key(table_key)?; - let dataset_uri = self.table_store.dataset_uri(&table_path); - let schema = schema_for_table_key(&desired_catalog, table_key)?; - let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?; - self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) - .await?; - let state = self.table_store.table_state(&dataset_uri, &ds).await?; - table_registrations.insert(table_key.clone(), table_path); - table_updates.insert( - table_key.clone(), - crate::db::SubTableUpdate { - table_key: table_key.clone(), - table_version: state.version, - table_branch: None, - row_count: state.row_count, - version_metadata: state.version_metadata, - }, - ); - } - - for (target_table_key, source_table_key) in &renamed_tables { - let source_entry = snapshot.entry(source_table_key).ok_or_else(|| { - OmniError::manifest(format!( - "missing source table '{}' for schema rename", - source_table_key - )) - })?; - self.ensure_snapshot_entry_head_matches(source_entry) - .await?; - let source_ds = snapshot.open(source_table_key).await?; - let batch = self - .batch_for_schema_apply_rewrite( - &source_ds, - source_table_key, - &self.catalog, - target_table_key, - &desired_catalog, - property_renames.get(target_table_key), - ) - .await?; - let table_path = table_path_for_table_key(target_table_key)?; - let dataset_uri = self.table_store.dataset_uri(&table_path); - let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?; - self.build_indices_on_dataset_for_catalog( - &desired_catalog, - target_table_key, - &mut target_ds, - ) - .await?; - let state = self - .table_store - .table_state(&dataset_uri, &target_ds) - .await?; - table_registrations.insert(target_table_key.clone(), table_path); - table_updates.insert( - target_table_key.clone(), - crate::db::SubTableUpdate { - table_key: target_table_key.clone(), - table_version: state.version, - table_branch: None, - row_count: state.row_count, - version_metadata: state.version_metadata, - }, - ); - table_tombstones.insert( - source_table_key.clone(), - source_entry.table_version.saturating_add(1), - ); - } - - for table_key in &rewritten_tables { - if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) { - continue; - } - let entry = snapshot.entry(table_key).ok_or_else(|| { - OmniError::manifest(format!( - "missing source table '{}' for schema apply", - table_key - )) - })?; - self.ensure_snapshot_entry_head_matches(entry).await?; - let source_ds = snapshot.open(table_key).await?; - let batch = self - .batch_for_schema_apply_rewrite( - &source_ds, - table_key, - &self.catalog, - table_key, - &desired_catalog, - property_renames.get(table_key), - ) - .await?; - let dataset_uri = self.table_store.dataset_uri(&entry.table_path); - let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?; - self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds) - .await?; - let state = self - .table_store - .table_state(&dataset_uri, &target_ds) - .await?; - table_updates.insert( - table_key.clone(), - crate::db::SubTableUpdate { - table_key: table_key.clone(), - table_version: state.version, - table_branch: None, - row_count: state.row_count, - version_metadata: state.version_metadata, - }, - ); - } - - for table_key in &indexed_tables { - if added_tables.contains(table_key) - || renamed_tables.contains_key(table_key) - || rewritten_tables.contains(table_key) - { - continue; - } - let entry = snapshot.entry(table_key).ok_or_else(|| { - OmniError::manifest(format!( - "missing table '{}' for schema index apply", - table_key - )) - })?; - self.ensure_snapshot_entry_head_matches(entry).await?; - let dataset_uri = self.table_store.dataset_uri(&entry.table_path); - let mut ds = self - .table_store - .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref()) - .await?; - self.table_store - .ensure_expected_version(&ds, table_key, entry.table_version)?; - self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) - .await?; - let state = self.table_store.table_state(&dataset_uri, &ds).await?; - table_updates.insert( - table_key.clone(), - crate::db::SubTableUpdate { - table_key: table_key.clone(), - table_version: state.version, - table_branch: None, - row_count: state.row_count, - version_metadata: state.version_metadata, - }, - ); - } - - let mut manifest_changes = Vec::new(); - for (table_key, table_path) in table_registrations { - manifest_changes.push(ManifestChange::RegisterTable(TableRegistration { - table_key, - table_path, - })); - } - for update in table_updates.into_values() { - manifest_changes.push(ManifestChange::Update(update)); - } - for (table_key, tombstone_version) in table_tombstones { - manifest_changes.push(ManifestChange::Tombstone(TableTombstone { - table_key, - tombstone_version, - })); - } - - self.refresh().await?; - if self.version() != base_manifest_version { - return Err(OmniError::manifest_conflict(format!( - "schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress", - base_manifest_version, - self.version() - ))); - } - - let actor_id = self.current_audit_actor().map(str::to_string); - let PublishedSnapshot { - manifest_version, - _snapshot_id: _, - } = self - .coordinator - .commit_changes_with_actor(&manifest_changes, actor_id.as_deref()) - .await?; - - let schema_path = join_uri(&self.root_uri, SCHEMA_SOURCE_FILENAME); - self.storage - .write_text(&schema_path, desired_schema_source) - .await?; - write_schema_contract(&self.root_uri, self.storage.as_ref(), &desired_ir).await?; - - self.catalog = desired_catalog; - self.schema_source = desired_schema_source.to_string(); - self.coordinator.refresh().await?; - self.runtime_cache.invalidate_all().await; - if changed_edge_tables { - self.invalidate_graph_index().await; - } - - Ok(SchemaApplyResult { - supported: true, - applied: true, - manifest_version, - steps: plan.steps, - }) + schema_apply::apply_schema(self, desired_schema_source).await } pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> { - self.refresh().await?; - self.ensure_schema_apply_not_locked(operation).await - } - - async fn acquire_schema_apply_lock(&mut self) -> Result<()> { - self.ensure_schema_state_valid().await?; - self.refresh().await?; - let branches = self.coordinator.all_branches().await?; - if branches - .iter() - .any(|branch| is_schema_apply_lock_branch(branch)) - { - return Err(OmniError::manifest_conflict( - "schema apply is already in progress".to_string(), - )); - } - - self.coordinator - .branch_create(SCHEMA_APPLY_LOCK_BRANCH) - .await?; - self.refresh().await?; - - let blocking_branches = self - .coordinator - .all_branches() - .await? - .into_iter() - .filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch)) - .collect::>(); - if !blocking_branches.is_empty() { - let _ = self.release_schema_apply_lock().await; - return Err(OmniError::manifest_conflict(format!( - "schema apply requires a repo with only main; found non-main branches: {}", - blocking_branches.join(", ") - ))); - } - - Ok(()) - } - - async fn release_schema_apply_lock(&mut self) -> Result<()> { - self.coordinator - .branch_delete(SCHEMA_APPLY_LOCK_BRANCH) - .await?; - self.refresh().await + schema_apply::ensure_schema_apply_idle(self, operation).await } async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> { - if self - .coordinator - .all_branches() - .await? - .iter() - .any(|branch| is_schema_apply_lock_branch(branch)) - { - return Err(OmniError::manifest_conflict(format!( - "{} is unavailable while schema apply is in progress", - operation - ))); - } - Ok(()) - } - - async fn ensure_snapshot_entry_head_matches(&self, entry: &SubTableEntry) -> Result<()> { - let dataset_uri = self.table_store.dataset_uri(&entry.table_path); - let ds = self - .table_store - .open_dataset_head_for_write( - &entry.table_key, - &dataset_uri, - entry.table_branch.as_deref(), - ) - .await?; - self.table_store - .ensure_expected_version(&ds, &entry.table_key, entry.table_version) + schema_apply::ensure_schema_apply_not_locked(self, operation).await } pub(crate) fn table_store(&self) -> &TableStore { @@ -772,9 +368,7 @@ impl Omnigraph { table_key: &str, id: &str, ) -> Result> { - let resolved = self.resolved_target(target).await?; - self.entity_from_snapshot(&resolved.snapshot, table_key, id) - .await + export::entity_at_target(self, target, table_key, id).await } /// Read one entity at a specific manifest version via time travel (on-demand enrichment). @@ -784,8 +378,7 @@ impl Omnigraph { id: &str, version: u64, ) -> Result> { - let snap = self.coordinator.snapshot_at_version(version).await?; - self.entity_from_snapshot(&snap, table_key, id).await + export::entity_at(self, table_key, id, version).await } /// Create a Snapshot at any historical manifest version. @@ -800,11 +393,7 @@ impl Omnigraph { type_names: &[String], table_keys: &[String], ) -> Result { - let mut out = Vec::new(); - self.export_jsonl_to_writer(branch, type_names, table_keys, &mut out) - .await?; - String::from_utf8(out) - .map_err(|err| OmniError::manifest(format!("export produced invalid UTF-8: {}", err))) + export::export_jsonl(self, branch, type_names, table_keys).await } pub async fn export_jsonl_to_writer( @@ -814,173 +403,21 @@ impl Omnigraph { table_keys: &[String], writer: &mut W, ) -> Result<()> { - self.ensure_schema_state_valid().await?; - let snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?; - self.export_snapshot_jsonl_to_writer(&snapshot, type_names, table_keys, writer) - .await - } - - async fn entity_from_snapshot( - &self, - snapshot: &Snapshot, - table_key: &str, - id: &str, - ) -> Result> { - if snapshot.entry(table_key).is_none() { - return Ok(None); - } - - let ds = self - .table_store - .open_snapshot_table(snapshot, table_key) - .await?; - let filter_sql = format!("id = '{}'", id.replace('\'', "''")); - let batches = self - .table_store - .scan(&ds, None, Some(&filter_sql), None) - .await?; - let Some(batch) = batches.iter().find(|batch| batch.num_rows() > 0) else { - return Ok(None); - }; - Ok(Some(record_batch_row_to_json(batch, 0)?)) - } - - async fn export_snapshot_jsonl_to_writer( - &self, - snapshot: &Snapshot, - type_names: &[String], - table_keys: &[String], - writer: &mut W, - ) -> Result<()> { - let selected_tables = self.export_table_keys(snapshot, type_names, table_keys)?; - for table_key in selected_tables { - self.export_table_to_writer(snapshot, &table_key, writer) - .await?; - } - Ok(()) - } - - fn export_table_keys( - &self, - snapshot: &Snapshot, - type_names: &[String], - table_keys: &[String], - ) -> Result> { - let available = snapshot - .entries() - .map(|entry| entry.table_key.clone()) - .collect::>(); - let mut selected = BTreeSet::new(); - - for table_key in table_keys { - if !available.contains(table_key) { - return Err(OmniError::manifest(format!( - "unknown export table '{}'", - table_key - ))); - } - selected.insert(table_key.clone()); - } - - for type_name in type_names { - let mut matched = false; - let node_key = format!("node:{}", type_name); - if available.contains(&node_key) { - selected.insert(node_key); - matched = true; - } - let edge_key = format!("edge:{}", type_name); - if available.contains(&edge_key) { - selected.insert(edge_key); - matched = true; - } - if !matched { - return Err(OmniError::manifest(format!( - "unknown export type '{}'", - type_name - ))); - } - } - - if selected.is_empty() { - return Ok(available.into_iter().collect()); - } - - Ok(selected.into_iter().collect()) - } - - async fn export_table_to_writer( - &self, - snapshot: &Snapshot, - table_key: &str, - writer: &mut W, - ) -> Result<()> { - let ds = self - .table_store - .open_snapshot_table(snapshot, table_key) - .await?; - let ordering = Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]); - let blob_properties = blob_properties_for_table_key(self.catalog(), table_key)?; - - if blob_properties.is_empty() { - for batch in self.table_store.scan(&ds, None, None, ordering).await? { - self.write_export_rows_from_batch(table_key, &batch, None, writer)?; - } - return Ok(()); - } - - let batches = self - .table_store - .scan_with(&ds, None, None, ordering, true, |_| Ok(())) - .await?; - for batch in batches { - let row_ids = batch - .column_by_name("_rowid") - .and_then(|col| col.as_any().downcast_ref::()) - .ok_or_else(|| { - OmniError::Lance(format!( - "expected _rowid column when exporting '{}'", - table_key - )) - })? - .values() - .iter() - .copied() - .collect::>(); - let blob_values = self - .export_blob_values(&ds, &batch, &row_ids, blob_properties) - .await?; - self.write_export_rows_from_batch(table_key, &batch, Some(&blob_values), writer)?; - } - Ok(()) + export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await } // ─── Graph index ────────────────────────────────────────────────────── /// Get or build the graph index for the current snapshot. pub async fn graph_index(&self) -> Result> { - self.ensure_schema_state_valid().await?; - let resolved = self - .coordinator - .resolve_target(&ReadTarget::Branch( - self.coordinator - .current_branch() - .unwrap_or("main") - .to_string(), - )) - .await?; - self.runtime_cache - .graph_index(&resolved, &self.catalog) - .await + table_ops::graph_index(self).await } pub(crate) async fn graph_index_for_resolved( &self, resolved: &ResolvedTarget, ) -> Result> { - self.runtime_cache - .graph_index(resolved, &self.catalog) - .await + table_ops::graph_index_for_resolved(self, resolved).await } /// Ensure BTree scalar indices exist on key columns. @@ -996,122 +433,11 @@ impl Omnigraph { /// from an ancestor branch are first forked into the active branch before /// their index metadata is updated. pub async fn ensure_indices(&mut self) -> Result<()> { - let current_branch = self.coordinator.current_branch().map(str::to_string); - self.ensure_indices_for_branch(current_branch.as_deref()) - .await + table_ops::ensure_indices(self).await } pub async fn ensure_indices_on(&mut self, branch: &str) -> Result<()> { - let branch = normalize_branch_name(branch)?; - self.ensure_indices_for_branch(branch.as_deref()).await - } - - pub(crate) async fn ensure_indices_for_branch(&mut self, branch: Option<&str>) -> Result<()> { - self.ensure_schema_state_valid().await?; - self.ensure_schema_apply_idle("ensure_indices").await?; - let resolved = self.resolved_branch_target(branch).await?; - let snapshot = resolved.snapshot; - let mut updates = Vec::new(); - let active_branch = resolved.branch; - - for type_name in self.catalog.node_types.keys() { - let table_key = format!("node:{}", type_name); - let Some(entry) = snapshot.entry(&table_key) else { - continue; - }; - let full_path = format!("{}/{}", self.root_uri, entry.table_path); - let (mut ds, resolved_branch) = match active_branch.as_deref() { - Some(active_branch) => match entry.table_branch.as_deref() { - None => continue, - _ => { - self.open_owned_dataset_for_branch_write( - &table_key, - &full_path, - entry.table_branch.as_deref(), - entry.table_version, - active_branch, - ) - .await? - } - }, - None => ( - self.table_store - .open_dataset_head_for_write(&table_key, &full_path, None) - .await?, - None, - ), - }; - let row_count = self.table_store.count_rows(&ds, None).await.unwrap_or(0); - if row_count > 0 { - self.build_indices_on_dataset(&table_key, &mut ds).await?; - } - - let state = self.table_store.table_state(&full_path, &ds).await?; - if state.version != entry.table_version - || resolved_branch.as_deref() != entry.table_branch.as_deref() - { - updates.push(crate::db::SubTableUpdate { - table_key, - table_version: state.version, - table_branch: resolved_branch, - row_count: state.row_count, - version_metadata: state.version_metadata, - }); - } - } - - for edge_name in self.catalog.edge_types.keys() { - let table_key = format!("edge:{}", edge_name); - let Some(entry) = snapshot.entry(&table_key) else { - continue; - }; - let full_path = format!("{}/{}", self.root_uri, entry.table_path); - let (mut ds, resolved_branch) = match active_branch.as_deref() { - Some(active_branch) => match entry.table_branch.as_deref() { - None => continue, - _ => { - self.open_owned_dataset_for_branch_write( - &table_key, - &full_path, - entry.table_branch.as_deref(), - entry.table_version, - active_branch, - ) - .await? - } - }, - None => ( - self.table_store - .open_dataset_head_for_write(&table_key, &full_path, None) - .await?, - None, - ), - }; - let row_count = self.table_store.count_rows(&ds, None).await.unwrap_or(0); - if row_count > 0 { - self.build_indices_on_dataset(&table_key, &mut ds).await?; - } - - let state = self.table_store.table_state(&full_path, &ds).await?; - if state.version != entry.table_version - || resolved_branch.as_deref() != entry.table_branch.as_deref() - { - updates.push(crate::db::SubTableUpdate { - table_key, - table_version: state.version, - table_branch: resolved_branch, - row_count: state.row_count, - version_metadata: state.version_metadata, - }); - } - } - - if !updates.is_empty() { - self.commit_prepared_updates_on_branch(branch, &updates) - .await?; - } - - Ok(()) + table_ops::ensure_indices_on(self, branch).await } /// Read a blob from a node by its string ID and property name. @@ -1675,9 +1001,7 @@ impl Omnigraph { &self, table_key: &str, ) -> Result<(Dataset, String, Option)> { - let current_branch = self.coordinator.current_branch().map(str::to_string); - self.open_for_mutation_on_branch(current_branch.as_deref(), table_key) - .await + table_ops::open_for_mutation(self, table_key).await } pub(crate) async fn open_for_mutation_on_branch( @@ -1685,77 +1009,7 @@ impl Omnigraph { branch: Option<&str>, table_key: &str, ) -> Result<(Dataset, String, Option)> { - self.ensure_schema_apply_not_locked("write").await?; - let resolved = self.resolved_branch_target(branch).await?; - let entry = resolved - .snapshot - .entry(table_key) - .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; - let full_path = format!("{}/{}", self.root_uri, entry.table_path); - match resolved.branch.as_deref() { - None => { - let ds = self - .table_store - .open_dataset_head_for_write(table_key, &full_path, None) - .await?; - self.table_store - .ensure_expected_version(&ds, table_key, entry.table_version)?; - Ok((ds, full_path, None)) - } - Some(active_branch) => { - let (ds, table_branch) = self - .open_owned_dataset_for_branch_write( - table_key, - &full_path, - entry.table_branch.as_deref(), - entry.table_version, - active_branch, - ) - .await?; - Ok((ds, full_path, table_branch)) - } - } - } - - /// Open the dataset that should receive a branch-local metadata or data - /// write, forking it from the manifest-pinned source state when the active - /// branch does not yet own the subtable. - pub(crate) async fn open_owned_dataset_for_branch_write( - &self, - table_key: &str, - full_path: &str, - entry_branch: Option<&str>, - entry_version: u64, - active_branch: &str, - ) -> Result<(Dataset, Option)> { - match entry_branch { - Some(branch) if branch == active_branch => { - let ds = self - .table_store - .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) - .await?; - self.table_store - .ensure_expected_version(&ds, table_key, entry_version)?; - Ok((ds, Some(active_branch.to_string()))) - } - source_branch => { - self.fork_dataset_from_entry_state( - table_key, - full_path, - source_branch, - entry_version, - active_branch, - ) - .await?; - let ds = self - .table_store - .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) - .await?; - self.table_store - .ensure_expected_version(&ds, table_key, entry_version)?; - Ok((ds, Some(active_branch.to_string()))) - } - } + table_ops::open_for_mutation_on_branch(self, branch, table_key).await } pub(crate) async fn fork_dataset_from_entry_state( @@ -1766,17 +1020,15 @@ impl Omnigraph { source_version: u64, active_branch: &str, ) -> Result { - let ds = self - .table_store - .fork_branch_from_state( - full_path, - source_branch, - table_key, - source_version, - active_branch, - ) - .await?; - Ok(ds) + table_ops::fork_dataset_from_entry_state( + self, + table_key, + full_path, + source_branch, + source_version, + active_branch, + ) + .await } pub(crate) async fn reopen_for_mutation( @@ -1786,9 +1038,7 @@ impl Omnigraph { table_branch: Option<&str>, expected_version: u64, ) -> Result { - self.ensure_schema_apply_not_locked("write").await?; - self.table_store - .reopen_for_mutation(full_path, table_branch, table_key, expected_version) + table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version) .await } @@ -1798,9 +1048,7 @@ impl Omnigraph { table_branch: Option<&str>, table_version: u64, ) -> Result { - self.table_store - .open_dataset_at_state(table_path, table_branch, table_version) - .await + table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await } pub(crate) async fn build_indices_on_dataset( @@ -1808,8 +1056,7 @@ impl Omnigraph { table_key: &str, ds: &mut Dataset, ) -> Result<()> { - self.build_indices_on_dataset_for_catalog(&self.catalog, table_key, ds) - .await + table_ops::build_indices_on_dataset(self, table_key, ds).await } pub(crate) async fn build_indices_on_dataset_for_catalog( @@ -1818,194 +1065,21 @@ impl Omnigraph { table_key: &str, ds: &mut Dataset, ) -> Result<()> { - if let Some(type_name) = table_key.strip_prefix("node:") { - if !self.table_store.has_btree_index(ds, "id").await? { - self.table_store - .create_btree_index(ds, &["id"]) - .await - .map_err(|e| { - OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e)) - })?; - } - - if let Some(node_type) = catalog.node_types.get(type_name) { - for index_cols in &node_type.indices { - if index_cols.len() != 1 { - continue; - } - let prop_name = &index_cols[0]; - if let Some(prop_type) = node_type.properties.get(prop_name) { - if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list { - if !self.table_store.has_fts_index(ds, prop_name).await? { - self.table_store - .create_inverted_index(ds, prop_name.as_str()) - .await - .map_err(|e| { - OmniError::Lance(format!( - "create Inverted index on {}({}): {}", - table_key, prop_name, e - )) - })?; - } - } else if matches!(prop_type.scalar, ScalarType::Vector(_)) - && !prop_type.list - { - if !self.table_store.has_vector_index(ds, prop_name).await? { - self.table_store - .create_vector_index(ds, prop_name.as_str()) - .await - .map_err(|e| { - OmniError::Lance(format!( - "create Vector index on {}({}): {}", - table_key, prop_name, e - )) - })?; - } - } - } - } - } - return Ok(()); - } - - if table_key.starts_with("edge:") { - if !self.table_store.has_btree_index(ds, "id").await? { - self.table_store - .create_btree_index(ds, &["id"]) - .await - .map_err(|e| { - OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e)) - })?; - } - if !self.table_store.has_btree_index(ds, "src").await? { - self.table_store - .create_btree_index(ds, &["src"]) - .await - .map_err(|e| { - OmniError::Lance(format!("create BTree index on {}(src): {}", table_key, e)) - })?; - } - if !self.table_store.has_btree_index(ds, "dst").await? { - self.table_store - .create_btree_index(ds, &["dst"]) - .await - .map_err(|e| { - OmniError::Lance(format!("create BTree index on {}(dst): {}", table_key, e)) - })?; - } - return Ok(()); - } - - Err(OmniError::manifest(format!( - "invalid table key '{}'", - table_key - ))) - } - - async fn prepare_updates_for_commit( - &self, - branch: Option<&str>, - updates: &[crate::db::SubTableUpdate], - ) -> Result> { - if updates.is_empty() { - return Ok(Vec::new()); - } - - let snapshot = self.snapshot_for_branch(branch).await?; - let mut prepared = Vec::with_capacity(updates.len()); - - for update in updates { - let Some(entry) = snapshot.entry(&update.table_key) else { - return Err(OmniError::manifest(format!( - "no manifest entry for {}", - update.table_key - ))); - }; - - let mut prepared_update = update.clone(); - if prepared_update.row_count > 0 { - let full_path = format!("{}/{}", self.root_uri, entry.table_path); - let mut ds = self - .reopen_for_mutation( - &prepared_update.table_key, - &full_path, - prepared_update.table_branch.as_deref(), - prepared_update.table_version, - ) - .await?; - self.build_indices_on_dataset(&prepared_update.table_key, &mut ds) - .await?; - let state = self.table_store.table_state(&full_path, &ds).await?; - prepared_update.table_version = state.version; - prepared_update.row_count = state.row_count; - prepared_update.version_metadata = state.version_metadata; - } - - prepared.push(prepared_update); - } - - Ok(prepared) - } - - async fn commit_prepared_updates( - &mut self, - updates: &[crate::db::SubTableUpdate], - ) -> Result { - let actor_id = self.current_audit_actor().map(str::to_string); - let PublishedSnapshot { - manifest_version, - _snapshot_id: _, - } = self - .coordinator - .commit_updates_with_actor(updates, actor_id.as_deref()) - .await?; - Ok(manifest_version) - } - - async fn commit_prepared_updates_on_branch( - &mut self, - branch: Option<&str>, - updates: &[crate::db::SubTableUpdate], - ) -> Result { - let current_branch = self.coordinator.current_branch().map(str::to_string); - let requested_branch = branch.map(str::to_string); - if requested_branch == current_branch { - return self.commit_prepared_updates(updates).await; - } - - let mut coordinator = match requested_branch.as_deref() { - Some(branch) => { - GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await? - } - None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await?, - }; - let actor_id = self.current_audit_actor().map(str::to_string); - let PublishedSnapshot { - manifest_version, - _snapshot_id: _, - } = coordinator - .commit_updates_with_actor(updates, actor_id.as_deref()) - .await?; - Ok(manifest_version) + table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await } pub(crate) async fn commit_updates( &mut self, updates: &[crate::db::SubTableUpdate], ) -> Result { - self.ensure_schema_apply_not_locked("write commit").await?; - let current_branch = self.coordinator.current_branch().map(str::to_string); - let prepared = self - .prepare_updates_for_commit(current_branch.as_deref(), updates) - .await?; - self.commit_prepared_updates(&prepared).await + table_ops::commit_updates(self, updates).await } pub(crate) async fn commit_manifest_updates( &mut self, updates: &[crate::db::SubTableUpdate], ) -> Result { - self.coordinator.commit_manifest_updates(updates).await + table_ops::commit_manifest_updates(self, updates).await } pub(crate) async fn record_merge_commit( @@ -2014,16 +1088,13 @@ impl Omnigraph { parent_commit_id: &str, merged_parent_commit_id: &str, ) -> Result { - let actor_id = self.current_audit_actor().map(str::to_string); - self.coordinator - .record_merge_commit( - manifest_version, - parent_commit_id, - merged_parent_commit_id, - actor_id.as_deref(), - ) - .await - .map(|snapshot_id| snapshot_id.as_str().to_string()) + table_ops::record_merge_commit( + self, + manifest_version, + parent_commit_id, + merged_parent_commit_id, + ) + .await } pub(crate) async fn commit_updates_on_branch( @@ -2031,19 +1102,16 @@ impl Omnigraph { branch: Option<&str>, updates: &[crate::db::SubTableUpdate], ) -> Result { - self.ensure_schema_apply_not_locked("write commit").await?; - let prepared = self.prepare_updates_for_commit(branch, updates).await?; - self.commit_prepared_updates_on_branch(branch, &prepared) - .await + table_ops::commit_updates_on_branch(self, branch, updates).await } pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> { - self.coordinator.ensure_commit_graph_initialized().await + table_ops::ensure_commit_graph_initialized(self).await } /// Invalidate the cached graph index. Called after edge mutations. pub(crate) async fn invalidate_graph_index(&self) { - self.runtime_cache.invalidate_all().await; + table_ops::invalidate_graph_index(self).await } async fn batch_for_table_rewrite( @@ -2051,413 +1119,8 @@ impl Omnigraph { source_ds: &Dataset, table_key: &str, ) -> Result { - self.batch_for_schema_apply_rewrite( - source_ds, - table_key, - &self.catalog, - table_key, - &self.catalog, - None, - ) - .await + schema_apply::batch_for_table_rewrite(self, source_ds, table_key).await } - - async fn batch_for_schema_apply_rewrite( - &self, - source_ds: &Dataset, - source_table_key: &str, - source_catalog: &Catalog, - target_table_key: &str, - target_catalog: &Catalog, - property_renames: Option<&HashMap>, - ) -> Result { - let target_schema = schema_for_table_key(target_catalog, target_table_key)?; - let source_blob_properties = - blob_properties_for_table_key(source_catalog, source_table_key)?; - let target_blob_properties = - blob_properties_for_table_key(target_catalog, target_table_key)?; - let needs_row_ids = - !source_blob_properties.is_empty() || !target_blob_properties.is_empty(); - let batches = if needs_row_ids { - self.table_store() - .scan_with(source_ds, None, None, None, true, |_| Ok(())) - .await? - } else { - self.table_store().scan_batches(source_ds).await? - }; - if batches.is_empty() { - return Ok(RecordBatch::new_empty(target_schema)); - } - let source_schema = batches[0].schema(); - let batch = concat_or_empty_batches(source_schema, batches)?; - - let row_ids = if needs_row_ids { - Some( - batch - .column_by_name("_rowid") - .and_then(|col| col.as_any().downcast_ref::()) - .ok_or_else(|| { - OmniError::Lance(format!( - "expected _rowid column when rewriting '{}'", - source_table_key - )) - })? - .values() - .iter() - .copied() - .collect::>(), - ) - } else { - None - }; - - let mut columns = Vec::with_capacity(target_schema.fields().len()); - for field in target_schema.fields() { - let source_name = property_renames - .and_then(|renames| renames.get(field.name())) - .map(String::as_str) - .unwrap_or_else(|| field.name().as_str()); - if let Some(column) = batch.column_by_name(source_name) { - if target_blob_properties.contains(field.name()) - && source_blob_properties.contains(source_name) - { - let descriptions = - column - .as_any() - .downcast_ref::() - .ok_or_else(|| { - OmniError::Lance(format!( - "expected blob descriptions for '{}.{}'", - source_table_key, source_name - )) - })?; - let rebuilt = self - .rebuild_blob_column( - source_ds, - source_name, - descriptions, - row_ids.as_deref().unwrap_or(&[]), - ) - .await?; - columns.push(rebuilt); - } else { - columns.push(column.clone()); - } - } else { - columns.push(new_null_array(field.data_type(), batch.num_rows())); - } - } - - RecordBatch::try_new(target_schema, columns).map_err(|e| OmniError::Lance(e.to_string())) - } - - async fn rebuild_blob_column( - &self, - source_ds: &Dataset, - column_name: &str, - descriptions: &StructArray, - row_ids: &[u64], - ) -> Result> { - let mut builder = BlobArrayBuilder::new(row_ids.len()); - let mut non_null_row_ids = Vec::new(); - let mut row_has_blob = Vec::with_capacity(row_ids.len()); - - for row in 0..row_ids.len() { - let is_null = blob_description_is_null(descriptions, row)?; - row_has_blob.push(!is_null); - if !is_null { - non_null_row_ids.push(row_ids[row]); - } - } - - let blob_files = if non_null_row_ids.is_empty() { - Vec::new() - } else { - Arc::new(source_ds.clone()) - .take_blobs(&non_null_row_ids, column_name) - .await - .map_err(|e| OmniError::Lance(e.to_string()))? - }; - - let mut files = blob_files.into_iter(); - for has_blob in row_has_blob { - if !has_blob { - builder - .push_null() - .map_err(|e| OmniError::Lance(e.to_string()))?; - continue; - } - - let blob = files.next().ok_or_else(|| { - OmniError::Lance(format!( - "blob rewrite for '{}' lost alignment with source rows", - column_name - )) - })?; - if let Some(uri) = blob.uri() { - builder - .push_uri(uri) - .map_err(|e| OmniError::Lance(e.to_string()))?; - } else { - builder - .push_bytes( - blob.read() - .await - .map_err(|e| OmniError::Lance(e.to_string()))?, - ) - .map_err(|e| OmniError::Lance(e.to_string()))?; - } - } - - if files.next().is_some() { - return Err(OmniError::Lance(format!( - "blob rewrite for '{}' produced extra source blobs", - column_name - ))); - } - - builder - .finish() - .map_err(|e| OmniError::Lance(e.to_string())) - } - - async fn export_blob_values( - &self, - source_ds: &Dataset, - batch: &RecordBatch, - row_ids: &[u64], - blob_properties: &std::collections::HashSet, - ) -> Result>>> { - let mut values = HashMap::with_capacity(blob_properties.len()); - for property in blob_properties { - let descriptions = batch - .column_by_name(property) - .and_then(|col| col.as_any().downcast_ref::()) - .ok_or_else(|| { - OmniError::Lance(format!( - "expected blob descriptions for export column '{}'", - property - )) - })?; - values.insert( - property.clone(), - export_blob_column_values(source_ds, property, descriptions, row_ids).await?, - ); - } - Ok(values) - } - - fn write_export_rows_from_batch( - &self, - table_key: &str, - batch: &RecordBatch, - blob_values: Option<&HashMap>>>, - writer: &mut W, - ) -> Result<()> { - if let Some(type_name) = table_key.strip_prefix("node:") { - let node_type = - self.catalog.node_types.get(type_name).ok_or_else(|| { - OmniError::manifest(format!("unknown node type '{}'", type_name)) - })?; - for row in 0..batch.num_rows() { - let mut data = serde_json::Map::new(); - data.insert( - "id".to_string(), - json_value_from_named_column(batch, "id", row)?, - ); - for field in node_type.arrow_schema.fields().iter().skip(1) { - data.insert( - field.name().clone(), - export_value_for_field( - batch, - field.name(), - row, - blob_values.and_then(|values| values.get(field.name())), - )?, - ); - } - write_export_jsonl_row( - writer, - table_key, - &serde_json::json!({ - "type": type_name, - "data": serde_json::Value::Object(data), - }), - )?; - } - return Ok(()); - } - - if let Some(edge_name) = table_key.strip_prefix("edge:") { - let edge_type = - self.catalog.edge_types.get(edge_name).ok_or_else(|| { - OmniError::manifest(format!("unknown edge type '{}'", edge_name)) - })?; - for row in 0..batch.num_rows() { - let from = named_string_value(batch, "src", row)?; - let to = named_string_value(batch, "dst", row)?; - let mut data = serde_json::Map::new(); - data.insert( - "id".to_string(), - json_value_from_named_column(batch, "id", row)?, - ); - for field in edge_type.arrow_schema.fields().iter().skip(3) { - data.insert( - field.name().clone(), - export_value_for_field( - batch, - field.name(), - row, - blob_values.and_then(|values| values.get(field.name())), - )?, - ); - } - write_export_jsonl_row( - writer, - table_key, - &serde_json::json!({ - "edge": edge_name, - "from": from, - "to": to, - "data": serde_json::Value::Object(data), - }), - )?; - } - return Ok(()); - } - - Err(OmniError::manifest(format!( - "invalid export table key '{}'", - table_key - ))) - } -} - -fn write_export_jsonl_row( - writer: &mut W, - table_key: &str, - row: &serde_json::Value, -) -> Result<()> { - serde_json::to_writer(&mut *writer, row).map_err(|err| { - OmniError::manifest(format!( - "failed to serialize export row for '{}': {}", - table_key, err - )) - })?; - writer.write_all(b"\n")?; - Ok(()) -} - -async fn export_blob_column_values( - source_ds: &Dataset, - column_name: &str, - descriptions: &StructArray, - row_ids: &[u64], -) -> Result>> { - let mut non_null_row_ids = Vec::new(); - let mut non_null_positions = Vec::new(); - let mut values = vec![None; row_ids.len()]; - - for (row, row_id) in row_ids.iter().enumerate() { - if blob_description_is_null(descriptions, row)? { - continue; - } - non_null_row_ids.push(*row_id); - non_null_positions.push(row); - } - - if non_null_row_ids.is_empty() { - return Ok(values); - } - - // Sort row IDs before calling take_blobs — Lance 4's unsorted path has - // a bug that duplicates the _rowaddr column in the returned batch. - let mut perm: Vec = (0..non_null_row_ids.len()).collect(); - perm.sort_by_key(|&i| non_null_row_ids[i]); - let sorted_ids: Vec = perm.iter().map(|&i| non_null_row_ids[i]).collect(); - - let sorted_blobs = Arc::new(source_ds.clone()) - .take_blobs(&sorted_ids, column_name) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - - if sorted_blobs.len() != non_null_positions.len() { - return Err(OmniError::Lance(format!( - "blob export for '{}' lost alignment with selected rows", - column_name - ))); - } - - // Restore original order via inverse permutation. Build an index that - // maps each original position to the sorted position so we can iterate - // non_null_positions in order and pick the right blob. - let mut inverse_perm = vec![0usize; perm.len()]; - for (sorted_pos, &orig_pos) in perm.iter().enumerate() { - inverse_perm[orig_pos] = sorted_pos; - } - - for (idx, position) in non_null_positions.into_iter().enumerate() { - let blob = &sorted_blobs[inverse_perm[idx]]; - let value = if let Some(uri) = blob.uri() { - uri.to_string() - } else { - let bytes = blob - .read() - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - format!( - "base64:{}", - base64::Engine::encode(&base64::engine::general_purpose::STANDARD, bytes) - ) - }; - values[position] = Some(value); - } - - Ok(values) -} - -fn export_value_for_field( - batch: &RecordBatch, - field_name: &str, - row: usize, - blob_values: Option<&Vec>>, -) -> Result { - if let Some(blob_values) = blob_values { - return Ok(blob_values - .get(row) - .and_then(|value| value.clone()) - .map(serde_json::Value::String) - .unwrap_or(serde_json::Value::Null)); - } - json_value_from_named_column(batch, field_name, row) -} - -fn json_value_from_named_column( - batch: &RecordBatch, - field_name: &str, - row: usize, -) -> Result { - let column = batch.column_by_name(field_name).ok_or_else(|| { - OmniError::Lance(format!("missing column '{}' in export batch", field_name)) - })?; - json_value_from_array(column.as_ref(), row) -} - -fn named_string_value(batch: &RecordBatch, field_name: &str, row: usize) -> Result { - let column = batch.column_by_name(field_name).ok_or_else(|| { - OmniError::Lance(format!("missing column '{}' in export batch", field_name)) - })?; - let array = column - .as_any() - .downcast_ref::() - .ok_or_else(|| OmniError::Lance(format!("expected Utf8 column '{}'", field_name)))?; - if array.is_null(row) { - return Err(OmniError::Lance(format!( - "unexpected null in export column '{}'", - field_name - ))); - } - Ok(array.value(row).to_string()) } pub(crate) fn normalize_branch_name(branch: &str) -> Result> { @@ -2474,6 +1137,12 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result> { } fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> { + if super::is_internal_run_branch(branch) { + return Err(OmniError::manifest(format!( + "{} does not allow internal run ref '{}'", + operation, branch + ))); + } if is_internal_system_branch(branch) { return Err(OmniError::manifest(format!( "{} does not allow internal system ref '{}'", diff --git a/crates/omnigraph/src/db/omnigraph/export.rs b/crates/omnigraph/src/db/omnigraph/export.rs new file mode 100644 index 0000000..7269278 --- /dev/null +++ b/crates/omnigraph/src/db/omnigraph/export.rs @@ -0,0 +1,410 @@ +use super::*; + +pub(super) async fn entity_at_target( + db: &Omnigraph, + target: impl Into, + table_key: &str, + id: &str, +) -> Result> { + let resolved = db.resolved_target(target).await?; + entity_from_snapshot(db, &resolved.snapshot, table_key, id).await +} + +pub(super) async fn entity_at( + db: &Omnigraph, + table_key: &str, + id: &str, + version: u64, +) -> Result> { + let snap = db.coordinator.snapshot_at_version(version).await?; + entity_from_snapshot(db, &snap, table_key, id).await +} + +pub(super) async fn export_jsonl( + db: &Omnigraph, + branch: &str, + type_names: &[String], + table_keys: &[String], +) -> Result { + let mut out = Vec::new(); + export_jsonl_to_writer(db, branch, type_names, table_keys, &mut out).await?; + String::from_utf8(out) + .map_err(|err| OmniError::manifest(format!("export produced invalid UTF-8: {}", err))) +} + +pub(super) async fn export_jsonl_to_writer( + db: &Omnigraph, + branch: &str, + type_names: &[String], + table_keys: &[String], + writer: &mut W, +) -> Result<()> { + db.ensure_schema_state_valid().await?; + let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?; + export_snapshot_jsonl_to_writer(db, &snapshot, type_names, table_keys, writer).await +} + +async fn entity_from_snapshot( + db: &Omnigraph, + snapshot: &Snapshot, + table_key: &str, + id: &str, +) -> Result> { + if snapshot.entry(table_key).is_none() { + return Ok(None); + } + + let ds = db + .table_store + .open_snapshot_table(snapshot, table_key) + .await?; + let filter_sql = format!("id = '{}'", id.replace('\'', "''")); + let batches = db + .table_store + .scan(&ds, None, Some(&filter_sql), None) + .await?; + let Some(batch) = batches.iter().find(|batch| batch.num_rows() > 0) else { + return Ok(None); + }; + Ok(Some(record_batch_row_to_json(batch, 0)?)) +} + +async fn export_snapshot_jsonl_to_writer( + db: &Omnigraph, + snapshot: &Snapshot, + type_names: &[String], + table_keys: &[String], + writer: &mut W, +) -> Result<()> { + let selected_tables = export_table_keys(snapshot, type_names, table_keys)?; + for table_key in selected_tables { + export_table_to_writer(db, snapshot, &table_key, writer).await?; + } + Ok(()) +} + +fn export_table_keys( + snapshot: &Snapshot, + type_names: &[String], + table_keys: &[String], +) -> Result> { + let available = snapshot + .entries() + .map(|entry| entry.table_key.clone()) + .collect::>(); + let mut selected = BTreeSet::new(); + + for table_key in table_keys { + if !available.contains(table_key) { + return Err(OmniError::manifest(format!( + "unknown export table '{}'", + table_key + ))); + } + selected.insert(table_key.clone()); + } + + for type_name in type_names { + let mut matched = false; + let node_key = format!("node:{}", type_name); + if available.contains(&node_key) { + selected.insert(node_key); + matched = true; + } + let edge_key = format!("edge:{}", type_name); + if available.contains(&edge_key) { + selected.insert(edge_key); + matched = true; + } + if !matched { + return Err(OmniError::manifest(format!( + "unknown export type '{}'", + type_name + ))); + } + } + + if selected.is_empty() { + return Ok(available.into_iter().collect()); + } + + Ok(selected.into_iter().collect()) +} + +async fn export_table_to_writer( + db: &Omnigraph, + snapshot: &Snapshot, + table_key: &str, + writer: &mut W, +) -> Result<()> { + let ds = db + .table_store + .open_snapshot_table(snapshot, table_key) + .await?; + let ordering = Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]); + let blob_properties = blob_properties_for_table_key(db.catalog(), table_key)?; + + if blob_properties.is_empty() { + for batch in db.table_store.scan(&ds, None, None, ordering).await? { + write_export_rows_from_batch(db, table_key, &batch, None, writer)?; + } + return Ok(()); + } + + let batches = db + .table_store + .scan_with(&ds, None, None, ordering, true, |_| Ok(())) + .await?; + for batch in batches { + let row_ids = batch + .column_by_name("_rowid") + .and_then(|col| col.as_any().downcast_ref::()) + .ok_or_else(|| { + OmniError::Lance(format!( + "expected _rowid column when exporting '{}'", + table_key + )) + })? + .values() + .iter() + .copied() + .collect::>(); + let blob_values = export_blob_values(&ds, &batch, &row_ids, blob_properties).await?; + write_export_rows_from_batch(db, table_key, &batch, Some(&blob_values), writer)?; + } + Ok(()) +} + +async fn export_blob_values( + source_ds: &Dataset, + batch: &RecordBatch, + row_ids: &[u64], + blob_properties: &std::collections::HashSet, +) -> Result>>> { + let mut values = HashMap::with_capacity(blob_properties.len()); + for property in blob_properties { + let descriptions = batch + .column_by_name(property) + .and_then(|col| col.as_any().downcast_ref::()) + .ok_or_else(|| { + OmniError::Lance(format!( + "expected blob descriptions for export column '{}'", + property + )) + })?; + values.insert( + property.clone(), + export_blob_column_values(source_ds, property, descriptions, row_ids).await?, + ); + } + Ok(values) +} + +fn write_export_rows_from_batch( + db: &Omnigraph, + table_key: &str, + batch: &RecordBatch, + blob_values: Option<&HashMap>>>, + writer: &mut W, +) -> Result<()> { + if let Some(type_name) = table_key.strip_prefix("node:") { + let node_type = db + .catalog + .node_types + .get(type_name) + .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?; + for row in 0..batch.num_rows() { + let mut data = serde_json::Map::new(); + data.insert( + "id".to_string(), + json_value_from_named_column(batch, "id", row)?, + ); + for field in node_type.arrow_schema.fields().iter().skip(1) { + data.insert( + field.name().clone(), + export_value_for_field( + batch, + field.name(), + row, + blob_values.and_then(|values| values.get(field.name())), + )?, + ); + } + write_export_jsonl_row( + writer, + table_key, + &serde_json::json!({ + "type": type_name, + "data": serde_json::Value::Object(data), + }), + )?; + } + return Ok(()); + } + + if let Some(edge_name) = table_key.strip_prefix("edge:") { + let edge_type = db + .catalog + .edge_types + .get(edge_name) + .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?; + for row in 0..batch.num_rows() { + let from = named_string_value(batch, "src", row)?; + let to = named_string_value(batch, "dst", row)?; + let mut data = serde_json::Map::new(); + data.insert( + "id".to_string(), + json_value_from_named_column(batch, "id", row)?, + ); + for field in edge_type.arrow_schema.fields().iter().skip(3) { + data.insert( + field.name().clone(), + export_value_for_field( + batch, + field.name(), + row, + blob_values.and_then(|values| values.get(field.name())), + )?, + ); + } + write_export_jsonl_row( + writer, + table_key, + &serde_json::json!({ + "edge": edge_name, + "from": from, + "to": to, + "data": serde_json::Value::Object(data), + }), + )?; + } + return Ok(()); + } + + Err(OmniError::manifest(format!( + "invalid export table key '{}'", + table_key + ))) +} + +fn write_export_jsonl_row( + writer: &mut W, + table_key: &str, + row: &serde_json::Value, +) -> Result<()> { + serde_json::to_writer(&mut *writer, row).map_err(|err| { + OmniError::manifest(format!( + "failed to serialize export row for '{}': {}", + table_key, err + )) + })?; + writer.write_all(b"\n")?; + Ok(()) +} + +async fn export_blob_column_values( + source_ds: &Dataset, + column_name: &str, + descriptions: &StructArray, + row_ids: &[u64], +) -> Result>> { + let mut non_null_row_ids = Vec::new(); + let mut non_null_positions = Vec::new(); + let mut values = vec![None; row_ids.len()]; + + for (row, row_id) in row_ids.iter().enumerate() { + if blob_description_is_null(descriptions, row)? { + continue; + } + non_null_row_ids.push(*row_id); + non_null_positions.push(row); + } + + if non_null_row_ids.is_empty() { + return Ok(values); + } + + let mut perm: Vec = (0..non_null_row_ids.len()).collect(); + perm.sort_by_key(|&i| non_null_row_ids[i]); + let sorted_ids: Vec = perm.iter().map(|&i| non_null_row_ids[i]).collect(); + + let sorted_blobs = Arc::new(source_ds.clone()) + .take_blobs(&sorted_ids, column_name) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + + if sorted_blobs.len() != non_null_positions.len() { + return Err(OmniError::Lance(format!( + "blob export for '{}' lost alignment with selected rows", + column_name + ))); + } + + let mut inverse_perm = vec![0usize; perm.len()]; + for (sorted_pos, &orig_pos) in perm.iter().enumerate() { + inverse_perm[orig_pos] = sorted_pos; + } + + for (idx, position) in non_null_positions.into_iter().enumerate() { + let blob = &sorted_blobs[inverse_perm[idx]]; + let value = if let Some(uri) = blob.uri() { + uri.to_string() + } else { + let bytes = blob + .read() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + format!( + "base64:{}", + base64::Engine::encode(&base64::engine::general_purpose::STANDARD, bytes) + ) + }; + values[position] = Some(value); + } + + Ok(values) +} + +fn export_value_for_field( + batch: &RecordBatch, + field_name: &str, + row: usize, + blob_values: Option<&Vec>>, +) -> Result { + if let Some(blob_values) = blob_values { + return Ok(blob_values + .get(row) + .and_then(|value| value.clone()) + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null)); + } + json_value_from_named_column(batch, field_name, row) +} + +fn json_value_from_named_column( + batch: &RecordBatch, + field_name: &str, + row: usize, +) -> Result { + let column = batch.column_by_name(field_name).ok_or_else(|| { + OmniError::Lance(format!("missing column '{}' in export batch", field_name)) + })?; + json_value_from_array(column.as_ref(), row) +} + +fn named_string_value(batch: &RecordBatch, field_name: &str, row: usize) -> Result { + let column = batch.column_by_name(field_name).ok_or_else(|| { + OmniError::Lance(format!("missing column '{}' in export batch", field_name)) + })?; + let array = column + .as_any() + .downcast_ref::() + .ok_or_else(|| OmniError::Lance(format!("expected Utf8 column '{}'", field_name)))?; + if array.is_null(row) { + return Err(OmniError::Lance(format!( + "unexpected null in export column '{}'", + field_name + ))); + } + Ok(array.value(row).to_string()) +} diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs new file mode 100644 index 0000000..17adc60 --- /dev/null +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -0,0 +1,596 @@ +use super::*; + +pub(super) async fn plan_schema( + db: &Omnigraph, + desired_schema_source: &str, +) -> Result { + db.ensure_schema_state_valid().await?; + let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?; + let desired_ir = read_schema_ir_from_source(desired_schema_source)?; + plan_schema_migration(&accepted_ir, &desired_ir) + .map_err(|err| OmniError::manifest(err.to_string())) +} + +pub(super) async fn apply_schema( + db: &mut Omnigraph, + desired_schema_source: &str, +) -> Result { + acquire_schema_apply_lock(db).await?; + let result = apply_schema_with_lock(db, desired_schema_source).await; + let release_result = release_schema_apply_lock(db).await; + match (result, release_result) { + (Ok(result), Ok(())) => Ok(result), + (Ok(_), Err(err)) => Err(err), + (Err(err), Ok(())) => Err(err), + (Err(err), Err(_)) => Err(err), + } +} + +pub(super) async fn apply_schema_with_lock( + db: &mut Omnigraph, + desired_schema_source: &str, +) -> Result { + db.ensure_schema_state_valid().await?; + let branches = db.coordinator.all_branches().await?; + let blocking_branches = branches + .into_iter() + .filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch)) + .collect::>(); + if !blocking_branches.is_empty() { + return Err(OmniError::manifest_conflict(format!( + "schema apply requires a repo with only main; found non-main branches: {}", + blocking_branches.join(", ") + ))); + } + + let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?; + let desired_ir = read_schema_ir_from_source(desired_schema_source)?; + let plan = plan_schema_migration(&accepted_ir, &desired_ir) + .map_err(|err| OmniError::manifest(err.to_string()))?; + if !plan.supported { + let reason = plan + .steps + .iter() + .find_map(|step| match step { + SchemaMigrationStep::UnsupportedChange { reason, .. } => Some(reason.as_str()), + _ => None, + }) + .unwrap_or("unsupported schema migration plan"); + return Err(OmniError::manifest(reason.to_string())); + } + if plan.steps.is_empty() { + return Ok(SchemaApplyResult { + supported: true, + applied: false, + manifest_version: db.version(), + steps: plan.steps, + }); + } + + let mut desired_catalog = build_catalog_from_ir(&desired_ir)?; + fixup_blob_schemas(&mut desired_catalog); + + let snapshot = db.snapshot(); + let base_manifest_version = snapshot.version(); + let mut added_tables = BTreeSet::new(); + let mut renamed_tables = HashMap::new(); + let mut rewritten_tables = BTreeSet::new(); + let mut indexed_tables = BTreeSet::new(); + let mut property_renames = HashMap::>::new(); + let mut changed_edge_tables = false; + + for step in &plan.steps { + match step { + SchemaMigrationStep::AddType { type_kind, name } => { + let table_key = schema_table_key(*type_kind, name); + if table_key.starts_with("edge:") { + changed_edge_tables = true; + } + added_tables.insert(table_key); + } + SchemaMigrationStep::RenameType { + type_kind, + from, + to, + } => { + let source_key = schema_table_key(*type_kind, from); + let target_key = schema_table_key(*type_kind, to); + if source_key.starts_with("edge:") { + changed_edge_tables = true; + } + renamed_tables.insert(target_key, source_key); + } + SchemaMigrationStep::AddProperty { + type_kind, + type_name, + .. + } => { + let table_key = schema_table_key(*type_kind, type_name); + if table_key.starts_with("edge:") { + changed_edge_tables = true; + } + rewritten_tables.insert(table_key); + } + SchemaMigrationStep::RenameProperty { + type_kind, + type_name, + from, + to, + } => { + let table_key = schema_table_key(*type_kind, type_name); + if table_key.starts_with("edge:") { + changed_edge_tables = true; + } + rewritten_tables.insert(table_key.clone()); + property_renames + .entry(table_key) + .or_default() + .insert(to.clone(), from.clone()); + } + SchemaMigrationStep::AddConstraint { + type_kind, + type_name, + .. + } => { + indexed_tables.insert(schema_table_key(*type_kind, type_name)); + } + SchemaMigrationStep::UpdateTypeMetadata { .. } + | SchemaMigrationStep::UpdatePropertyMetadata { .. } => {} + SchemaMigrationStep::UnsupportedChange { reason, .. } => { + return Err(OmniError::manifest(reason.clone())); + } + } + } + + let mut table_registrations = HashMap::::new(); + let mut table_updates = HashMap::::new(); + let mut table_tombstones = HashMap::::new(); + + for table_key in &added_tables { + let table_path = table_path_for_table_key(table_key)?; + let dataset_uri = db.table_store.dataset_uri(&table_path); + let schema = schema_for_table_key(&desired_catalog, table_key)?; + let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?; + db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) + .await?; + let state = db.table_store.table_state(&dataset_uri, &ds).await?; + table_registrations.insert(table_key.clone(), table_path); + table_updates.insert( + table_key.clone(), + crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }, + ); + } + + for (target_table_key, source_table_key) in &renamed_tables { + let source_entry = snapshot.entry(source_table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing source table '{}' for schema rename", + source_table_key + )) + })?; + ensure_snapshot_entry_head_matches(db, source_entry).await?; + let source_ds = snapshot.open(source_table_key).await?; + let batch = batch_for_schema_apply_rewrite( + db, + &source_ds, + source_table_key, + &db.catalog, + target_table_key, + &desired_catalog, + property_renames.get(target_table_key), + ) + .await?; + let table_path = table_path_for_table_key(target_table_key)?; + let dataset_uri = db.table_store.dataset_uri(&table_path); + let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?; + db.build_indices_on_dataset_for_catalog(&desired_catalog, target_table_key, &mut target_ds) + .await?; + let state = db.table_store.table_state(&dataset_uri, &target_ds).await?; + table_registrations.insert(target_table_key.clone(), table_path); + table_updates.insert( + target_table_key.clone(), + crate::db::SubTableUpdate { + table_key: target_table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }, + ); + table_tombstones.insert( + source_table_key.clone(), + source_entry.table_version.saturating_add(1), + ); + } + + for table_key in &rewritten_tables { + if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) { + continue; + } + let entry = snapshot.entry(table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing source table '{}' for schema apply", + table_key + )) + })?; + ensure_snapshot_entry_head_matches(db, entry).await?; + let source_ds = snapshot.open(table_key).await?; + let batch = batch_for_schema_apply_rewrite( + db, + &source_ds, + table_key, + &db.catalog, + table_key, + &desired_catalog, + property_renames.get(table_key), + ) + .await?; + let dataset_uri = db.table_store.dataset_uri(&entry.table_path); + let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?; + db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds) + .await?; + let state = db.table_store.table_state(&dataset_uri, &target_ds).await?; + table_updates.insert( + table_key.clone(), + crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }, + ); + } + + for table_key in &indexed_tables { + if added_tables.contains(table_key) + || renamed_tables.contains_key(table_key) + || rewritten_tables.contains(table_key) + { + continue; + } + let entry = snapshot.entry(table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing table '{}' for schema index apply", + table_key + )) + })?; + ensure_snapshot_entry_head_matches(db, entry).await?; + let dataset_uri = db.table_store.dataset_uri(&entry.table_path); + let mut ds = db + .table_store + .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref()) + .await?; + db.table_store + .ensure_expected_version(&ds, table_key, entry.table_version)?; + db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) + .await?; + let state = db.table_store.table_state(&dataset_uri, &ds).await?; + table_updates.insert( + table_key.clone(), + crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }, + ); + } + + let mut manifest_changes = Vec::new(); + for (table_key, table_path) in table_registrations { + manifest_changes.push(ManifestChange::RegisterTable(TableRegistration { + table_key, + table_path, + })); + } + for update in table_updates.into_values() { + manifest_changes.push(ManifestChange::Update(update)); + } + for (table_key, tombstone_version) in table_tombstones { + manifest_changes.push(ManifestChange::Tombstone(TableTombstone { + table_key, + tombstone_version, + })); + } + + db.refresh().await?; + if db.version() != base_manifest_version { + return Err(OmniError::manifest_conflict(format!( + "schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress", + base_manifest_version, + db.version() + ))); + } + + let actor_id = db.current_audit_actor().map(str::to_string); + let PublishedSnapshot { + manifest_version, + _snapshot_id: _, + } = db + .coordinator + .commit_changes_with_actor(&manifest_changes, actor_id.as_deref()) + .await?; + + let schema_path = join_uri(&db.root_uri, SCHEMA_SOURCE_FILENAME); + db.storage + .write_text(&schema_path, desired_schema_source) + .await?; + write_schema_contract(&db.root_uri, db.storage.as_ref(), &desired_ir).await?; + + db.catalog = desired_catalog; + db.schema_source = desired_schema_source.to_string(); + db.coordinator.refresh().await?; + db.runtime_cache.invalidate_all().await; + if changed_edge_tables { + db.invalidate_graph_index().await; + } + + Ok(SchemaApplyResult { + supported: true, + applied: true, + manifest_version, + steps: plan.steps, + }) +} + +pub(super) async fn ensure_schema_apply_idle(db: &mut Omnigraph, operation: &str) -> Result<()> { + db.refresh().await?; + ensure_schema_apply_not_locked(db, operation).await +} + +pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> { + db.ensure_schema_state_valid().await?; + db.refresh().await?; + let branches = db.coordinator.all_branches().await?; + if branches + .iter() + .any(|branch| is_schema_apply_lock_branch(branch)) + { + return Err(OmniError::manifest_conflict( + "schema apply is already in progress".to_string(), + )); + } + + db.coordinator + .branch_create(SCHEMA_APPLY_LOCK_BRANCH) + .await?; + db.refresh().await?; + + let blocking_branches = db + .coordinator + .all_branches() + .await? + .into_iter() + .filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch)) + .collect::>(); + if !blocking_branches.is_empty() { + let _ = release_schema_apply_lock(db).await; + return Err(OmniError::manifest_conflict(format!( + "schema apply requires a repo with only main; found non-main branches: {}", + blocking_branches.join(", ") + ))); + } + + Ok(()) +} + +pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()> { + db.coordinator + .branch_delete(SCHEMA_APPLY_LOCK_BRANCH) + .await?; + db.refresh().await +} + +pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> { + if db + .coordinator + .all_branches() + .await? + .iter() + .any(|branch| is_schema_apply_lock_branch(branch)) + { + return Err(OmniError::manifest_conflict(format!( + "{} is unavailable while schema apply is in progress", + operation + ))); + } + Ok(()) +} + +pub(super) async fn ensure_snapshot_entry_head_matches( + db: &Omnigraph, + entry: &SubTableEntry, +) -> Result<()> { + let dataset_uri = db.table_store.dataset_uri(&entry.table_path); + let ds = db + .table_store + .open_dataset_head_for_write( + &entry.table_key, + &dataset_uri, + entry.table_branch.as_deref(), + ) + .await?; + db.table_store + .ensure_expected_version(&ds, &entry.table_key, entry.table_version) +} + +pub(super) async fn batch_for_table_rewrite( + db: &Omnigraph, + source_ds: &Dataset, + table_key: &str, +) -> Result { + batch_for_schema_apply_rewrite( + db, + source_ds, + table_key, + &db.catalog, + table_key, + &db.catalog, + None, + ) + .await +} + +pub(super) async fn batch_for_schema_apply_rewrite( + db: &Omnigraph, + source_ds: &Dataset, + source_table_key: &str, + source_catalog: &Catalog, + target_table_key: &str, + target_catalog: &Catalog, + property_renames: Option<&HashMap>, +) -> Result { + let target_schema = schema_for_table_key(target_catalog, target_table_key)?; + let source_blob_properties = blob_properties_for_table_key(source_catalog, source_table_key)?; + let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?; + let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty(); + let batches = if needs_row_ids { + db.table_store() + .scan_with(source_ds, None, None, None, true, |_| Ok(())) + .await? + } else { + db.table_store().scan_batches(source_ds).await? + }; + if batches.is_empty() { + return Ok(RecordBatch::new_empty(target_schema)); + } + let source_schema = batches[0].schema(); + let batch = concat_or_empty_batches(source_schema, batches)?; + + let row_ids = if needs_row_ids { + Some( + batch + .column_by_name("_rowid") + .and_then(|col| col.as_any().downcast_ref::()) + .ok_or_else(|| { + OmniError::Lance(format!( + "expected _rowid column when rewriting '{}'", + source_table_key + )) + })? + .values() + .iter() + .copied() + .collect::>(), + ) + } else { + None + }; + + let mut columns = Vec::with_capacity(target_schema.fields().len()); + for field in target_schema.fields() { + let source_name = property_renames + .and_then(|renames| renames.get(field.name())) + .map(String::as_str) + .unwrap_or_else(|| field.name().as_str()); + if let Some(column) = batch.column_by_name(source_name) { + if target_blob_properties.contains(field.name()) + && source_blob_properties.contains(source_name) + { + let descriptions = + column + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OmniError::Lance(format!( + "expected blob descriptions for '{}.{}'", + source_table_key, source_name + )) + })?; + let rebuilt = rebuild_blob_column( + db, + source_ds, + source_name, + descriptions, + row_ids.as_deref().unwrap_or(&[]), + ) + .await?; + columns.push(rebuilt); + } else { + columns.push(column.clone()); + } + } else { + columns.push(new_null_array(field.data_type(), batch.num_rows())); + } + } + + RecordBatch::try_new(target_schema, columns).map_err(|e| OmniError::Lance(e.to_string())) +} + +async fn rebuild_blob_column( + _db: &Omnigraph, + source_ds: &Dataset, + column_name: &str, + descriptions: &StructArray, + row_ids: &[u64], +) -> Result> { + let mut builder = BlobArrayBuilder::new(row_ids.len()); + let mut non_null_row_ids = Vec::new(); + let mut row_has_blob = Vec::with_capacity(row_ids.len()); + + for row in 0..row_ids.len() { + let is_null = blob_description_is_null(descriptions, row)?; + row_has_blob.push(!is_null); + if !is_null { + non_null_row_ids.push(row_ids[row]); + } + } + + let blob_files = if non_null_row_ids.is_empty() { + Vec::new() + } else { + Arc::new(source_ds.clone()) + .take_blobs(&non_null_row_ids, column_name) + .await + .map_err(|e| OmniError::Lance(e.to_string()))? + }; + + let mut files = blob_files.into_iter(); + for has_blob in row_has_blob { + if !has_blob { + builder + .push_null() + .map_err(|e| OmniError::Lance(e.to_string()))?; + continue; + } + + let blob = files.next().ok_or_else(|| { + OmniError::Lance(format!( + "blob rewrite for '{}' lost alignment with source rows", + column_name + )) + })?; + if let Some(uri) = blob.uri() { + builder + .push_uri(uri) + .map_err(|e| OmniError::Lance(e.to_string()))?; + } else { + builder + .push_bytes( + blob.read() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?, + ) + .map_err(|e| OmniError::Lance(e.to_string()))?; + } + } + + if files.next().is_some() { + return Err(OmniError::Lance(format!( + "blob rewrite for '{}' produced extra source blobs", + column_name + ))); + } + + builder + .finish() + .map_err(|e| OmniError::Lance(e.to_string())) +} diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs new file mode 100644 index 0000000..8250a75 --- /dev/null +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -0,0 +1,506 @@ +use super::*; + +pub(super) async fn graph_index(db: &Omnigraph) -> Result> { + db.ensure_schema_state_valid().await?; + let resolved = db + .coordinator + .resolve_target(&ReadTarget::Branch( + db.coordinator + .current_branch() + .unwrap_or("main") + .to_string(), + )) + .await?; + db.runtime_cache.graph_index(&resolved, &db.catalog).await +} + +pub(super) async fn graph_index_for_resolved( + db: &Omnigraph, + resolved: &ResolvedTarget, +) -> Result> { + db.runtime_cache.graph_index(resolved, &db.catalog).await +} + +pub(super) async fn ensure_indices(db: &mut Omnigraph) -> Result<()> { + let current_branch = db.coordinator.current_branch().map(str::to_string); + ensure_indices_for_branch(db, current_branch.as_deref()).await +} + +pub(super) async fn ensure_indices_on(db: &mut Omnigraph, branch: &str) -> Result<()> { + let branch = normalize_branch_name(branch)?; + ensure_indices_for_branch(db, branch.as_deref()).await +} + +pub(super) async fn ensure_indices_for_branch( + db: &mut Omnigraph, + branch: Option<&str>, +) -> Result<()> { + db.ensure_schema_state_valid().await?; + db.ensure_schema_apply_idle("ensure_indices").await?; + let resolved = db.resolved_branch_target(branch).await?; + let snapshot = resolved.snapshot; + let mut updates = Vec::new(); + let active_branch = resolved.branch; + + for type_name in db.catalog.node_types.keys() { + let table_key = format!("node:{}", type_name); + let Some(entry) = snapshot.entry(&table_key) else { + continue; + }; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + let (mut ds, resolved_branch) = match active_branch.as_deref() { + Some(active_branch) => match entry.table_branch.as_deref() { + None => continue, + _ => { + open_owned_dataset_for_branch_write( + db, + &table_key, + &full_path, + entry.table_branch.as_deref(), + entry.table_version, + active_branch, + ) + .await? + } + }, + None => ( + db.table_store + .open_dataset_head_for_write(&table_key, &full_path, None) + .await?, + None, + ), + }; + let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0); + if row_count > 0 { + build_indices_on_dataset(db, &table_key, &mut ds).await?; + } + + let state = db.table_store.table_state(&full_path, &ds).await?; + if state.version != entry.table_version + || resolved_branch.as_deref() != entry.table_branch.as_deref() + { + updates.push(crate::db::SubTableUpdate { + table_key, + table_version: state.version, + table_branch: resolved_branch, + row_count: state.row_count, + version_metadata: state.version_metadata, + }); + } + } + + for edge_name in db.catalog.edge_types.keys() { + let table_key = format!("edge:{}", edge_name); + let Some(entry) = snapshot.entry(&table_key) else { + continue; + }; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + let (mut ds, resolved_branch) = match active_branch.as_deref() { + Some(active_branch) => match entry.table_branch.as_deref() { + None => continue, + _ => { + open_owned_dataset_for_branch_write( + db, + &table_key, + &full_path, + entry.table_branch.as_deref(), + entry.table_version, + active_branch, + ) + .await? + } + }, + None => ( + db.table_store + .open_dataset_head_for_write(&table_key, &full_path, None) + .await?, + None, + ), + }; + let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0); + if row_count > 0 { + build_indices_on_dataset(db, &table_key, &mut ds).await?; + } + + let state = db.table_store.table_state(&full_path, &ds).await?; + if state.version != entry.table_version + || resolved_branch.as_deref() != entry.table_branch.as_deref() + { + updates.push(crate::db::SubTableUpdate { + table_key, + table_version: state.version, + table_branch: resolved_branch, + row_count: state.row_count, + version_metadata: state.version_metadata, + }); + } + } + + if !updates.is_empty() { + commit_prepared_updates_on_branch(db, branch, &updates).await?; + } + + Ok(()) +} + +pub(super) async fn open_for_mutation( + db: &Omnigraph, + table_key: &str, +) -> Result<(Dataset, String, Option)> { + let current_branch = db.coordinator.current_branch().map(str::to_string); + open_for_mutation_on_branch(db, current_branch.as_deref(), table_key).await +} + +pub(super) async fn open_for_mutation_on_branch( + db: &Omnigraph, + branch: Option<&str>, + table_key: &str, +) -> Result<(Dataset, String, Option)> { + db.ensure_schema_apply_not_locked("write").await?; + let resolved = db.resolved_branch_target(branch).await?; + let entry = resolved + .snapshot + .entry(table_key) + .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + match resolved.branch.as_deref() { + None => { + let ds = db + .table_store + .open_dataset_head_for_write(table_key, &full_path, None) + .await?; + db.table_store + .ensure_expected_version(&ds, table_key, entry.table_version)?; + Ok((ds, full_path, None)) + } + Some(active_branch) => { + let (ds, table_branch) = open_owned_dataset_for_branch_write( + db, + table_key, + &full_path, + entry.table_branch.as_deref(), + entry.table_version, + active_branch, + ) + .await?; + Ok((ds, full_path, table_branch)) + } + } +} + +pub(super) async fn open_owned_dataset_for_branch_write( + db: &Omnigraph, + table_key: &str, + full_path: &str, + entry_branch: Option<&str>, + entry_version: u64, + active_branch: &str, +) -> Result<(Dataset, Option)> { + match entry_branch { + Some(branch) if branch == active_branch => { + let ds = db + .table_store + .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) + .await?; + db.table_store + .ensure_expected_version(&ds, table_key, entry_version)?; + Ok((ds, Some(active_branch.to_string()))) + } + source_branch => { + fork_dataset_from_entry_state( + db, + table_key, + full_path, + source_branch, + entry_version, + active_branch, + ) + .await?; + let ds = db + .table_store + .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) + .await?; + db.table_store + .ensure_expected_version(&ds, table_key, entry_version)?; + Ok((ds, Some(active_branch.to_string()))) + } + } +} + +pub(super) async fn fork_dataset_from_entry_state( + db: &Omnigraph, + table_key: &str, + full_path: &str, + source_branch: Option<&str>, + source_version: u64, + active_branch: &str, +) -> Result { + db.table_store + .fork_branch_from_state( + full_path, + source_branch, + table_key, + source_version, + active_branch, + ) + .await +} + +pub(super) async fn reopen_for_mutation( + db: &Omnigraph, + table_key: &str, + full_path: &str, + table_branch: Option<&str>, + expected_version: u64, +) -> Result { + db.ensure_schema_apply_not_locked("write").await?; + db.table_store + .reopen_for_mutation(full_path, table_branch, table_key, expected_version) + .await +} + +pub(super) async fn open_dataset_at_state( + db: &Omnigraph, + table_path: &str, + table_branch: Option<&str>, + table_version: u64, +) -> Result { + db.table_store + .open_dataset_at_state(table_path, table_branch, table_version) + .await +} + +pub(super) async fn build_indices_on_dataset( + db: &Omnigraph, + table_key: &str, + ds: &mut Dataset, +) -> Result<()> { + build_indices_on_dataset_for_catalog(db, &db.catalog, table_key, ds).await +} + +pub(super) async fn build_indices_on_dataset_for_catalog( + db: &Omnigraph, + catalog: &Catalog, + table_key: &str, + ds: &mut Dataset, +) -> Result<()> { + if let Some(type_name) = table_key.strip_prefix("node:") { + if !db.table_store.has_btree_index(ds, "id").await? { + db.table_store + .create_btree_index(ds, &["id"]) + .await + .map_err(|e| { + OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e)) + })?; + } + + if let Some(node_type) = catalog.node_types.get(type_name) { + for index_cols in &node_type.indices { + if index_cols.len() != 1 { + continue; + } + let prop_name = &index_cols[0]; + if let Some(prop_type) = node_type.properties.get(prop_name) { + if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list { + if !db.table_store.has_fts_index(ds, prop_name).await? { + db.table_store + .create_inverted_index(ds, prop_name.as_str()) + .await + .map_err(|e| { + OmniError::Lance(format!( + "create Inverted index on {}({}): {}", + table_key, prop_name, e + )) + })?; + } + } else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list { + if !db.table_store.has_vector_index(ds, prop_name).await? { + db.table_store + .create_vector_index(ds, prop_name.as_str()) + .await + .map_err(|e| { + OmniError::Lance(format!( + "create Vector index on {}({}): {}", + table_key, prop_name, e + )) + })?; + } + } + } + } + } + return Ok(()); + } + + if table_key.starts_with("edge:") { + if !db.table_store.has_btree_index(ds, "id").await? { + db.table_store + .create_btree_index(ds, &["id"]) + .await + .map_err(|e| { + OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e)) + })?; + } + if !db.table_store.has_btree_index(ds, "src").await? { + db.table_store + .create_btree_index(ds, &["src"]) + .await + .map_err(|e| { + OmniError::Lance(format!("create BTree index on {}(src): {}", table_key, e)) + })?; + } + if !db.table_store.has_btree_index(ds, "dst").await? { + db.table_store + .create_btree_index(ds, &["dst"]) + .await + .map_err(|e| { + OmniError::Lance(format!("create BTree index on {}(dst): {}", table_key, e)) + })?; + } + return Ok(()); + } + + Err(OmniError::manifest(format!( + "invalid table key '{}'", + table_key + ))) +} + +async fn prepare_updates_for_commit( + db: &Omnigraph, + branch: Option<&str>, + updates: &[crate::db::SubTableUpdate], +) -> Result> { + if updates.is_empty() { + return Ok(Vec::new()); + } + + let snapshot = db.snapshot_for_branch(branch).await?; + let mut prepared = Vec::with_capacity(updates.len()); + + for update in updates { + let Some(entry) = snapshot.entry(&update.table_key) else { + return Err(OmniError::manifest(format!( + "no manifest entry for {}", + update.table_key + ))); + }; + + let mut prepared_update = update.clone(); + if prepared_update.row_count > 0 { + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + let mut ds = reopen_for_mutation( + db, + &prepared_update.table_key, + &full_path, + prepared_update.table_branch.as_deref(), + prepared_update.table_version, + ) + .await?; + build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?; + let state = db.table_store.table_state(&full_path, &ds).await?; + prepared_update.table_version = state.version; + prepared_update.row_count = state.row_count; + prepared_update.version_metadata = state.version_metadata; + } + + prepared.push(prepared_update); + } + + Ok(prepared) +} + +async fn commit_prepared_updates( + db: &mut Omnigraph, + updates: &[crate::db::SubTableUpdate], +) -> Result { + let actor_id = db.current_audit_actor().map(str::to_string); + let PublishedSnapshot { + manifest_version, + _snapshot_id: _, + } = db + .coordinator + .commit_updates_with_actor(updates, actor_id.as_deref()) + .await?; + Ok(manifest_version) +} + +pub(super) async fn commit_prepared_updates_on_branch( + db: &mut Omnigraph, + branch: Option<&str>, + updates: &[crate::db::SubTableUpdate], +) -> Result { + let current_branch = db.coordinator.current_branch().map(str::to_string); + let requested_branch = branch.map(str::to_string); + if requested_branch == current_branch { + return commit_prepared_updates(db, updates).await; + } + + let mut coordinator = match requested_branch.as_deref() { + Some(branch) => { + GraphCoordinator::open_branch(db.uri(), branch, Arc::clone(&db.storage)).await? + } + None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?, + }; + let actor_id = db.current_audit_actor().map(str::to_string); + let PublishedSnapshot { + manifest_version, + _snapshot_id: _, + } = coordinator + .commit_updates_with_actor(updates, actor_id.as_deref()) + .await?; + Ok(manifest_version) +} + +pub(super) async fn commit_updates( + db: &mut Omnigraph, + updates: &[crate::db::SubTableUpdate], +) -> Result { + db.ensure_schema_apply_not_locked("write commit").await?; + let current_branch = db.coordinator.current_branch().map(str::to_string); + let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?; + commit_prepared_updates(db, &prepared).await +} + +pub(super) async fn commit_manifest_updates( + db: &mut Omnigraph, + updates: &[crate::db::SubTableUpdate], +) -> Result { + db.coordinator.commit_manifest_updates(updates).await +} + +pub(super) async fn record_merge_commit( + db: &mut Omnigraph, + manifest_version: u64, + parent_commit_id: &str, + merged_parent_commit_id: &str, +) -> Result { + let actor_id = db.current_audit_actor().map(str::to_string); + db.coordinator + .record_merge_commit( + manifest_version, + parent_commit_id, + merged_parent_commit_id, + actor_id.as_deref(), + ) + .await + .map(|snapshot_id| snapshot_id.as_str().to_string()) +} + +pub(super) async fn commit_updates_on_branch( + db: &mut Omnigraph, + branch: Option<&str>, + updates: &[crate::db::SubTableUpdate], +) -> Result { + db.ensure_schema_apply_not_locked("write commit").await?; + let prepared = prepare_updates_for_commit(db, branch, updates).await?; + commit_prepared_updates_on_branch(db, branch, &prepared).await +} + +pub(super) async fn ensure_commit_graph_initialized(db: &mut Omnigraph) -> Result<()> { + db.coordinator.ensure_commit_graph_initialized().await +} + +pub(super) async fn invalidate_graph_index(db: &Omnigraph) { + db.runtime_cache.invalidate_all().await; +}