Merge pull request #10 from ModernRelay/codex/mr-610-db-refactor

Refactor omnigraph DB module layout
This commit is contained in:
Andrew Altshuler 2026-04-12 17:22:10 +03:00 committed by GitHub
commit 3192da8b3e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 1562 additions and 1381 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,410 @@
use super::*;
pub(super) async fn entity_at_target(
db: &Omnigraph,
target: impl Into<ReadTarget>,
table_key: &str,
id: &str,
) -> Result<Option<serde_json::Value>> {
let resolved = db.resolved_target(target).await?;
entity_from_snapshot(db, &resolved.snapshot, table_key, id).await
}
pub(super) async fn entity_at(
db: &Omnigraph,
table_key: &str,
id: &str,
version: u64,
) -> Result<Option<serde_json::Value>> {
let snap = db.coordinator.snapshot_at_version(version).await?;
entity_from_snapshot(db, &snap, table_key, id).await
}
pub(super) async fn export_jsonl(
db: &Omnigraph,
branch: &str,
type_names: &[String],
table_keys: &[String],
) -> Result<String> {
let mut out = Vec::new();
export_jsonl_to_writer(db, branch, type_names, table_keys, &mut out).await?;
String::from_utf8(out)
.map_err(|err| OmniError::manifest(format!("export produced invalid UTF-8: {}", err)))
}
pub(super) async fn export_jsonl_to_writer<W: Write>(
db: &Omnigraph,
branch: &str,
type_names: &[String],
table_keys: &[String],
writer: &mut W,
) -> Result<()> {
db.ensure_schema_state_valid().await?;
let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?;
export_snapshot_jsonl_to_writer(db, &snapshot, type_names, table_keys, writer).await
}
async fn entity_from_snapshot(
db: &Omnigraph,
snapshot: &Snapshot,
table_key: &str,
id: &str,
) -> Result<Option<serde_json::Value>> {
if snapshot.entry(table_key).is_none() {
return Ok(None);
}
let ds = db
.table_store
.open_snapshot_table(snapshot, table_key)
.await?;
let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
let batches = db
.table_store
.scan(&ds, None, Some(&filter_sql), None)
.await?;
let Some(batch) = batches.iter().find(|batch| batch.num_rows() > 0) else {
return Ok(None);
};
Ok(Some(record_batch_row_to_json(batch, 0)?))
}
async fn export_snapshot_jsonl_to_writer<W: Write>(
db: &Omnigraph,
snapshot: &Snapshot,
type_names: &[String],
table_keys: &[String],
writer: &mut W,
) -> Result<()> {
let selected_tables = export_table_keys(snapshot, type_names, table_keys)?;
for table_key in selected_tables {
export_table_to_writer(db, snapshot, &table_key, writer).await?;
}
Ok(())
}
fn export_table_keys(
snapshot: &Snapshot,
type_names: &[String],
table_keys: &[String],
) -> Result<Vec<String>> {
let available = snapshot
.entries()
.map(|entry| entry.table_key.clone())
.collect::<BTreeSet<_>>();
let mut selected = BTreeSet::new();
for table_key in table_keys {
if !available.contains(table_key) {
return Err(OmniError::manifest(format!(
"unknown export table '{}'",
table_key
)));
}
selected.insert(table_key.clone());
}
for type_name in type_names {
let mut matched = false;
let node_key = format!("node:{}", type_name);
if available.contains(&node_key) {
selected.insert(node_key);
matched = true;
}
let edge_key = format!("edge:{}", type_name);
if available.contains(&edge_key) {
selected.insert(edge_key);
matched = true;
}
if !matched {
return Err(OmniError::manifest(format!(
"unknown export type '{}'",
type_name
)));
}
}
if selected.is_empty() {
return Ok(available.into_iter().collect());
}
Ok(selected.into_iter().collect())
}
async fn export_table_to_writer<W: Write>(
db: &Omnigraph,
snapshot: &Snapshot,
table_key: &str,
writer: &mut W,
) -> Result<()> {
let ds = db
.table_store
.open_snapshot_table(snapshot, table_key)
.await?;
let ordering = Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]);
let blob_properties = blob_properties_for_table_key(db.catalog(), table_key)?;
if blob_properties.is_empty() {
for batch in db.table_store.scan(&ds, None, None, ordering).await? {
write_export_rows_from_batch(db, table_key, &batch, None, writer)?;
}
return Ok(());
}
let batches = db
.table_store
.scan_with(&ds, None, None, ordering, true, |_| Ok(()))
.await?;
for batch in batches {
let row_ids = batch
.column_by_name("_rowid")
.and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| {
OmniError::Lance(format!(
"expected _rowid column when exporting '{}'",
table_key
))
})?
.values()
.iter()
.copied()
.collect::<Vec<_>>();
let blob_values = export_blob_values(&ds, &batch, &row_ids, blob_properties).await?;
write_export_rows_from_batch(db, table_key, &batch, Some(&blob_values), writer)?;
}
Ok(())
}
async fn export_blob_values(
source_ds: &Dataset,
batch: &RecordBatch,
row_ids: &[u64],
blob_properties: &std::collections::HashSet<String>,
) -> Result<HashMap<String, Vec<Option<String>>>> {
let mut values = HashMap::with_capacity(blob_properties.len());
for property in blob_properties {
let descriptions = batch
.column_by_name(property)
.and_then(|col| col.as_any().downcast_ref::<StructArray>())
.ok_or_else(|| {
OmniError::Lance(format!(
"expected blob descriptions for export column '{}'",
property
))
})?;
values.insert(
property.clone(),
export_blob_column_values(source_ds, property, descriptions, row_ids).await?,
);
}
Ok(values)
}
fn write_export_rows_from_batch<W: Write>(
db: &Omnigraph,
table_key: &str,
batch: &RecordBatch,
blob_values: Option<&HashMap<String, Vec<Option<String>>>>,
writer: &mut W,
) -> Result<()> {
if let Some(type_name) = table_key.strip_prefix("node:") {
let node_type = db
.catalog
.node_types
.get(type_name)
.ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
for row in 0..batch.num_rows() {
let mut data = serde_json::Map::new();
data.insert(
"id".to_string(),
json_value_from_named_column(batch, "id", row)?,
);
for field in node_type.arrow_schema.fields().iter().skip(1) {
data.insert(
field.name().clone(),
export_value_for_field(
batch,
field.name(),
row,
blob_values.and_then(|values| values.get(field.name())),
)?,
);
}
write_export_jsonl_row(
writer,
table_key,
&serde_json::json!({
"type": type_name,
"data": serde_json::Value::Object(data),
}),
)?;
}
return Ok(());
}
if let Some(edge_name) = table_key.strip_prefix("edge:") {
let edge_type = db
.catalog
.edge_types
.get(edge_name)
.ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?;
for row in 0..batch.num_rows() {
let from = named_string_value(batch, "src", row)?;
let to = named_string_value(batch, "dst", row)?;
let mut data = serde_json::Map::new();
data.insert(
"id".to_string(),
json_value_from_named_column(batch, "id", row)?,
);
for field in edge_type.arrow_schema.fields().iter().skip(3) {
data.insert(
field.name().clone(),
export_value_for_field(
batch,
field.name(),
row,
blob_values.and_then(|values| values.get(field.name())),
)?,
);
}
write_export_jsonl_row(
writer,
table_key,
&serde_json::json!({
"edge": edge_name,
"from": from,
"to": to,
"data": serde_json::Value::Object(data),
}),
)?;
}
return Ok(());
}
Err(OmniError::manifest(format!(
"invalid export table key '{}'",
table_key
)))
}
fn write_export_jsonl_row<W: Write>(
writer: &mut W,
table_key: &str,
row: &serde_json::Value,
) -> Result<()> {
serde_json::to_writer(&mut *writer, row).map_err(|err| {
OmniError::manifest(format!(
"failed to serialize export row for '{}': {}",
table_key, err
))
})?;
writer.write_all(b"\n")?;
Ok(())
}
async fn export_blob_column_values(
source_ds: &Dataset,
column_name: &str,
descriptions: &StructArray,
row_ids: &[u64],
) -> Result<Vec<Option<String>>> {
let mut non_null_row_ids = Vec::new();
let mut non_null_positions = Vec::new();
let mut values = vec![None; row_ids.len()];
for (row, row_id) in row_ids.iter().enumerate() {
if blob_description_is_null(descriptions, row)? {
continue;
}
non_null_row_ids.push(*row_id);
non_null_positions.push(row);
}
if non_null_row_ids.is_empty() {
return Ok(values);
}
let mut perm: Vec<usize> = (0..non_null_row_ids.len()).collect();
perm.sort_by_key(|&i| non_null_row_ids[i]);
let sorted_ids: Vec<u64> = perm.iter().map(|&i| non_null_row_ids[i]).collect();
let sorted_blobs = Arc::new(source_ds.clone())
.take_blobs(&sorted_ids, column_name)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
if sorted_blobs.len() != non_null_positions.len() {
return Err(OmniError::Lance(format!(
"blob export for '{}' lost alignment with selected rows",
column_name
)));
}
let mut inverse_perm = vec![0usize; perm.len()];
for (sorted_pos, &orig_pos) in perm.iter().enumerate() {
inverse_perm[orig_pos] = sorted_pos;
}
for (idx, position) in non_null_positions.into_iter().enumerate() {
let blob = &sorted_blobs[inverse_perm[idx]];
let value = if let Some(uri) = blob.uri() {
uri.to_string()
} else {
let bytes = blob
.read()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
format!(
"base64:{}",
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, bytes)
)
};
values[position] = Some(value);
}
Ok(values)
}
fn export_value_for_field(
batch: &RecordBatch,
field_name: &str,
row: usize,
blob_values: Option<&Vec<Option<String>>>,
) -> Result<serde_json::Value> {
if let Some(blob_values) = blob_values {
return Ok(blob_values
.get(row)
.and_then(|value| value.clone())
.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null));
}
json_value_from_named_column(batch, field_name, row)
}
fn json_value_from_named_column(
batch: &RecordBatch,
field_name: &str,
row: usize,
) -> Result<serde_json::Value> {
let column = batch.column_by_name(field_name).ok_or_else(|| {
OmniError::Lance(format!("missing column '{}' in export batch", field_name))
})?;
json_value_from_array(column.as_ref(), row)
}
fn named_string_value(batch: &RecordBatch, field_name: &str, row: usize) -> Result<String> {
let column = batch.column_by_name(field_name).ok_or_else(|| {
OmniError::Lance(format!("missing column '{}' in export batch", field_name))
})?;
let array = column
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance(format!("expected Utf8 column '{}'", field_name)))?;
if array.is_null(row) {
return Err(OmniError::Lance(format!(
"unexpected null in export column '{}'",
field_name
)));
}
Ok(array.value(row).to_string())
}

View file

@ -0,0 +1,596 @@
use super::*;
pub(super) async fn plan_schema(
db: &Omnigraph,
desired_schema_source: &str,
) -> Result<SchemaMigrationPlan> {
db.ensure_schema_state_valid().await?;
let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
plan_schema_migration(&accepted_ir, &desired_ir)
.map_err(|err| OmniError::manifest(err.to_string()))
}
pub(super) async fn apply_schema(
db: &mut Omnigraph,
desired_schema_source: &str,
) -> Result<SchemaApplyResult> {
acquire_schema_apply_lock(db).await?;
let result = apply_schema_with_lock(db, desired_schema_source).await;
let release_result = release_schema_apply_lock(db).await;
match (result, release_result) {
(Ok(result), Ok(())) => Ok(result),
(Ok(_), Err(err)) => Err(err),
(Err(err), Ok(())) => Err(err),
(Err(err), Err(_)) => Err(err),
}
}
pub(super) async fn apply_schema_with_lock(
db: &mut Omnigraph,
desired_schema_source: &str,
) -> Result<SchemaApplyResult> {
db.ensure_schema_state_valid().await?;
let branches = db.coordinator.all_branches().await?;
let blocking_branches = branches
.into_iter()
.filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch))
.collect::<Vec<_>>();
if !blocking_branches.is_empty() {
return Err(OmniError::manifest_conflict(format!(
"schema apply requires a repo with only main; found non-main branches: {}",
blocking_branches.join(", ")
)));
}
let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
let plan = plan_schema_migration(&accepted_ir, &desired_ir)
.map_err(|err| OmniError::manifest(err.to_string()))?;
if !plan.supported {
let reason = plan
.steps
.iter()
.find_map(|step| match step {
SchemaMigrationStep::UnsupportedChange { reason, .. } => Some(reason.as_str()),
_ => None,
})
.unwrap_or("unsupported schema migration plan");
return Err(OmniError::manifest(reason.to_string()));
}
if plan.steps.is_empty() {
return Ok(SchemaApplyResult {
supported: true,
applied: false,
manifest_version: db.version(),
steps: plan.steps,
});
}
let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
fixup_blob_schemas(&mut desired_catalog);
let snapshot = db.snapshot();
let base_manifest_version = snapshot.version();
let mut added_tables = BTreeSet::new();
let mut renamed_tables = HashMap::new();
let mut rewritten_tables = BTreeSet::new();
let mut indexed_tables = BTreeSet::new();
let mut property_renames = HashMap::<String, HashMap<String, String>>::new();
let mut changed_edge_tables = false;
for step in &plan.steps {
match step {
SchemaMigrationStep::AddType { type_kind, name } => {
let table_key = schema_table_key(*type_kind, name);
if table_key.starts_with("edge:") {
changed_edge_tables = true;
}
added_tables.insert(table_key);
}
SchemaMigrationStep::RenameType {
type_kind,
from,
to,
} => {
let source_key = schema_table_key(*type_kind, from);
let target_key = schema_table_key(*type_kind, to);
if source_key.starts_with("edge:") {
changed_edge_tables = true;
}
renamed_tables.insert(target_key, source_key);
}
SchemaMigrationStep::AddProperty {
type_kind,
type_name,
..
} => {
let table_key = schema_table_key(*type_kind, type_name);
if table_key.starts_with("edge:") {
changed_edge_tables = true;
}
rewritten_tables.insert(table_key);
}
SchemaMigrationStep::RenameProperty {
type_kind,
type_name,
from,
to,
} => {
let table_key = schema_table_key(*type_kind, type_name);
if table_key.starts_with("edge:") {
changed_edge_tables = true;
}
rewritten_tables.insert(table_key.clone());
property_renames
.entry(table_key)
.or_default()
.insert(to.clone(), from.clone());
}
SchemaMigrationStep::AddConstraint {
type_kind,
type_name,
..
} => {
indexed_tables.insert(schema_table_key(*type_kind, type_name));
}
SchemaMigrationStep::UpdateTypeMetadata { .. }
| SchemaMigrationStep::UpdatePropertyMetadata { .. } => {}
SchemaMigrationStep::UnsupportedChange { reason, .. } => {
return Err(OmniError::manifest(reason.clone()));
}
}
}
let mut table_registrations = HashMap::<String, String>::new();
let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
let mut table_tombstones = HashMap::<String, u64>::new();
for table_key in &added_tables {
let table_path = table_path_for_table_key(table_key)?;
let dataset_uri = db.table_store.dataset_uri(&table_path);
let schema = schema_for_table_key(&desired_catalog, table_key)?;
let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?;
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &ds).await?;
table_registrations.insert(table_key.clone(), table_path);
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
},
);
}
for (target_table_key, source_table_key) in &renamed_tables {
let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
OmniError::manifest(format!(
"missing source table '{}' for schema rename",
source_table_key
))
})?;
ensure_snapshot_entry_head_matches(db, source_entry).await?;
let source_ds = snapshot.open(source_table_key).await?;
let batch = batch_for_schema_apply_rewrite(
db,
&source_ds,
source_table_key,
&db.catalog,
target_table_key,
&desired_catalog,
property_renames.get(target_table_key),
)
.await?;
let table_path = table_path_for_table_key(target_table_key)?;
let dataset_uri = db.table_store.dataset_uri(&table_path);
let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?;
db.build_indices_on_dataset_for_catalog(&desired_catalog, target_table_key, &mut target_ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
table_registrations.insert(target_table_key.clone(), table_path);
table_updates.insert(
target_table_key.clone(),
crate::db::SubTableUpdate {
table_key: target_table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
},
);
table_tombstones.insert(
source_table_key.clone(),
source_entry.table_version.saturating_add(1),
);
}
for table_key in &rewritten_tables {
if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) {
continue;
}
let entry = snapshot.entry(table_key).ok_or_else(|| {
OmniError::manifest(format!(
"missing source table '{}' for schema apply",
table_key
))
})?;
ensure_snapshot_entry_head_matches(db, entry).await?;
let source_ds = snapshot.open(table_key).await?;
let batch = batch_for_schema_apply_rewrite(
db,
&source_ds,
table_key,
&db.catalog,
table_key,
&desired_catalog,
property_renames.get(table_key),
)
.await?;
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?;
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
},
);
}
for table_key in &indexed_tables {
if added_tables.contains(table_key)
|| renamed_tables.contains_key(table_key)
|| rewritten_tables.contains(table_key)
{
continue;
}
let entry = snapshot.entry(table_key).ok_or_else(|| {
OmniError::manifest(format!(
"missing table '{}' for schema index apply",
table_key
))
})?;
ensure_snapshot_entry_head_matches(db, entry).await?;
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
let mut ds = db
.table_store
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
.await?;
db.table_store
.ensure_expected_version(&ds, table_key, entry.table_version)?;
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &ds).await?;
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
},
);
}
let mut manifest_changes = Vec::new();
for (table_key, table_path) in table_registrations {
manifest_changes.push(ManifestChange::RegisterTable(TableRegistration {
table_key,
table_path,
}));
}
for update in table_updates.into_values() {
manifest_changes.push(ManifestChange::Update(update));
}
for (table_key, tombstone_version) in table_tombstones {
manifest_changes.push(ManifestChange::Tombstone(TableTombstone {
table_key,
tombstone_version,
}));
}
db.refresh().await?;
if db.version() != base_manifest_version {
return Err(OmniError::manifest_conflict(format!(
"schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
base_manifest_version,
db.version()
)));
}
let actor_id = db.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = db
.coordinator
.commit_changes_with_actor(&manifest_changes, actor_id.as_deref())
.await?;
let schema_path = join_uri(&db.root_uri, SCHEMA_SOURCE_FILENAME);
db.storage
.write_text(&schema_path, desired_schema_source)
.await?;
write_schema_contract(&db.root_uri, db.storage.as_ref(), &desired_ir).await?;
db.catalog = desired_catalog;
db.schema_source = desired_schema_source.to_string();
db.coordinator.refresh().await?;
db.runtime_cache.invalidate_all().await;
if changed_edge_tables {
db.invalidate_graph_index().await;
}
Ok(SchemaApplyResult {
supported: true,
applied: true,
manifest_version,
steps: plan.steps,
})
}
pub(super) async fn ensure_schema_apply_idle(db: &mut Omnigraph, operation: &str) -> Result<()> {
db.refresh().await?;
ensure_schema_apply_not_locked(db, operation).await
}
pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> {
db.ensure_schema_state_valid().await?;
db.refresh().await?;
let branches = db.coordinator.all_branches().await?;
if branches
.iter()
.any(|branch| is_schema_apply_lock_branch(branch))
{
return Err(OmniError::manifest_conflict(
"schema apply is already in progress".to_string(),
));
}
db.coordinator
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
db.refresh().await?;
let blocking_branches = db
.coordinator
.all_branches()
.await?
.into_iter()
.filter(|branch| branch != "main" && !is_schema_apply_lock_branch(branch))
.collect::<Vec<_>>();
if !blocking_branches.is_empty() {
let _ = release_schema_apply_lock(db).await;
return Err(OmniError::manifest_conflict(format!(
"schema apply requires a repo with only main; found non-main branches: {}",
blocking_branches.join(", ")
)));
}
Ok(())
}
pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()> {
db.coordinator
.branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
db.refresh().await
}
pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {
if db
.coordinator
.all_branches()
.await?
.iter()
.any(|branch| is_schema_apply_lock_branch(branch))
{
return Err(OmniError::manifest_conflict(format!(
"{} is unavailable while schema apply is in progress",
operation
)));
}
Ok(())
}
pub(super) async fn ensure_snapshot_entry_head_matches(
db: &Omnigraph,
entry: &SubTableEntry,
) -> Result<()> {
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
let ds = db
.table_store
.open_dataset_head_for_write(
&entry.table_key,
&dataset_uri,
entry.table_branch.as_deref(),
)
.await?;
db.table_store
.ensure_expected_version(&ds, &entry.table_key, entry.table_version)
}
pub(super) async fn batch_for_table_rewrite(
db: &Omnigraph,
source_ds: &Dataset,
table_key: &str,
) -> Result<RecordBatch> {
batch_for_schema_apply_rewrite(
db,
source_ds,
table_key,
&db.catalog,
table_key,
&db.catalog,
None,
)
.await
}
pub(super) async fn batch_for_schema_apply_rewrite(
db: &Omnigraph,
source_ds: &Dataset,
source_table_key: &str,
source_catalog: &Catalog,
target_table_key: &str,
target_catalog: &Catalog,
property_renames: Option<&HashMap<String, String>>,
) -> Result<RecordBatch> {
let target_schema = schema_for_table_key(target_catalog, target_table_key)?;
let source_blob_properties = blob_properties_for_table_key(source_catalog, source_table_key)?;
let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?;
let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty();
let batches = if needs_row_ids {
db.table_store()
.scan_with(source_ds, None, None, None, true, |_| Ok(()))
.await?
} else {
db.table_store().scan_batches(source_ds).await?
};
if batches.is_empty() {
return Ok(RecordBatch::new_empty(target_schema));
}
let source_schema = batches[0].schema();
let batch = concat_or_empty_batches(source_schema, batches)?;
let row_ids = if needs_row_ids {
Some(
batch
.column_by_name("_rowid")
.and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| {
OmniError::Lance(format!(
"expected _rowid column when rewriting '{}'",
source_table_key
))
})?
.values()
.iter()
.copied()
.collect::<Vec<_>>(),
)
} else {
None
};
let mut columns = Vec::with_capacity(target_schema.fields().len());
for field in target_schema.fields() {
let source_name = property_renames
.and_then(|renames| renames.get(field.name()))
.map(String::as_str)
.unwrap_or_else(|| field.name().as_str());
if let Some(column) = batch.column_by_name(source_name) {
if target_blob_properties.contains(field.name())
&& source_blob_properties.contains(source_name)
{
let descriptions =
column
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
OmniError::Lance(format!(
"expected blob descriptions for '{}.{}'",
source_table_key, source_name
))
})?;
let rebuilt = rebuild_blob_column(
db,
source_ds,
source_name,
descriptions,
row_ids.as_deref().unwrap_or(&[]),
)
.await?;
columns.push(rebuilt);
} else {
columns.push(column.clone());
}
} else {
columns.push(new_null_array(field.data_type(), batch.num_rows()));
}
}
RecordBatch::try_new(target_schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}
async fn rebuild_blob_column(
_db: &Omnigraph,
source_ds: &Dataset,
column_name: &str,
descriptions: &StructArray,
row_ids: &[u64],
) -> Result<Arc<dyn Array>> {
let mut builder = BlobArrayBuilder::new(row_ids.len());
let mut non_null_row_ids = Vec::new();
let mut row_has_blob = Vec::with_capacity(row_ids.len());
for row in 0..row_ids.len() {
let is_null = blob_description_is_null(descriptions, row)?;
row_has_blob.push(!is_null);
if !is_null {
non_null_row_ids.push(row_ids[row]);
}
}
let blob_files = if non_null_row_ids.is_empty() {
Vec::new()
} else {
Arc::new(source_ds.clone())
.take_blobs(&non_null_row_ids, column_name)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
};
let mut files = blob_files.into_iter();
for has_blob in row_has_blob {
if !has_blob {
builder
.push_null()
.map_err(|e| OmniError::Lance(e.to_string()))?;
continue;
}
let blob = files.next().ok_or_else(|| {
OmniError::Lance(format!(
"blob rewrite for '{}' lost alignment with source rows",
column_name
))
})?;
if let Some(uri) = blob.uri() {
builder
.push_uri(uri)
.map_err(|e| OmniError::Lance(e.to_string()))?;
} else {
builder
.push_bytes(
blob.read()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
)
.map_err(|e| OmniError::Lance(e.to_string()))?;
}
}
if files.next().is_some() {
return Err(OmniError::Lance(format!(
"blob rewrite for '{}' produced extra source blobs",
column_name
)));
}
builder
.finish()
.map_err(|e| OmniError::Lance(e.to_string()))
}

View file

@ -0,0 +1,506 @@
use super::*;
pub(super) async fn graph_index(db: &Omnigraph) -> Result<Arc<crate::graph_index::GraphIndex>> {
db.ensure_schema_state_valid().await?;
let resolved = db
.coordinator
.resolve_target(&ReadTarget::Branch(
db.coordinator
.current_branch()
.unwrap_or("main")
.to_string(),
))
.await?;
db.runtime_cache.graph_index(&resolved, &db.catalog).await
}
pub(super) async fn graph_index_for_resolved(
db: &Omnigraph,
resolved: &ResolvedTarget,
) -> Result<Arc<crate::graph_index::GraphIndex>> {
db.runtime_cache.graph_index(resolved, &db.catalog).await
}
pub(super) async fn ensure_indices(db: &mut Omnigraph) -> Result<()> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
ensure_indices_for_branch(db, current_branch.as_deref()).await
}
pub(super) async fn ensure_indices_on(db: &mut Omnigraph, branch: &str) -> Result<()> {
let branch = normalize_branch_name(branch)?;
ensure_indices_for_branch(db, branch.as_deref()).await
}
pub(super) async fn ensure_indices_for_branch(
db: &mut Omnigraph,
branch: Option<&str>,
) -> Result<()> {
db.ensure_schema_state_valid().await?;
db.ensure_schema_apply_idle("ensure_indices").await?;
let resolved = db.resolved_branch_target(branch).await?;
let snapshot = resolved.snapshot;
let mut updates = Vec::new();
let active_branch = resolved.branch;
for type_name in db.catalog.node_types.keys() {
let table_key = format!("node:{}", type_name);
let Some(entry) = snapshot.entry(&table_key) else {
continue;
};
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
let (mut ds, resolved_branch) = match active_branch.as_deref() {
Some(active_branch) => match entry.table_branch.as_deref() {
None => continue,
_ => {
open_owned_dataset_for_branch_write(
db,
&table_key,
&full_path,
entry.table_branch.as_deref(),
entry.table_version,
active_branch,
)
.await?
}
},
None => (
db.table_store
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?,
None,
),
};
let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0);
if row_count > 0 {
build_indices_on_dataset(db, &table_key, &mut ds).await?;
}
let state = db.table_store.table_state(&full_path, &ds).await?;
if state.version != entry.table_version
|| resolved_branch.as_deref() != entry.table_branch.as_deref()
{
updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch: resolved_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
}
}
for edge_name in db.catalog.edge_types.keys() {
let table_key = format!("edge:{}", edge_name);
let Some(entry) = snapshot.entry(&table_key) else {
continue;
};
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
let (mut ds, resolved_branch) = match active_branch.as_deref() {
Some(active_branch) => match entry.table_branch.as_deref() {
None => continue,
_ => {
open_owned_dataset_for_branch_write(
db,
&table_key,
&full_path,
entry.table_branch.as_deref(),
entry.table_version,
active_branch,
)
.await?
}
},
None => (
db.table_store
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?,
None,
),
};
let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0);
if row_count > 0 {
build_indices_on_dataset(db, &table_key, &mut ds).await?;
}
let state = db.table_store.table_state(&full_path, &ds).await?;
if state.version != entry.table_version
|| resolved_branch.as_deref() != entry.table_branch.as_deref()
{
updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch: resolved_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
}
}
if !updates.is_empty() {
commit_prepared_updates_on_branch(db, branch, &updates).await?;
}
Ok(())
}
pub(super) async fn open_for_mutation(
db: &Omnigraph,
table_key: &str,
) -> Result<(Dataset, String, Option<String>)> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
open_for_mutation_on_branch(db, current_branch.as_deref(), table_key).await
}
pub(super) async fn open_for_mutation_on_branch(
db: &Omnigraph,
branch: Option<&str>,
table_key: &str,
) -> Result<(Dataset, String, Option<String>)> {
db.ensure_schema_apply_not_locked("write").await?;
let resolved = db.resolved_branch_target(branch).await?;
let entry = resolved
.snapshot
.entry(table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
match resolved.branch.as_deref() {
None => {
let ds = db
.table_store
.open_dataset_head_for_write(table_key, &full_path, None)
.await?;
db.table_store
.ensure_expected_version(&ds, table_key, entry.table_version)?;
Ok((ds, full_path, None))
}
Some(active_branch) => {
let (ds, table_branch) = open_owned_dataset_for_branch_write(
db,
table_key,
&full_path,
entry.table_branch.as_deref(),
entry.table_version,
active_branch,
)
.await?;
Ok((ds, full_path, table_branch))
}
}
}
pub(super) async fn open_owned_dataset_for_branch_write(
db: &Omnigraph,
table_key: &str,
full_path: &str,
entry_branch: Option<&str>,
entry_version: u64,
active_branch: &str,
) -> Result<(Dataset, Option<String>)> {
match entry_branch {
Some(branch) if branch == active_branch => {
let ds = db
.table_store
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
.await?;
db.table_store
.ensure_expected_version(&ds, table_key, entry_version)?;
Ok((ds, Some(active_branch.to_string())))
}
source_branch => {
fork_dataset_from_entry_state(
db,
table_key,
full_path,
source_branch,
entry_version,
active_branch,
)
.await?;
let ds = db
.table_store
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
.await?;
db.table_store
.ensure_expected_version(&ds, table_key, entry_version)?;
Ok((ds, Some(active_branch.to_string())))
}
}
}
pub(super) async fn fork_dataset_from_entry_state(
db: &Omnigraph,
table_key: &str,
full_path: &str,
source_branch: Option<&str>,
source_version: u64,
active_branch: &str,
) -> Result<Dataset> {
db.table_store
.fork_branch_from_state(
full_path,
source_branch,
table_key,
source_version,
active_branch,
)
.await
}
pub(super) async fn reopen_for_mutation(
db: &Omnigraph,
table_key: &str,
full_path: &str,
table_branch: Option<&str>,
expected_version: u64,
) -> Result<Dataset> {
db.ensure_schema_apply_not_locked("write").await?;
db.table_store
.reopen_for_mutation(full_path, table_branch, table_key, expected_version)
.await
}
pub(super) async fn open_dataset_at_state(
db: &Omnigraph,
table_path: &str,
table_branch: Option<&str>,
table_version: u64,
) -> Result<Dataset> {
db.table_store
.open_dataset_at_state(table_path, table_branch, table_version)
.await
}
pub(super) async fn build_indices_on_dataset(
db: &Omnigraph,
table_key: &str,
ds: &mut Dataset,
) -> Result<()> {
build_indices_on_dataset_for_catalog(db, &db.catalog, table_key, ds).await
}
pub(super) async fn build_indices_on_dataset_for_catalog(
db: &Omnigraph,
catalog: &Catalog,
table_key: &str,
ds: &mut Dataset,
) -> Result<()> {
if let Some(type_name) = table_key.strip_prefix("node:") {
if !db.table_store.has_btree_index(ds, "id").await? {
db.table_store
.create_btree_index(ds, &["id"])
.await
.map_err(|e| {
OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e))
})?;
}
if let Some(node_type) = catalog.node_types.get(type_name) {
for index_cols in &node_type.indices {
if index_cols.len() != 1 {
continue;
}
let prop_name = &index_cols[0];
if let Some(prop_type) = node_type.properties.get(prop_name) {
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
if !db.table_store.has_fts_index(ds, prop_name).await? {
db.table_store
.create_inverted_index(ds, prop_name.as_str())
.await
.map_err(|e| {
OmniError::Lance(format!(
"create Inverted index on {}({}): {}",
table_key, prop_name, e
))
})?;
}
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
if !db.table_store.has_vector_index(ds, prop_name).await? {
db.table_store
.create_vector_index(ds, prop_name.as_str())
.await
.map_err(|e| {
OmniError::Lance(format!(
"create Vector index on {}({}): {}",
table_key, prop_name, e
))
})?;
}
}
}
}
}
return Ok(());
}
if table_key.starts_with("edge:") {
if !db.table_store.has_btree_index(ds, "id").await? {
db.table_store
.create_btree_index(ds, &["id"])
.await
.map_err(|e| {
OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e))
})?;
}
if !db.table_store.has_btree_index(ds, "src").await? {
db.table_store
.create_btree_index(ds, &["src"])
.await
.map_err(|e| {
OmniError::Lance(format!("create BTree index on {}(src): {}", table_key, e))
})?;
}
if !db.table_store.has_btree_index(ds, "dst").await? {
db.table_store
.create_btree_index(ds, &["dst"])
.await
.map_err(|e| {
OmniError::Lance(format!("create BTree index on {}(dst): {}", table_key, e))
})?;
}
return Ok(());
}
Err(OmniError::manifest(format!(
"invalid table key '{}'",
table_key
)))
}
async fn prepare_updates_for_commit(
db: &Omnigraph,
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
) -> Result<Vec<crate::db::SubTableUpdate>> {
if updates.is_empty() {
return Ok(Vec::new());
}
let snapshot = db.snapshot_for_branch(branch).await?;
let mut prepared = Vec::with_capacity(updates.len());
for update in updates {
let Some(entry) = snapshot.entry(&update.table_key) else {
return Err(OmniError::manifest(format!(
"no manifest entry for {}",
update.table_key
)));
};
let mut prepared_update = update.clone();
if prepared_update.row_count > 0 {
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
let mut ds = reopen_for_mutation(
db,
&prepared_update.table_key,
&full_path,
prepared_update.table_branch.as_deref(),
prepared_update.table_version,
)
.await?;
build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?;
let state = db.table_store.table_state(&full_path, &ds).await?;
prepared_update.table_version = state.version;
prepared_update.row_count = state.row_count;
prepared_update.version_metadata = state.version_metadata;
}
prepared.push(prepared_update);
}
Ok(prepared)
}
async fn commit_prepared_updates(
db: &mut Omnigraph,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
let actor_id = db.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = db
.coordinator
.commit_updates_with_actor(updates, actor_id.as_deref())
.await?;
Ok(manifest_version)
}
pub(super) async fn commit_prepared_updates_on_branch(
db: &mut Omnigraph,
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
let requested_branch = branch.map(str::to_string);
if requested_branch == current_branch {
return commit_prepared_updates(db, updates).await;
}
let mut coordinator = match requested_branch.as_deref() {
Some(branch) => {
GraphCoordinator::open_branch(db.uri(), branch, Arc::clone(&db.storage)).await?
}
None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?,
};
let actor_id = db.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
_snapshot_id: _,
} = coordinator
.commit_updates_with_actor(updates, actor_id.as_deref())
.await?;
Ok(manifest_version)
}
pub(super) async fn commit_updates(
db: &mut Omnigraph,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
db.ensure_schema_apply_not_locked("write commit").await?;
let current_branch = db.coordinator.current_branch().map(str::to_string);
let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?;
commit_prepared_updates(db, &prepared).await
}
pub(super) async fn commit_manifest_updates(
db: &mut Omnigraph,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
db.coordinator.commit_manifest_updates(updates).await
}
pub(super) async fn record_merge_commit(
db: &mut Omnigraph,
manifest_version: u64,
parent_commit_id: &str,
merged_parent_commit_id: &str,
) -> Result<String> {
let actor_id = db.current_audit_actor().map(str::to_string);
db.coordinator
.record_merge_commit(
manifest_version,
parent_commit_id,
merged_parent_commit_id,
actor_id.as_deref(),
)
.await
.map(|snapshot_id| snapshot_id.as_str().to_string())
}
pub(super) async fn commit_updates_on_branch(
db: &mut Omnigraph,
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
db.ensure_schema_apply_not_locked("write commit").await?;
let prepared = prepare_updates_for_commit(db, branch, updates).await?;
commit_prepared_updates_on_branch(db, branch, &prepared).await
}
pub(super) async fn ensure_commit_graph_initialized(db: &mut Omnigraph) -> Result<()> {
db.coordinator.ensure_commit_graph_initialized().await
}
pub(super) async fn invalidate_graph_index(db: &Omnigraph) {
db.runtime_cache.invalidate_all().await;
}