Add schema apply command and policy support

This commit is contained in:
andrew 2026-04-12 04:01:14 +03:00
parent a844e0ba68
commit 92fa3189f7
22 changed files with 1903 additions and 146 deletions

View file

@ -8,7 +8,7 @@ use crate::failpoints;
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
use super::commit_graph::{CommitGraph, GraphCommit};
use super::manifest::{ManifestCoordinator, Snapshot, SubTableUpdate};
use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri, is_internal_run_branch};
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
@ -208,6 +208,10 @@ impl GraphCoordinator {
})
}
/// Returns the names of all branches reported by the manifest.
///
/// This is a straight delegation to `self.manifest.list_branches()`; no
/// filtering is applied at this layer.
pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
    let branch_names = self.manifest.list_branches().await?;
    Ok(branch_names)
}
pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
self.manifest
.descendant_branches(name)
@ -414,6 +418,28 @@ impl GraphCoordinator {
Ok(manifest_version)
}
/// Commits `changes` to the manifest and returns the resulting manifest version.
///
/// Once the manifest commit has succeeded, the
/// `graph_publish.after_manifest_commit` failpoint is evaluated. If that
/// failpoint fires, its error is returned even though the manifest commit
/// has already taken place.
pub(crate) async fn commit_manifest_changes(
    &mut self,
    changes: &[ManifestChange],
) -> Result<u64> {
    let committed_version = self.manifest.commit_changes(changes).await?;
    failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
    Ok(committed_version)
}
/// Commits `changes` to the manifest, then records a graph commit for the
/// new manifest version attributed to `actor_id`.
///
/// The two steps run sequentially: the manifest commit happens first, and
/// the graph commit is recorded only if it succeeded. Returns the manifest
/// version together with the id produced by `record_graph_commit`.
pub(crate) async fn commit_changes_with_actor(
    &mut self,
    changes: &[ManifestChange],
    actor_id: Option<&str>,
) -> Result<PublishedSnapshot> {
    let manifest_version = self.commit_manifest_changes(changes).await?;
    let _snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
    let published = PublishedSnapshot {
        manifest_version,
        _snapshot_id,
    };
    Ok(published)
}
pub(crate) async fn record_graph_commit(
&mut self,
manifest_version: u64,

View file

@ -19,7 +19,7 @@ mod repo;
#[path = "manifest/state.rs"]
mod state;
use layout::{manifest_uri, open_manifest_dataset};
use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
pub(crate) use metadata::TableVersionMetadata;
#[cfg(test)]
use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
@ -36,6 +36,7 @@ use state::{ManifestState, read_manifest_state};
const OBJECT_TYPE_TABLE: &str = "table";
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
/// Immutable point-in-time view of the database.
@ -85,6 +86,25 @@ impl SubTableUpdate {
}
}
/// Request to register a new table in the manifest.
///
/// Derives `PartialEq`/`Eq` so registrations can be compared directly
/// (for example in tests or when de-duplicating a batch of changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TableRegistration {
    /// Logical table key, e.g. `node:<Type>` or `edge:<Type>`.
    pub(crate) table_key: String,
    /// Location recorded for the table in its manifest `table` row.
    pub(crate) table_path: String,
}
/// Request to mark a table as removed from a given version onwards.
///
/// Derives `PartialEq`/`Eq` so tombstones can be compared directly
/// (for example in tests or when de-duplicating a batch of changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TableTombstone {
    /// Logical table key of the table being tombstoned.
    pub(crate) table_key: String,
    /// Version stored on the tombstone row for this table.
    pub(crate) tombstone_version: u64,
}
/// One change to publish to the manifest as part of a batch commit.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Publish a new version of an already-registered table.
    Update(SubTableUpdate),
    /// Register a table key together with its storage path.
    RegisterTable(TableRegistration),
    /// Record a tombstone for a table.
    Tombstone(TableTombstone),
}
impl SubTableEntry {
pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
open_table_at_version_from_manifest(
@ -97,6 +117,19 @@ impl SubTableEntry {
}
}
/// Derives the storage path for a table from its table key.
///
/// * `node:<Type>` maps to `nodes/<type_name_hash(Type)>`
/// * `edge:<Type>` maps to `edges/<type_name_hash(Type)>`
///
/// # Errors
///
/// Returns a manifest error (`invalid table key '...'`) when the key starts
/// with neither `node:` nor `edge:`.
pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
    // Splitting at the first ':' is equivalent to stripping a "node:" /
    // "edge:" prefix: the remainder after that colon is the type name.
    let (directory, type_name) = match table_key.split_once(':') {
        Some(("node", type_name)) => ("nodes", type_name),
        Some(("edge", type_name)) => ("edges", type_name),
        _ => {
            return Err(OmniError::manifest(format!(
                "invalid table key '{}'",
                table_key
            )));
        }
    };
    Ok(format!("{}/{}", directory, type_name_hash(type_name)))
}
/// An update to apply to the manifest via `commit`.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
@ -245,11 +278,20 @@ impl ManifestCoordinator {
/// Atomically inserts one immutable `table_version` row per updated table.
/// The merge-insert commit on `__manifest` is the graph-level publish point.
pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
if updates.is_empty() {
let changes = updates
.iter()
.cloned()
.map(ManifestChange::Update)
.collect::<Vec<_>>();
self.commit_changes(&changes).await
}
pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
if changes.is_empty() {
return Ok(self.version());
}
self.dataset = self.publisher.publish(updates).await?;
self.dataset = self.publisher.publish(changes).await?;
self.known_state = read_manifest_state(&self.dataset).await?;
Ok(self.version())

View file

@ -40,6 +40,10 @@ pub(super) fn version_object_id(table_key: &str, version: u64) -> String {
format!("{}${}", table_key, format_table_version(version))
}
/// Builds the manifest `object_id` for a tombstone row:
/// `<table_key>$tombstone$<formatted version>`.
pub(super) fn tombstone_object_id(table_key: &str, version: u64) -> String {
    let formatted_version = format_table_version(version);
    format!("{}$tombstone${}", table_key, formatted_version)
}
pub(super) fn table_id_to_key(request_id: Option<&Vec<String>>) -> lance_namespace::Result<String> {
match request_id {
Some(request_id) if request_id.len() == 1 && !request_id[0].is_empty() => {

View file

@ -15,11 +15,11 @@
//! This module should disappear once Lance Rust can do branch-aware batch table
//! version publication against a managed namespace manifest.
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::RecordBatchIterator;
use async_trait::async_trait;
use lance::Dataset;
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
use lance_namespace::NamespaceError;
@ -27,16 +27,20 @@ use lance_namespace::models::CreateTableVersionRequest;
use crate::error::{OmniError, Result};
use super::layout::{open_manifest_dataset, version_object_id};
use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id};
use super::metadata::parse_namespace_version_request;
use super::state::{
manifest_rows_batch, manifest_schema, read_manifest_entries, read_manifest_state,
manifest_rows_batch, manifest_schema, read_manifest_entries, read_registered_table_locations,
read_tombstone_versions,
};
use super::{
ManifestChange, OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION,
SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone,
};
use super::{OBJECT_TYPE_TABLE_VERSION, SubTableEntry, SubTableUpdate};
#[async_trait]
pub(super) trait ManifestBatchPublisher: Send + Sync {
async fn publish(&self, updates: &[SubTableUpdate]) -> Result<Dataset>;
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset>;
}
pub(super) struct GraphNamespacePublisher {
@ -47,6 +51,8 @@ pub(super) struct GraphNamespacePublisher {
#[derive(Debug)]
struct PendingVersionRow {
object_id: String,
object_type: String,
location: Option<String>,
metadata: Option<String>,
table_key: String,
table_version: Option<u64>,
@ -72,17 +78,13 @@ impl GraphNamespacePublisher {
&self,
) -> Result<(
Dataset,
HashMap<String, ()>,
HashMap<String, String>,
HashMap<(String, u64), SubTableEntry>,
HashMap<(String, u64), ()>,
)> {
let dataset = self.dataset().await?;
let current = read_manifest_state(&dataset).await?;
let registered_tables = read_registered_table_locations(&dataset).await?;
let existing_entries = read_manifest_entries(&dataset).await?;
let known_tables = current
.entries
.iter()
.map(|entry| (entry.table_key.clone(), ()))
.collect();
let existing_versions = existing_entries
.iter()
.map(|entry| {
@ -92,67 +94,153 @@ impl GraphNamespacePublisher {
)
})
.collect();
Ok((dataset, known_tables, existing_versions))
let existing_tombstones = read_tombstone_versions(&dataset).await?;
Ok((
dataset,
registered_tables,
existing_versions,
existing_tombstones,
))
}
fn build_pending_rows(
requests: &[CreateTableVersionRequest],
known_tables: &HashMap<String, ()>,
changes: &[ManifestChange],
known_tables: &HashMap<String, String>,
existing_versions: &HashMap<(String, u64), SubTableEntry>,
existing_tombstones: &HashMap<(String, u64), ()>,
) -> Result<Vec<PendingVersionRow>> {
let mut request_versions = HashMap::<(String, u64), ()>::new();
let mut rows = Vec::with_capacity(requests.len());
let mut known_tables = known_tables.clone();
let mut rows = Vec::with_capacity(changes.len());
for request in requests {
let (table_key, table_version, row_count, table_branch, version_metadata) =
parse_namespace_version_request(request)
.map_err(|e| OmniError::Lance(e.to_string()))?;
if !known_tables.contains_key(table_key.as_str()) {
return Err(OmniError::Lance(
NamespaceError::TableNotFound {
message: format!("table {} not found", table_key),
}
.to_string(),
));
}
if request_versions
.insert((table_key.clone(), table_version), ())
.is_some()
for change in changes {
if let ManifestChange::RegisterTable(TableRegistration {
table_key,
table_path,
}) = change
{
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table version {} already exists for {}",
table_version, table_key
),
if let Some(existing_path) = known_tables.get(table_key) {
if existing_path != table_path {
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table {} already exists with different path {}",
table_key, existing_path
),
}
.to_string(),
));
}
.to_string(),
));
} else {
known_tables.insert(table_key.clone(), table_path.clone());
}
rows.push(PendingVersionRow {
object_id: table_key.clone(),
object_type: OBJECT_TYPE_TABLE.to_string(),
location: Some(table_path.clone()),
metadata: None,
table_key: table_key.clone(),
table_version: None,
table_branch: None,
row_count: None,
});
}
if let Some(existing) = existing_versions.get(&(table_key.clone(), table_version)) {
let is_owner_branch_handoff =
existing.row_count == row_count && existing.table_branch != table_branch;
if !is_owner_branch_handoff {
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table version {} already exists for {}",
table_version, table_key
),
}
for change in changes {
match change {
ManifestChange::RegisterTable(_) => {}
ManifestChange::Update(update) => {
let request = update.to_create_table_version_request();
let (table_key, table_version, row_count, table_branch, version_metadata) =
parse_namespace_version_request(&request)
.map_err(|e| OmniError::Lance(e.to_string()))?;
if !known_tables.contains_key(table_key.as_str()) {
return Err(OmniError::Lance(
NamespaceError::TableNotFound {
message: format!("table {} not found", table_key),
}
.to_string(),
));
}
if request_versions
.insert((table_key.clone(), table_version), ())
.is_some()
{
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table version {} already exists for {}",
table_version, table_key
),
}
.to_string(),
));
}
if let Some(existing) =
existing_versions.get(&(table_key.clone(), table_version))
{
let is_owner_branch_handoff = existing.row_count == row_count
&& existing.table_branch != table_branch;
if !is_owner_branch_handoff {
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table version {} already exists for {}",
table_version, table_key
),
}
.to_string(),
));
}
.to_string(),
));
}
rows.push(PendingVersionRow {
object_id: version_object_id(&table_key, table_version),
object_type: OBJECT_TYPE_TABLE_VERSION.to_string(),
location: None,
metadata: Some(version_metadata.to_json_string()?),
table_key,
table_version: Some(table_version),
table_branch,
row_count: Some(row_count),
});
}
ManifestChange::Tombstone(TableTombstone {
table_key,
tombstone_version,
}) => {
if !known_tables.contains_key(table_key.as_str()) {
return Err(OmniError::Lance(
NamespaceError::TableNotFound {
message: format!("table {} not found", table_key),
}
.to_string(),
));
}
if existing_tombstones.contains_key(&(table_key.clone(), *tombstone_version)) {
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table tombstone {} already exists for {}",
tombstone_version, table_key
),
}
.to_string(),
));
}
rows.push(PendingVersionRow {
object_id: tombstone_object_id(table_key, *tombstone_version),
object_type: OBJECT_TYPE_TABLE_TOMBSTONE.to_string(),
location: None,
metadata: None,
table_key: table_key.clone(),
table_version: Some(*tombstone_version),
table_branch: None,
row_count: None,
});
}
}
rows.push(PendingVersionRow {
object_id: version_object_id(&table_key, table_version),
metadata: Some(version_metadata.to_json_string()?),
table_key,
table_version: Some(table_version),
table_branch,
row_count: Some(row_count),
});
}
Ok(rows)
@ -170,8 +258,8 @@ impl GraphNamespacePublisher {
for row in rows {
object_ids.push(row.object_id);
object_types.push(OBJECT_TYPE_TABLE_VERSION.to_string());
locations.push(None);
object_types.push(row.object_type);
locations.push(row.location);
metadata.push(row.metadata);
table_keys.push(row.table_key);
table_versions.push(row.table_version);
@ -214,23 +302,41 @@ impl GraphNamespacePublisher {
&self,
requests: &[CreateTableVersionRequest],
) -> Result<Dataset> {
if requests.is_empty() {
return self.dataset().await;
}
let (dataset, known_tables, existing_versions) = self.load_publish_state().await?;
let rows = Self::build_pending_rows(requests, &known_tables, &existing_versions)?;
self.merge_rows(dataset, rows).await
let changes = requests
.iter()
.cloned()
.map(|request| {
let (table_key, table_version, row_count, table_branch, version_metadata) =
parse_namespace_version_request(&request)
.map_err(|e| OmniError::Lance(e.to_string()))?;
Ok(ManifestChange::Update(SubTableUpdate {
table_key,
table_version,
table_branch,
row_count,
version_metadata,
}))
})
.collect::<Result<Vec<_>>>()?;
self.publish(&changes).await
}
}
#[async_trait]
impl ManifestBatchPublisher for GraphNamespacePublisher {
async fn publish(&self, updates: &[SubTableUpdate]) -> Result<Dataset> {
let requests: Vec<CreateTableVersionRequest> = updates
.iter()
.map(SubTableUpdate::to_create_table_version_request)
.collect();
self.publish_requests(&requests).await
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset> {
if changes.is_empty() {
return self.dataset().await;
}
let (dataset, known_tables, existing_versions, existing_tombstones) =
self.load_publish_state().await?;
let rows = Self::build_pending_rows(
changes,
&known_tables,
&existing_versions,
&existing_tombstones,
)?;
self.merge_rows(dataset, rows).await
}
}

View file

@ -10,7 +10,7 @@ use crate::error::{OmniError, Result};
use super::layout::version_object_id;
use super::metadata::TableVersionMetadata;
use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_VERSION};
use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION};
#[derive(Debug, Clone)]
pub struct SubTableEntry {
@ -28,6 +28,19 @@ pub(super) struct ManifestState {
pub(super) entries: Vec<SubTableEntry>,
}
/// A tombstone row as read back from the manifest.
#[derive(Debug, Clone)]
struct TableTombstoneEntry {
    /// Table key the tombstone applies to.
    table_key: String,
    /// Value of the row's `table_version` column.
    tombstone_version: u64,
}
/// Everything collected from a single pass over the manifest dataset.
#[derive(Debug, Clone)]
struct ManifestScan {
    /// Table locations collected by the scan, in a string-keyed map.
    table_locations: HashMap<String, String>,
    /// Table version entries collected by the scan.
    version_entries: Vec<SubTableEntry>,
    /// Tombstone rows collected by the scan.
    tombstones: Vec<TableTombstoneEntry>,
}
pub(super) fn manifest_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("object_id", DataType::Utf8, false),
@ -48,10 +61,10 @@ pub(super) fn manifest_schema() -> SchemaRef {
pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestState> {
let version = dataset.version().version;
let entries = read_manifest_entries(dataset).await?;
let scan = read_manifest_scan(dataset).await?;
let mut latest_versions = HashMap::<String, SubTableEntry>::new();
for entry in entries {
for entry in scan.version_entries {
match latest_versions.get(&entry.table_key) {
Some(existing) if existing.table_version >= entry.table_version => {}
_ => {
@ -60,13 +73,52 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestSta
}
}
let mut entries: Vec<SubTableEntry> = latest_versions.into_values().collect();
let mut tombstones = HashMap::<String, u64>::new();
for tombstone in scan.tombstones {
match tombstones.get(&tombstone.table_key) {
Some(existing) if *existing >= tombstone.tombstone_version => {}
_ => {
tombstones.insert(tombstone.table_key, tombstone.tombstone_version);
}
}
}
let mut entries: Vec<SubTableEntry> = latest_versions
.into_values()
.filter(|entry| {
tombstones
.get(&entry.table_key)
.map(|tombstone_version| *tombstone_version < entry.table_version)
.unwrap_or(true)
})
.collect();
entries.sort_by(|a, b| a.table_key.cmp(&b.table_key));
Ok(ManifestState { version, entries })
}
/// Scans the manifest and returns only its table version entries.
///
/// Each call performs a full `read_manifest_scan` and discards the other
/// parts of the scan result.
pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTableEntry>> {
    let scan = read_manifest_scan(dataset).await?;
    Ok(scan.version_entries)
}
/// Scans the manifest and returns the table locations map from the scan.
///
/// Each call performs a full `read_manifest_scan` and discards the other
/// parts of the scan result.
pub(super) async fn read_registered_table_locations(
    dataset: &Dataset,
) -> Result<HashMap<String, String>> {
    let scan = read_manifest_scan(dataset).await?;
    Ok(scan.table_locations)
}
/// Scans the manifest and returns the set of recorded tombstones, keyed by
/// `(table_key, tombstone_version)`.
///
/// The map's unit values carry no information; the map is used as a set.
/// Each call performs a full `read_manifest_scan`.
pub(super) async fn read_tombstone_versions(
    dataset: &Dataset,
) -> Result<HashMap<(String, u64), ()>> {
    let scan = read_manifest_scan(dataset).await?;
    let mut recorded = HashMap::with_capacity(scan.tombstones.len());
    for tombstone in scan.tombstones {
        recorded.insert((tombstone.table_key, tombstone.tombstone_version), ());
    }
    Ok(recorded)
}
async fn read_manifest_scan(dataset: &Dataset) -> Result<ManifestScan> {
let batches: Vec<RecordBatch> = dataset
.scan()
.try_into_stream()
@ -78,6 +130,7 @@ pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTa
let mut table_locations = HashMap::new();
let mut version_entries = Vec::new();
let mut tombstones = Vec::new();
for batch in &batches {
let object_types = string_column(batch, "object_type")?;
@ -123,6 +176,13 @@ pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTa
version_metadata: TableVersionMetadata::from_json_str(metadata.value(row))?,
});
}
OBJECT_TYPE_TABLE_TOMBSTONE => {
let tombstone_version = required_u64(versions, row, "table_version")?;
tombstones.push(TableTombstoneEntry {
table_key,
tombstone_version,
});
}
_ => {}
}
}
@ -149,7 +209,11 @@ pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTa
.then(a.table_version.cmp(&b.table_version))
});
Ok(entries)
Ok(ManifestScan {
table_locations,
version_entries: entries,
tombstones,
})
}
pub(super) fn entries_to_batch(

View file

@ -132,6 +132,63 @@ async fn test_commit_advances_version() {
assert_eq!(company.row_count, 0);
}
/// A single `commit_changes` call can register a new table, publish its
/// first version and tombstone an existing table, while a snapshot taken at
/// the earlier manifest version still shows the old layout.
#[tokio::test]
async fn test_commit_changes_can_register_new_table_and_tombstone_old_one() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let catalog = build_test_catalog();
    let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();

    // Capture the pre-change state used by the historical assertions below.
    let before_version = mc.version();
    let person_entry = mc.snapshot().entry("node:Person").unwrap().clone();

    // Create the dataset that will back the newly registered table.
    let human_key = "node:Human".to_string();
    let human_path = table_path_for_table_key(&human_key).unwrap();
    let human_uri = format!("{}/{}", uri, human_path);
    let human_fields = vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int32, true),
    ];
    let human_schema = Arc::new(Schema::new(human_fields));
    let human_ds =
        crate::table_store::TableStore::create_empty_dataset(&human_uri, &human_schema)
            .await
            .unwrap();
    let human_state = crate::table_store::TableStore::new(uri)
        .table_state(&human_uri, &human_ds)
        .await
        .unwrap();

    // Register + first version for the new table, tombstone for the old one.
    let registration = ManifestChange::RegisterTable(TableRegistration {
        table_key: human_key.clone(),
        table_path: human_path.clone(),
    });
    let first_version = ManifestChange::Update(SubTableUpdate {
        table_key: human_key.clone(),
        table_version: human_state.version,
        table_branch: None,
        row_count: human_state.row_count,
        version_metadata: human_state.version_metadata,
    });
    let tombstone = ManifestChange::Tombstone(TableTombstone {
        table_key: "node:Person".to_string(),
        tombstone_version: person_entry.table_version + 1,
    });
    mc.commit_changes(&[registration, first_version, tombstone])
        .await
        .unwrap();

    // Head sees the new table and no longer sees the tombstoned one.
    let head = mc.snapshot();
    assert!(head.entry("node:Human").is_some());
    assert!(head.entry("node:Person").is_none());

    // The earlier manifest version is unaffected.
    let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
        .await
        .unwrap();
    assert!(historical.entry("node:Person").is_some());
    assert!(historical.entry("node:Human").is_none());
}
#[tokio::test]
async fn test_snapshot_open_sub_table() {
let dir = tempfile::tempdir().unwrap();
@ -889,13 +946,16 @@ impl RecordingPublisher {
#[async_trait]
impl ManifestBatchPublisher for RecordingPublisher {
async fn publish(&self, updates: &[SubTableUpdate]) -> Result<Dataset> {
let requests: Vec<CreateTableVersionRequest> = updates
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset> {
let requests: Vec<CreateTableVersionRequest> = changes
.iter()
.map(SubTableUpdate::to_create_table_version_request)
.filter_map(|change| match change {
ManifestChange::Update(update) => Some(update.to_create_table_version_request()),
ManifestChange::RegisterTable(_) | ManifestChange::Tombstone(_) => None,
})
.collect();
self.requests.lock().await.extend_from_slice(&requests);
self.inner.publish_requests(&requests).await
self.inner.publish(changes).await
}
}
@ -903,7 +963,7 @@ struct FailingPublisher;
#[async_trait]
impl ManifestBatchPublisher for FailingPublisher {
async fn publish(&self, _updates: &[SubTableUpdate]) -> Result<Dataset> {
async fn publish(&self, _changes: &[ManifestChange]) -> Result<Dataset> {
Err(OmniError::manifest(
"injected batch publisher failure".to_string(),
))

View file

@ -8,6 +8,6 @@ mod schema_state;
pub use commit_graph::GraphCommit;
pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};
pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
pub use omnigraph::{MergeOutcome, Omnigraph};
pub use omnigraph::{MergeOutcome, Omnigraph, SchemaApplyResult};
pub(crate) use run_registry::is_internal_run_branch;
pub use run_registry::{RunId, RunRecord, RunStatus};

View file

@ -5,7 +5,7 @@ use std::sync::Arc;
use arrow_array::{
Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array,
RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
};
use arrow_schema::{DataType, Field, Schema};
use lance::Dataset;
@ -17,7 +17,8 @@ use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::types::ScalarType;
use omnigraph_compiler::{
SchemaIR, SchemaMigrationPlan, build_catalog_from_ir, build_schema_ir, plan_schema_migration,
SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
build_schema_ir, plan_schema_migration,
};
use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
@ -28,7 +29,9 @@ use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_u
use crate::table_store::TableStore;
use super::commit_graph::GraphCommit;
use super::manifest::Snapshot;
use super::manifest::{
ManifestChange, Snapshot, TableRegistration, TableTombstone, table_path_for_table_key,
};
use super::schema_state::{
SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
validate_schema_contract, write_schema_contract,
@ -42,6 +45,14 @@ pub enum MergeOutcome {
Merged,
}
/// Outcome of `Omnigraph::apply_schema`.
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Whether the migration plan was supported. `apply_schema` returns an
    /// error for unsupported plans, so its successful results set this to `true`.
    pub supported: bool,
    /// `true` when changes were committed; `false` when the plan had no steps.
    pub applied: bool,
    /// Manifest version after the apply, or the current version when nothing
    /// was applied.
    pub manifest_version: u64,
    /// The migration steps from the plan.
    pub steps: Vec<SchemaMigrationStep>,
}
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
@ -160,6 +171,324 @@ impl Omnigraph {
.map_err(|err| OmniError::manifest(err.to_string()))
}
/// Applies `desired_schema_source` as the new schema for this database.
///
/// Flow, in order:
/// 1. Validates the current schema state and rejects the call if any branch
///    other than `main` exists.
/// 2. Plans a migration from the accepted schema IR to the desired schema IR.
///    An unsupported plan is returned as an error; a plan with no steps
///    returns early with `applied: false`.
/// 3. Writes table data for added, renamed, rewritten and index-only tables.
/// 4. Publishes all table registrations, version updates and tombstones in a
///    single `commit_changes_with_actor` call.
/// 5. Writes the schema source file and schema contract, swaps in the new
///    catalog, refreshes the coordinator and invalidates caches.
///
/// # Errors
///
/// Returns a manifest-conflict error when non-`main` branches exist, a
/// manifest error when the plan is unsupported or a required source table is
/// missing from the snapshot, and propagates any storage/commit error.
///
/// NOTE(review): table data is written before the manifest commit and the
/// schema files are written after it; a failure in between presumably leaves
/// partially applied state — confirm the intended recovery story.
pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
self.ensure_schema_state_valid().await?;
// Schema apply is only allowed when `main` is the sole branch.
// NOTE(review): despite the variable name, the only filter applied here is
// `!= "main"`; nothing in this block excludes internal branches.
let branches = self.coordinator.all_branches().await?;
let public_non_main = branches
.iter()
.filter(|branch| branch.as_str() != "main")
.cloned()
.collect::<Vec<_>>();
if !public_non_main.is_empty() {
return Err(OmniError::manifest_conflict(format!(
"schema apply requires a repo with only main; found non-main branches: {}",
public_non_main.join(", ")
)));
}
// Plan the migration from the accepted schema to the desired one.
let accepted_ir = read_accepted_schema_ir(self.uri(), Arc::clone(&self.storage)).await?;
let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
let plan = plan_schema_migration(&accepted_ir, &desired_ir)
.map_err(|err| OmniError::manifest(err.to_string()))?;
if !plan.supported {
// Surface the first unsupported step's reason, or a generic message.
let reason = plan
.steps
.iter()
.find_map(|step| match step {
SchemaMigrationStep::UnsupportedChange { reason, .. } => Some(reason.as_str()),
_ => None,
})
.unwrap_or("unsupported schema migration plan");
return Err(OmniError::manifest(reason.to_string()));
}
// Nothing to do: report the current version without committing.
if plan.steps.is_empty() {
return Ok(SchemaApplyResult {
supported: true,
applied: false,
manifest_version: self.version(),
steps: plan.steps,
});
}
let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
fixup_blob_schemas(&mut desired_catalog);
let snapshot = self.snapshot();
// Classify the plan's steps into per-table work sets.
let mut added_tables = BTreeSet::new();
// Maps target table key -> source table key.
let mut renamed_tables = HashMap::new();
let mut rewritten_tables = BTreeSet::new();
let mut indexed_tables = BTreeSet::new();
// Per table key: maps new property name -> old property name.
let mut property_renames = HashMap::<String, HashMap<String, String>>::new();
let mut changed_edge_tables = false;
for step in &plan.steps {
match step {
SchemaMigrationStep::AddType { type_kind, name } => {
let table_key = schema_table_key(*type_kind, name);
if table_key.starts_with("edge:") {
changed_edge_tables = true;
}
added_tables.insert(table_key);
}
SchemaMigrationStep::RenameType {
type_kind,
from,
to,
} => {
let source_key = schema_table_key(*type_kind, from);
let target_key = schema_table_key(*type_kind, to);
if source_key.starts_with("edge:") {
changed_edge_tables = true;
}
renamed_tables.insert(target_key, source_key);
}
SchemaMigrationStep::AddProperty {
type_kind,
type_name,
..
} => {
let table_key = schema_table_key(*type_kind, type_name);
if table_key.starts_with("edge:") {
changed_edge_tables = true;
}
rewritten_tables.insert(table_key);
}
SchemaMigrationStep::RenameProperty {
type_kind,
type_name,
from,
to,
} => {
let table_key = schema_table_key(*type_kind, type_name);
if table_key.starts_with("edge:") {
changed_edge_tables = true;
}
rewritten_tables.insert(table_key.clone());
property_renames
.entry(table_key)
.or_default()
.insert(to.clone(), from.clone());
}
SchemaMigrationStep::AddConstraint {
type_kind,
type_name,
..
} => {
indexed_tables.insert(schema_table_key(*type_kind, type_name));
}
// Metadata-only steps require no table work.
SchemaMigrationStep::UpdateTypeMetadata { .. }
| SchemaMigrationStep::UpdatePropertyMetadata { .. } => {}
SchemaMigrationStep::UnsupportedChange { reason, .. } => {
return Err(OmniError::manifest(reason.clone()));
}
}
}
// Manifest changes accumulated while table data is written.
let mut table_registrations = HashMap::<String, String>::new();
let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
let mut table_tombstones = HashMap::<String, u64>::new();
// Added types: create an empty dataset, build its indices, register it.
for table_key in &added_tables {
let table_path = table_path_for_table_key(table_key)?;
let dataset_uri = self.table_store.dataset_uri(&table_path);
let schema = schema_for_table_key(&desired_catalog, table_key)?;
let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?;
self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
.await?;
let state = self.table_store.table_state(&dataset_uri, &ds).await?;
table_registrations.insert(table_key.clone(), table_path);
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
},
);
}
// Renamed types: copy the source table's rows into a dataset at the target
// key's path, register the target, and tombstone the source.
for (target_table_key, source_table_key) in &renamed_tables {
let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
OmniError::manifest(format!(
"missing source table '{}' for schema rename",
source_table_key
))
})?;
let source_ds = snapshot.open(source_table_key).await?;
let batch = self
.batch_for_schema_apply_rewrite(
&source_ds,
source_table_key,
&self.catalog,
target_table_key,
&desired_catalog,
property_renames.get(target_table_key),
)
.await?;
let table_path = table_path_for_table_key(target_table_key)?;
let dataset_uri = self.table_store.dataset_uri(&table_path);
let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?;
self.build_indices_on_dataset_for_catalog(
&desired_catalog,
target_table_key,
&mut target_ds,
)
.await?;
let state = self
.table_store
.table_state(&dataset_uri, &target_ds)
.await?;
table_registrations.insert(target_table_key.clone(), table_path);
table_updates.insert(
target_table_key.clone(),
crate::db::SubTableUpdate {
table_key: target_table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
},
);
// The tombstone version is one past the source table's current version.
table_tombstones.insert(
source_table_key.clone(),
source_entry.table_version.saturating_add(1),
);
}
// Property changes on existing types: rewrite the table in place.
for table_key in &rewritten_tables {
// Added and renamed tables were already written with the desired schema.
if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) {
continue;
}
let entry = snapshot.entry(table_key).ok_or_else(|| {
OmniError::manifest(format!(
"missing source table '{}' for schema apply",
table_key
))
})?;
let source_ds = snapshot.open(table_key).await?;
let batch = self
.batch_for_schema_apply_rewrite(
&source_ds,
table_key,
&self.catalog,
table_key,
&desired_catalog,
property_renames.get(table_key),
)
.await?;
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?;
let mut state = self
.table_store
.table_state(&dataset_uri, &target_ds)
.await?;
// NOTE(review): indices are rebuilt after the overwrite only when the table
// also has an AddConstraint step — confirm that `overwrite_dataset` keeps
// pre-existing indices usable for tables that take the other path.
if indexed_tables.contains(table_key) {
self.build_indices_on_dataset_for_catalog(
&desired_catalog,
table_key,
&mut target_ds,
)
.await?;
// Re-read the state so the published version reflects the index build.
state = self
.table_store
.table_state(&dataset_uri, &target_ds)
.await?;
}
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
},
);
}
// Constraint-only changes: build indices on the existing dataset head.
for table_key in &indexed_tables {
// Tables handled by any earlier loop already had their indices considered.
if added_tables.contains(table_key)
|| renamed_tables.contains_key(table_key)
|| rewritten_tables.contains(table_key)
{
continue;
}
let entry = snapshot.entry(table_key).ok_or_else(|| {
OmniError::manifest(format!(
"missing table '{}' for schema index apply",
table_key
))
})?;
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
let mut ds = self
.table_store
.open_dataset_head_for_write(table_key, &dataset_uri, None)
.await?;
self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
.await?;
let state = self.table_store.table_state(&dataset_uri, &ds).await?;
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
},
);
}
// Flatten the accumulated maps into one batch: registrations, then
// updates, then tombstones (order within each group follows HashMap
// iteration order).
let mut manifest_changes = Vec::new();
for (table_key, table_path) in table_registrations {
manifest_changes.push(ManifestChange::RegisterTable(TableRegistration {
table_key,
table_path,
}));
}
for update in table_updates.into_values() {
manifest_changes.push(ManifestChange::Update(update));
}
for (table_key, tombstone_version) in table_tombstones {
manifest_changes.push(ManifestChange::Tombstone(TableTombstone {
table_key,
tombstone_version,
}));
}
// Publish everything in a single manifest commit attributed to the actor.
let actor_id = self.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = self
.coordinator
.commit_changes_with_actor(&manifest_changes, actor_id.as_deref())
.await?;
// Persist the new schema source and contract after the manifest commit.
let schema_path = join_uri(&self.root_uri, SCHEMA_SOURCE_FILENAME);
self.storage
.write_text(&schema_path, desired_schema_source)
.await?;
write_schema_contract(&self.root_uri, self.storage.as_ref(), &desired_ir).await?;
// Swap in-memory state over to the new schema and drop cached data.
self.catalog = desired_catalog;
self.schema_source = desired_schema_source.to_string();
self.coordinator.refresh().await?;
self.runtime_cache.invalidate_all().await;
// The graph index is invalidated only when an edge table was added,
// renamed, or had a property added or renamed.
if changed_edge_tables {
self.invalidate_graph_index().await;
}
Ok(SchemaApplyResult {
supported: true,
applied: true,
manifest_version,
steps: plan.steps,
})
}
/// Borrows the `TableStore` held by this handle.
pub(crate) fn table_store(&self) -> &TableStore {
    &self.table_store
}
@ -1368,6 +1697,16 @@ impl Omnigraph {
&self,
table_key: &str,
ds: &mut Dataset,
) -> Result<()> {
self.build_indices_on_dataset_for_catalog(&self.catalog, table_key, ds)
.await
}
pub(crate) async fn build_indices_on_dataset_for_catalog(
&self,
catalog: &Catalog,
table_key: &str,
ds: &mut Dataset,
) -> Result<()> {
if let Some(type_name) = table_key.strip_prefix("node:") {
if !self.table_store.has_btree_index(ds, "id").await? {
@ -1379,7 +1718,7 @@ impl Omnigraph {
})?;
}
if let Some(node_type) = self.catalog.node_types.get(type_name) {
if let Some(node_type) = catalog.node_types.get(type_name) {
for index_cols in &node_type.indices {
if index_cols.len() != 1 {
continue;
@ -1600,58 +1939,100 @@ impl Omnigraph {
source_ds: &Dataset,
table_key: &str,
) -> Result<RecordBatch> {
let target_schema = schema_for_table_key(self.catalog(), table_key)?;
let blob_properties = blob_properties_for_table_key(self.catalog(), table_key)?;
if blob_properties.is_empty() {
let batches = self.table_store().scan_batches(source_ds).await?;
return concat_or_empty_batches(target_schema, batches);
}
self.batch_for_schema_apply_rewrite(
source_ds,
table_key,
&self.catalog,
table_key,
&self.catalog,
None,
)
.await
}
let batches = self
.table_store()
.scan_with(source_ds, None, None, None, true, |_| Ok(()))
.await?;
let batch = concat_or_empty_batches(target_schema.clone(), batches)?;
if batch.num_rows() == 0 {
return Ok(batch);
async fn batch_for_schema_apply_rewrite(
&self,
source_ds: &Dataset,
source_table_key: &str,
source_catalog: &Catalog,
target_table_key: &str,
target_catalog: &Catalog,
property_renames: Option<&HashMap<String, String>>,
) -> Result<RecordBatch> {
let target_schema = schema_for_table_key(target_catalog, target_table_key)?;
let source_blob_properties =
blob_properties_for_table_key(source_catalog, source_table_key)?;
let target_blob_properties =
blob_properties_for_table_key(target_catalog, target_table_key)?;
let needs_row_ids =
!source_blob_properties.is_empty() || !target_blob_properties.is_empty();
let batches = if needs_row_ids {
self.table_store()
.scan_with(source_ds, None, None, None, true, |_| Ok(()))
.await?
} else {
self.table_store().scan_batches(source_ds).await?
};
if batches.is_empty() {
return Ok(RecordBatch::new_empty(target_schema));
}
let source_schema = batches[0].schema();
let batch = concat_or_empty_batches(source_schema, batches)?;
let row_ids = batch
.column_by_name("_rowid")
.and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| {
OmniError::Lance(format!(
"expected _rowid column when rewriting '{}'",
table_key
))
})?;
let row_ids: Vec<u64> = row_ids.values().iter().copied().collect();
let row_ids = if needs_row_ids {
Some(
batch
.column_by_name("_rowid")
.and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| {
OmniError::Lance(format!(
"expected _rowid column when rewriting '{}'",
source_table_key
))
})?
.values()
.iter()
.copied()
.collect::<Vec<_>>(),
)
} else {
None
};
let mut columns = Vec::with_capacity(target_schema.fields().len());
for field in target_schema.fields() {
if blob_properties.contains(field.name()) {
let descriptions = batch
.column_by_name(field.name())
.and_then(|col| col.as_any().downcast_ref::<StructArray>())
.ok_or_else(|| {
OmniError::Lance(format!(
"expected blob descriptions for '{}.{}'",
table_key,
field.name()
))
})?;
columns.push(
self.rebuild_blob_column(source_ds, field.name(), descriptions, &row_ids)
.await?,
);
let source_name = property_renames
.and_then(|renames| renames.get(field.name()))
.map(String::as_str)
.unwrap_or_else(|| field.name().as_str());
if let Some(column) = batch.column_by_name(source_name) {
if target_blob_properties.contains(field.name())
&& source_blob_properties.contains(source_name)
{
let descriptions =
column
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
OmniError::Lance(format!(
"expected blob descriptions for '{}.{}'",
source_table_key, source_name
))
})?;
let rebuilt = self
.rebuild_blob_column(
source_ds,
source_name,
descriptions,
row_ids.as_deref().unwrap_or(&[]),
)
.await?;
columns.push(rebuilt);
} else {
columns.push(column.clone());
}
} else {
columns.push(batch.column_by_name(field.name()).cloned().ok_or_else(|| {
OmniError::Lance(format!(
"missing column '{}.{}' in rewrite batch",
table_key,
field.name()
))
})?);
columns.push(new_null_array(field.data_type(), batch.num_rows()));
}
}
@ -2130,6 +2511,14 @@ fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
}
/// Builds the manifest table key for a schema type: `node:<name>` for node
/// types and `edge:<name>` for edge types.
///
/// # Panics
///
/// Panics via `unreachable!` when `type_kind` is `SchemaTypeKind::Interface`.
fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
    // Pick the key prefix first so the formatting happens in exactly one place.
    let prefix = match type_kind {
        SchemaTypeKind::Node => "node",
        SchemaTypeKind::Edge => "edge",
        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
    };
    format!("{}:{}", prefix, name)
}
fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
if let Some(type_name) = table_key.strip_prefix("node:") {
let node_type: &NodeType = catalog
@ -2327,8 +2716,10 @@ fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Va
#[cfg(test)]
mod tests {
use super::*;
use crate::db::manifest::ManifestCoordinator;
use async_trait::async_trait;
use omnigraph_compiler::{SchemaMigrationStep, SchemaTypeKind};
use serde_json::Value;
use std::fs;
use std::sync::Mutex;
@ -2621,6 +3012,182 @@ edge WorksAt: Person -> Company
);
}
// Test helper: opens `table_key` from the current snapshot of `db`, scans every
// batch, and returns one JSON value per row in scan order. Unwraps on any
// failure.
async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
    let head = db.snapshot();
    let dataset = head.open(table_key).await.unwrap();
    let scanned = db.table_store().scan_batches(&dataset).await.unwrap();

    let mut rows = Vec::new();
    for batch in scanned {
        for row in 0..batch.num_rows() {
            rows.push(record_batch_row_to_json(&batch, row).unwrap());
        }
    }
    rows
}
// Test helper: appends a single row to the `node:Person` table and commits the
// resulting table version. `name` is written to both the `id` and `name`
// columns, `age` to the `age` column, and every other column is filled with a
// one-row null array. Unwraps on any failure.
async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
    let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
    let schema: Arc<Schema> = Arc::new(ds.schema().into());

    // One single-row array per field, in schema order.
    let mut columns: Vec<Arc<dyn Array>> = Vec::new();
    for field in schema.fields().iter() {
        let column = match field.name().as_str() {
            "id" | "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
            "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
            _ => new_null_array(field.data_type(), 1),
        };
        columns.push(column);
    }

    let row = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
    let state = db
        .table_store()
        .append_batch(&full_path, &mut ds, row)
        .await
        .unwrap();

    let update = crate::db::SubTableUpdate {
        table_key: "node:Person".to_string(),
        table_version: state.version,
        table_branch,
        row_count: state.row_count,
        version_metadata: state.version_metadata,
    };
    db.commit_updates(&[update]).await.unwrap();
}
// Applying the schema a repo was initialised with must report a supported plan
// that was not applied and contains no steps.
#[tokio::test]
async fn test_apply_schema_noop_returns_not_applied() {
    let workdir = tempfile::tempdir().unwrap();
    let repo_uri = workdir.path().to_str().unwrap();
    let mut graph = Omnigraph::init(repo_uri, TEST_SCHEMA).await.unwrap();

    let outcome = graph.apply_schema(TEST_SCHEMA).await.unwrap();

    assert!(outcome.supported);
    assert!(!outcome.applied);
    assert!(outcome.steps.is_empty());
}
// Adding a nullable `nickname` property to Person must keep the seeded row,
// expose the new column as null, update the reopened catalog, and leave a
// `_schema.pg` file in the repo directory.
#[tokio::test]
async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
    let workdir = tempfile::tempdir().unwrap();
    let repo_uri = workdir.path().to_str().unwrap();
    let mut graph = Omnigraph::init(repo_uri, TEST_SCHEMA).await.unwrap();
    seed_person_row(&mut graph, "Alice", Some(30)).await;

    let old_tail = " age: I32?\n}";
    let new_tail = " age: I32?\n nickname: String?\n}";
    let desired = TEST_SCHEMA.replace(old_tail, new_tail);
    let outcome = graph.apply_schema(&desired).await.unwrap();
    assert!(outcome.applied);

    // Inspect the result through a freshly opened handle.
    let reopened = Omnigraph::open(repo_uri).await.unwrap();
    let people = table_rows_json(&reopened, "node:Person").await;
    assert_eq!(people.len(), 1);
    assert_eq!(people[0]["name"], "Alice");
    assert_eq!(people[0]["age"], 30);
    assert!(people[0]["nickname"].is_null());

    let person_type = &reopened.catalog().node_types["Person"];
    assert!(person_type.properties.contains_key("nickname"));

    let schema_file = workdir.path().join("_schema.pg");
    assert!(schema_file.exists());
}
// Renaming `age` to `years` via `@rename_from` must carry the stored value over
// to the new column and drop the old column name from the row.
#[tokio::test]
async fn test_apply_schema_renames_property_and_preserves_values() {
    let workdir = tempfile::tempdir().unwrap();
    let repo_uri = workdir.path().to_str().unwrap();
    let mut graph = Omnigraph::init(repo_uri, TEST_SCHEMA).await.unwrap();
    seed_person_row(&mut graph, "Alice", Some(30)).await;

    let old_tail = " age: I32?\n}";
    let new_tail = " years: I32? @rename_from(\"age\")\n}";
    let desired = TEST_SCHEMA.replace(old_tail, new_tail);
    graph.apply_schema(&desired).await.unwrap();

    // Inspect the result through a freshly opened handle.
    let reopened = Omnigraph::open(repo_uri).await.unwrap();
    let people = table_rows_json(&reopened, "node:Person").await;
    let first = &people[0];
    assert_eq!(first["name"], "Alice");
    assert_eq!(first["years"], 30);
    assert!(first.get("age").is_none());
}
// Renaming node type Person to Human must swap the table entry at head while
// the snapshot taken before the apply still lists `node:Person` only.
#[tokio::test]
async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
    let workdir = tempfile::tempdir().unwrap();
    let repo_uri = workdir.path().to_str().unwrap();
    let mut graph = Omnigraph::init(repo_uri, TEST_SCHEMA).await.unwrap();
    seed_person_row(&mut graph, "Alice", Some(30)).await;
    let version_before = graph.snapshot().version();

    // Rewrite the node declaration, then both edges that reference it.
    let renamed_node =
        TEST_SCHEMA.replace("node Person {\n", "node Human @rename_from(\"Person\") {\n");
    let renamed_knows =
        renamed_node.replace("edge Knows: Person -> Person", "edge Knows: Human -> Human");
    let desired = renamed_knows.replace(
        "edge WorksAt: Person -> Company",
        "edge WorksAt: Human -> Company",
    );
    graph.apply_schema(&desired).await.unwrap();

    let current = graph.snapshot();
    assert!(current.entry("node:Person").is_none());
    assert!(current.entry("node:Human").is_some());

    let earlier = ManifestCoordinator::snapshot_at(repo_uri, None, version_before)
        .await
        .unwrap();
    assert!(earlier.entry("node:Person").is_some());
    assert!(earlier.entry("node:Human").is_none());
}
// With a `feature` branch present, applying a changed schema must fail with the
// "only main" error message.
#[tokio::test]
async fn test_apply_schema_rejects_when_non_main_branch_exists() {
    let workdir = tempfile::tempdir().unwrap();
    let repo_uri = workdir.path().to_str().unwrap();
    let mut graph = Omnigraph::init(repo_uri, TEST_SCHEMA).await.unwrap();
    graph.branch_create("feature").await.unwrap();

    let old_tail = " age: I32?\n}";
    let new_tail = " age: I32?\n nickname: String?\n}";
    let desired = TEST_SCHEMA.replace(old_tail, new_tail);

    let failure = graph.apply_schema(&desired).await.unwrap_err();
    let message = failure.to_string();
    assert!(message.contains("schema apply requires a repo with only main"));
}
// Adding `@index` to the existing `name` property must leave the Person table
// with an index that `has_fts_index` reports for `name`.
#[tokio::test]
async fn test_apply_schema_adds_index_for_existing_property() {
    let workdir = tempfile::tempdir().unwrap();
    let repo_uri = workdir.path().to_str().unwrap();
    let mut graph = Omnigraph::init(repo_uri, TEST_SCHEMA).await.unwrap();

    let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
    graph.apply_schema(&desired).await.unwrap();

    let head = graph.snapshot();
    let person_ds = head.open("node:Person").await.unwrap();
    let indexed = graph
        .table_store()
        .has_fts_index(&person_ds, "name")
        .await
        .unwrap();
    assert!(indexed);
}
// Changing a property's type (I32 -> I64) must be rejected and must leave the
// snapshot version where it was before the attempt.
#[tokio::test]
async fn test_apply_schema_unsupported_plan_does_not_advance_manifest() {
    let workdir = tempfile::tempdir().unwrap();
    let repo_uri = workdir.path().to_str().unwrap();
    let mut graph = Omnigraph::init(repo_uri, TEST_SCHEMA).await.unwrap();
    let version_before = graph.snapshot().version();

    let desired = TEST_SCHEMA.replace("age: I32?", "age: I64?");
    let failure = graph.apply_schema(&desired).await.unwrap_err();

    let message = failure.to_string();
    assert!(message.contains("changing property type"));
    assert_eq!(graph.snapshot().version(), version_before);
}
#[tokio::test]
async fn test_open_nonexistent_fails() {
let result = Omnigraph::open("/tmp/nonexistent_omnigraph_test_xyz").await;

View file

@ -399,6 +399,20 @@ impl TableStore {
self.append_batch(dataset_uri, ds, batch).await
}
/// Writes `batch` to `dataset_uri` using `WriteMode::Overwrite` and returns the
/// resulting `Dataset`.
///
/// The write enables stable row ids, requests Lance file version `V2_2`, and
/// sets `allow_external_blob_outside_bases`; all other `WriteParams` fields
/// keep their defaults.
///
/// # Errors
///
/// A failure from `Dataset::write` is returned as `OmniError::Lance` carrying
/// the underlying error's message.
pub async fn overwrite_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
    // Grab the schema handle first so the batch can be moved into the reader
    // instead of being cloned.
    let schema = batch.schema();
    let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
    let params = WriteParams {
        mode: WriteMode::Overwrite,
        enable_stable_row_ids: true,
        data_storage_version: Some(LanceFileVersion::V2_2),
        allow_external_blob_outside_bases: true,
        ..Default::default()
    };
    Dataset::write(reader, dataset_uri, Some(params))
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))
}
pub async fn merge_insert_batch(
&self,
dataset_uri: &str,