Parallel per-type load writes + omnigraph optimize/cleanup CLI (#46)

* Parallel per-type load writes + omnigraph optimize/cleanup CLI

## MR-677.3 — parallel per-type load writes

The load path already groups records into one RecordBatch per type and
makes one Lance commit per table (loader::mod.rs:249-..), but those
commits ran sequentially. Wrap node and edge write loops in
`futures::stream::buffered(N)` against a new helper
`write_batches_concurrently`. Concurrency tunable via
`OMNIGRAPH_LOAD_CONCURRENCY` (default 8).

## MR-676 — `omnigraph optimize` and `omnigraph cleanup`

New CLI subcommands that walk every node + edge table in the repo:

- `omnigraph optimize <uri>` — runs Lance `compact_files` on each
  table to merge small fragments into fewer larger ones.
- `omnigraph cleanup <uri> --keep N | --older-than 7d --confirm` —
  runs Lance `cleanup_old_versions` to prune historical manifests +
  unique fragments. Requires `--confirm` because it's destructive.
  Supports both count-based and time-based retention (or both AND'd
  together). Time uses chrono `DateTime<Utc>` (added as a workspace
  dep, default-features off).

Both commands run their per-table loops in parallel (8-way bounded,
`OMNIGRAPH_MAINTENANCE_CONCURRENCY` env override). Smoke-tested
against the 114-table prod graph: optimize went 7m15s sequential
→ 1m28s parallel. cleanup --keep 1 removed 137 historical versions
across 114 tables in 1m57s without disrupting `/healthz` or query
responses.

Public API on `Omnigraph`:

  pub async fn optimize(&mut self) -> Result<Vec<TableOptimizeStats>>
  pub async fn cleanup(&mut self, opts: CleanupPolicyOptions)
      -> Result<Vec<TableCleanupStats>>

All 10 existing loader tests still pass.

Closes MR-676.
Partially addresses MR-677 (the .3 — parallel by type — piece;
MR-677.1 is for the `omnigraph embed` path, not load, since load
doesn't call Gemini directly. .2 was already in place).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: regenerate openapi.json

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Andrew Altshuler 2026-04-25 14:22:14 +03:00 committed by GitHub
parent 628bc2e607
commit 74eb5a5380
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 498 additions and 31 deletions

9
Cargo.lock generated
View file

@ -4596,7 +4596,7 @@ dependencies = [
[[package]]
name = "omnigraph-cli"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"assert_cmd",
"clap",
@ -4616,7 +4616,7 @@ dependencies = [
[[package]]
name = "omnigraph-compiler"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"ahash",
"arrow-array",
@ -4637,7 +4637,7 @@ dependencies = [
[[package]]
name = "omnigraph-engine"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"arrow-array",
"arrow-cast",
@ -4646,6 +4646,7 @@ dependencies = [
"arrow-select",
"async-trait",
"base64",
"chrono",
"fail",
"futures",
"lance",
@ -4674,7 +4675,7 @@ dependencies = [
[[package]]
name = "omnigraph-server"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"async-trait",
"aws-config",

View file

@ -39,6 +39,7 @@ lance-table = "4.0.0"
ulid = "1"
futures = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
pest = "2"
pest_derive = "2"
thiserror = "2"

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-cli"
version = "0.3.0"
version = "0.3.1"
edition = "2024"
description = "CLI for the Omnigraph graph database."
license = "MIT"
@ -13,9 +13,9 @@ name = "omnigraph"
path = "src/main.rs"
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.3.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.0" }
omnigraph-server = { path = "../omnigraph-server", version = "0.3.0" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.3.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" }
omnigraph-server = { path = "../omnigraph-server", version = "0.3.1" }
clap = { workspace = true }
color-eyre = { workspace = true }
serde = { workspace = true }

View file

@ -214,6 +214,39 @@ enum Command {
#[command(subcommand)]
command: PolicyCommand,
},
/// Compact small Lance fragments in every table of the repo
Optimize {
/// Repo URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
json: bool,
},
/// Remove old Lance versions from every table of the repo (destructive)
Cleanup {
/// Repo URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
/// Number of recent versions to keep per table. Either `--keep` or
/// `--older-than` (or both) must be set.
#[arg(long)]
keep: Option<u32>,
/// Only remove versions older than this duration. Accepts Go-style
/// durations: `7d`, `24h`, `90m`. At least one of --keep / --older-than.
#[arg(long)]
older_than: Option<String>,
/// Required to actually run; without it, prints what would be removed
#[arg(long)]
confirm: bool,
#[arg(long)]
json: bool,
},
}
#[derive(Debug, Subcommand)]
@ -795,6 +828,31 @@ fn resolve_uri(
config.resolve_target_uri(cli_uri, cli_target, config.cli_graph_name())
}
/// Parse a Go-style compact duration: `7d`, `24h`, `30m`, `90s`, or a plain
/// integer as seconds. Used by the `cleanup --older-than` flag.
fn parse_duration_arg(s: &str) -> Result<std::time::Duration> {
let s = s.trim();
if s.is_empty() {
bail!("duration is empty");
}
let (num_part, unit) = match s.char_indices().rev().find(|(_, c)| c.is_ascii_alphabetic()) {
Some((i, _)) => (&s[..i + 1 - s[i..].chars().next().unwrap().len_utf8()], &s[i..]),
None => (s, ""),
};
let n: u64 = num_part
.parse()
.map_err(|e| color_eyre::eyre::eyre!("invalid duration '{}': {}", s, e))?;
let secs = match unit {
"" | "s" => n,
"m" => n * 60,
"h" => n * 60 * 60,
"d" => n * 60 * 60 * 24,
"w" => n * 60 * 60 * 24 * 7,
_ => bail!("unknown duration unit '{}'. Supported: s, m, h, d, w", unit),
};
Ok(std::time::Duration::from_secs(secs))
}
fn resolve_local_uri(
config: &OmnigraphConfig,
cli_uri: Option<String>,
@ -2465,6 +2523,111 @@ async fn main() -> Result<()> {
print_policy_explain(&decision, &request);
}
},
Command::Optimize {
uri,
target,
config,
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let mut db = Omnigraph::open(&uri).await?;
let stats = db.optimize().await?;
if json {
let value = serde_json::json!({
"uri": uri,
"tables": stats.iter().map(|s| serde_json::json!({
"table_key": s.table_key,
"fragments_removed": s.fragments_removed,
"fragments_added": s.fragments_added,
"committed": s.committed,
})).collect::<Vec<_>>(),
});
print_json(&value)?;
} else {
println!("optimize {}{} tables", uri, stats.len());
for s in &stats {
if s.committed {
println!(
" {:<40} frags {} → {} ✓",
s.table_key,
s.fragments_removed + s.fragments_added - s.fragments_added,
s.fragments_added
);
} else {
println!(" {:<40} no-op", s.table_key);
}
}
}
}
Command::Cleanup {
uri,
target,
config,
keep,
older_than,
confirm,
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let older_than_dur = older_than
.as_deref()
.map(parse_duration_arg)
.transpose()?;
if keep.is_none() && older_than_dur.is_none() {
bail!("cleanup requires at least one of --keep or --older-than");
}
let policy_desc = match (keep, older_than_dur) {
(Some(k), Some(d)) => format!("keep {} versions, remove anything older than {:?}", k, d),
(Some(k), None) => format!("keep {} versions", k),
(None, Some(d)) => format!("remove anything older than {:?}", d),
_ => unreachable!(),
};
if !confirm {
eprintln!(
"cleanup is destructive — rerun with --confirm. Policy for {}: {}",
uri, policy_desc
);
return Ok(());
}
let options = omnigraph::db::CleanupPolicyOptions {
keep_versions: keep,
older_than: older_than_dur,
};
let mut db = Omnigraph::open(&uri).await?;
let stats = db.cleanup(options).await?;
if json {
let value = serde_json::json!({
"uri": uri,
"keep_versions": keep,
"older_than_secs": older_than_dur.map(|d| d.as_secs()),
"tables": stats.iter().map(|s| serde_json::json!({
"table_key": s.table_key,
"bytes_removed": s.bytes_removed,
"old_versions_removed": s.old_versions_removed,
})).collect::<Vec<_>>(),
});
print_json(&value)?;
} else {
let total_bytes: u64 = stats.iter().map(|s| s.bytes_removed).sum();
let total_versions: u64 = stats.iter().map(|s| s.old_versions_removed).sum();
println!(
"cleanup {} ({}) — removed {} versions ({} bytes) across {} tables",
uri,
policy_desc,
total_versions,
total_bytes,
stats.len()
);
}
}
}
Ok(())
}

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-compiler"
version = "0.3.0"
version = "0.3.1"
edition = "2024"
description = "Schema/query compiler for Omnigraph. Zero Lance dependency."
license = "MIT"

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-server"
version = "0.3.0"
version = "0.3.1"
edition = "2024"
description = "HTTP server for the Omnigraph graph database."
license = "MIT"
@ -19,8 +19,8 @@ default = []
aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"]
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.3.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.0" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.3.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" }
axum = { workspace = true }
clap = { workspace = true }
color-eyre = { workspace = true }

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-engine"
version = "0.3.0"
version = "0.3.1"
edition = "2024"
description = "Runtime engine for the Omnigraph graph database."
license = "MIT"
@ -16,7 +16,7 @@ default = []
failpoints = ["dep:fail", "fail/failpoints"]
[dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" }
lance = { workspace = true }
lance-datafusion = { workspace = true }
lance-file = { workspace = true }
@ -45,9 +45,10 @@ fail = { workspace = true, optional = true }
time = { workspace = true }
async-trait = { workspace = true }
url = { workspace = true }
chrono = { workspace = true }
[dev-dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.0" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" }
tokio = { workspace = true }
lance-namespace-impls = { workspace = true }
serial_test = "3"

View file

@ -8,7 +8,10 @@ mod schema_state;
pub use commit_graph::GraphCommit;
pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};
pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
pub use omnigraph::{MergeOutcome, Omnigraph, SchemaApplyResult};
pub use omnigraph::{
CleanupPolicyOptions, MergeOutcome, Omnigraph, SchemaApplyResult, TableCleanupStats,
TableOptimizeStats,
};
pub(crate) use run_registry::is_internal_run_branch;
pub use run_registry::{RunId, RunRecord, RunStatus};

View file

@ -29,9 +29,12 @@ use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_u
use crate::table_store::TableStore;
mod export;
mod optimize;
mod schema_apply;
mod table_ops;
pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
use super::commit_graph::GraphCommit;
use super::manifest::{
ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
@ -440,6 +443,22 @@ impl Omnigraph {
table_ops::ensure_indices_on(self, branch).await
}
/// Compact small Lance fragments into fewer larger ones across every
/// node + edge table on `main`. See [`optimize`] for details.
pub async fn optimize(&mut self) -> Result<Vec<optimize::TableOptimizeStats>> {
optimize::optimize_all_tables(self).await
}
/// Remove Lance manifests (and the fragments they uniquely own) per the
/// given [`optimize::CleanupPolicyOptions`]. Destructive to version
/// history. See [`optimize`] for details.
pub async fn cleanup(
&mut self,
options: optimize::CleanupPolicyOptions,
) -> Result<Vec<optimize::TableCleanupStats>> {
optimize::cleanup_all_tables(self, options).await
}
/// Read a blob from a node by its string ID and property name.
///
/// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,

View file

@ -0,0 +1,210 @@
//! Lance compaction + version cleanup exposed at the graph level.
//!
//! Lance accumulates many small `.lance` fragment files per table over the
//! life of a repo: each `write`, `load`, and `change` op appends one or more
//! fragments and a new manifest. Over long timescales this hurts open times
//! and S3 object counts without improving anything.
//!
//! Two dials:
//!
//! * `optimize_all_tables` — Lance `compact_files` on every table. Rewrites
//! small fragments into fewer large ones. Non-destructive (creates a new
//! version; old fragments remain reachable via older manifest versions).
//! * `cleanup_all_tables` — Lance `cleanup_old_versions` on every table.
//! Removes manifests (and their unique fragments) older than the configured
//! retention. Destructive to version history — callers should gate this
//! behind an explicit confirm flag at the CLI layer.
//!
//! Both walk every node + edge table on the `main` branch. Run branches are
//! ephemeral by design (MR-670 / MR-674) so we do not optimize them.
use std::time::Duration;
use chrono::Utc;
use futures::stream::StreamExt;
use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
use lance::dataset::optimize::{CompactionMetrics, CompactionOptions, compact_files};
use super::*;
/// How many tables to optimize/cleanup concurrently. Each hits a separate
/// Lance dataset so there is no shared state; the bound is there to avoid
/// flooding the runtime and the S3 connection pool.
const DEFAULT_MAINT_CONCURRENCY: usize = 8;
fn maint_concurrency() -> usize {
std::env::var("OMNIGRAPH_MAINTENANCE_CONCURRENCY")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|v| *v > 0)
.unwrap_or(DEFAULT_MAINT_CONCURRENCY)
}
/// Retention knobs for [`cleanup_all_tables`]. At least one must be set or
/// nothing is cleaned. If both are set, Lance applies them as AND (a manifest
/// is kept if it satisfies either — i.e. only manifests older than BOTH the
/// time cutoff AND the version cutoff are removed).
#[derive(Debug, Clone, Default)]
pub struct CleanupPolicyOptions {
/// Keep this many most-recent versions per table.
pub keep_versions: Option<u32>,
/// Only remove versions older than this duration.
pub older_than: Option<Duration>,
}
/// Per-table outcome of `optimize_all_tables`.
#[derive(Debug, Clone)]
pub struct TableOptimizeStats {
pub table_key: String,
/// Number of source fragments that were rewritten by Lance.
pub fragments_removed: usize,
/// Number of new, larger fragments Lance produced.
pub fragments_added: usize,
/// Did this table get a new Lance manifest version from the compaction?
pub committed: bool,
}
/// Per-table outcome of `cleanup_all_tables`.
#[derive(Debug, Clone)]
pub struct TableCleanupStats {
pub table_key: String,
pub bytes_removed: u64,
pub old_versions_removed: u64,
}
/// Run Lance `compact_files` on every node + edge table on `main`.
/// Tables run in parallel (bounded concurrency).
pub async fn optimize_all_tables(db: &mut Omnigraph) -> Result<Vec<TableOptimizeStats>> {
db.ensure_schema_state_valid().await?;
db.ensure_schema_apply_idle("optimize").await?;
let resolved = db.resolved_branch_target(None).await?;
let snapshot = resolved.snapshot;
let table_tasks: Vec<_> = all_table_keys(&db.catalog)
.into_iter()
.filter_map(|table_key| {
let entry = snapshot.entry(&table_key)?;
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
Some((table_key, full_path))
})
.collect();
if table_tasks.is_empty() {
return Ok(Vec::new());
}
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
let table_store = &db.table_store;
let stats: Vec<Result<TableOptimizeStats>> = futures::stream::iter(table_tasks.into_iter())
.map(|(table_key, full_path)| async move {
let mut ds = table_store
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let version_before = ds.version().version;
let metrics: CompactionMetrics =
compact_files(&mut ds, CompactionOptions::default(), None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let version_after = ds.version().version;
Ok(TableOptimizeStats {
table_key,
fragments_removed: metrics.fragments_removed,
fragments_added: metrics.fragments_added,
committed: version_after != version_before,
})
})
.buffer_unordered(concurrency)
.collect()
.await;
stats.into_iter().collect()
}
/// Run Lance `cleanup_old_versions` on every node + edge table on `main`,
/// using [`CleanupPolicyOptions`]. The latest manifest is always preserved
/// regardless (Lance invariant).
pub async fn cleanup_all_tables(
db: &mut Omnigraph,
options: CleanupPolicyOptions,
) -> Result<Vec<TableCleanupStats>> {
if options.keep_versions.is_none() && options.older_than.is_none() {
return Err(OmniError::manifest(
"cleanup requires at least one of keep_versions or older_than",
));
}
db.ensure_schema_state_valid().await?;
db.ensure_schema_apply_idle("cleanup").await?;
let before_timestamp = options.older_than.map(|d| Utc::now() - d);
let keep_versions = options.keep_versions;
let resolved = db.resolved_branch_target(None).await?;
let snapshot = resolved.snapshot;
let table_tasks: Vec<_> = all_table_keys(&db.catalog)
.into_iter()
.filter_map(|table_key| {
let entry = snapshot.entry(&table_key)?;
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
Some((table_key, full_path))
})
.collect();
if table_tasks.is_empty() {
return Ok(Vec::new());
}
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
let table_store = &db.table_store;
let results: Vec<Result<TableCleanupStats>> = futures::stream::iter(table_tasks.into_iter())
.map(|(table_key, full_path)| async move {
let ds = table_store
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let before_version = keep_versions
.map(|n| ds.version().version.saturating_sub(n as u64))
.filter(|v| *v > 0);
let policy = CleanupPolicy {
before_timestamp,
before_version,
delete_unverified: false,
error_if_tagged_old_versions: false,
clean_referenced_branches: false,
delete_rate_limit: None,
};
let removed: RemovalStats =
lance::dataset::cleanup::cleanup_old_versions(&ds, policy)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
Ok(TableCleanupStats {
table_key,
bytes_removed: removed.bytes_removed,
old_versions_removed: removed.old_versions,
})
})
.buffer_unordered(concurrency)
.collect()
.await;
results.into_iter().collect()
}
fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec<String> {
let mut keys: Vec<String> = catalog
.node_types
.keys()
.map(|n| format!("node:{}", n))
.chain(
catalog
.edge_types
.keys()
.map(|n| format!("edge:{}", n)),
)
.collect();
keys.sort();
keys
}

View file

@ -323,30 +323,38 @@ async fn load_jsonl_reader<R: BufRead>(
}
}
// Phase 2: Build per-type RecordBatches and write to Lance
// Phase 2: Build per-type RecordBatches and write to Lance.
//
// Writes to different tables are independent in Lance (each table has its
// own manifest + fragments), so we parallelize across types with a bounded
// concurrency limit. Serial writes against S3 were the dominant cost of
// load — batching and parallelizing per-type cuts wall time by roughly
// `LOAD_WRITE_CONCURRENCY`× for wide schemas (see MR-677).
let mut updates = Vec::new();
let mut result = LoadResult::default();
let snapshot = db.snapshot_for_branch(branch).await?;
// Write nodes first (edges reference node IDs)
// Phase 2a: build and validate every node batch up front. Cheap and
// synchronous — surfaces validation errors before any S3 traffic.
let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
Vec::with_capacity(node_rows.len());
for (type_name, rows) in &node_rows {
let node_type = &catalog.node_types[type_name];
let batch = build_node_batch(node_type, rows)?;
// Validate value constraints before writing
validate_value_constraints(&batch, node_type)?;
let loaded_count = batch.num_rows();
let table_key = format!("node:{}", type_name);
snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
}
let (state, table_branch) =
write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
// Phase 2b: write every node type concurrently, bounded.
let node_write_results = write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
@ -354,7 +362,7 @@ async fn load_jsonl_reader<R: BufRead>(
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.nodes_loaded.insert(type_name.clone(), loaded_count);
result.nodes_loaded.insert(type_name, loaded_count);
}
// Phase 2b: Validate edge referential integrity — every src/dst must
@ -402,20 +410,23 @@ async fn load_jsonl_reader<R: BufRead>(
}
}
// Write edges
// Write edges (parallel per edge type, same pattern as nodes)
let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
Vec::with_capacity(edge_rows.len());
for (edge_name, rows) in &edge_rows {
let edge_type = &catalog.edge_types[edge_name];
let batch = build_edge_batch(edge_type, rows)?;
let loaded_count = batch.num_rows();
let table_key = format!("edge:{}", edge_name);
snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
}
let (state, table_branch) =
write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
let edge_write_results = write_batches_concurrently(db, branch, mode, prepared_edges).await?;
for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
@ -423,7 +434,7 @@ async fn load_jsonl_reader<R: BufRead>(
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.edges_loaded.insert(edge_name.clone(), loaded_count);
result.edges_loaded.insert(edge_name, loaded_count);
}
// Phase 3: Validate edge cardinality constraints (before commit — invalid
@ -954,6 +965,64 @@ fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
}
/// Write a batch to a Lance dataset, returning (new_version, total_row_count).
/// How many per-type Lance writes to run concurrently during a load.
///
/// Each write is an independent S3 manifest + fragment write against a
/// different table. Ops within a single table must still be serial (Lance
/// OCC on the manifest), but cross-table writes have no shared state.
///
/// 8 is a conservative default — enough to overlap S3 round-trip latency
/// across the typical 10-30 table schemas without flooding the runtime.
/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;
fn load_write_concurrency() -> usize {
std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|v| *v > 0)
.unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
}
/// Write a set of prepared `(type_name, table_key, batch, row_count)` tuples
/// concurrently. Returns results in original iteration order so callers can
/// zip them back to per-type metadata.
async fn write_batches_concurrently(
db: &Omnigraph,
branch: Option<&str>,
mode: LoadMode,
prepared: Vec<(String, String, RecordBatch, usize)>,
) -> Result<
Vec<(
String,
String,
usize,
crate::table_store::TableState,
Option<String>,
)>,
> {
use futures::stream::StreamExt;
if prepared.is_empty() {
return Ok(Vec::new());
}
let concurrency = load_write_concurrency().min(prepared.len()).max(1);
futures::stream::iter(prepared.into_iter().map(
|(type_name, table_key, batch, loaded_count)| async move {
let (state, table_branch) =
write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
},
))
.buffered(concurrency)
.collect::<Vec<Result<_>>>()
.await
.into_iter()
.collect()
}
async fn write_batch_to_dataset(
db: &Omnigraph,
branch: Option<&str>,

View file

@ -7,7 +7,7 @@
"name": "MIT",
"identifier": "MIT"
},
"version": "0.3.0"
"version": "0.3.1"
},
"paths": {
"/branches": {