diff --git a/Cargo.lock b/Cargo.lock index 034c878..9eafcf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4596,7 +4596,7 @@ dependencies = [ [[package]] name = "omnigraph-cli" -version = "0.3.0" +version = "0.3.1" dependencies = [ "assert_cmd", "clap", @@ -4616,7 +4616,7 @@ dependencies = [ [[package]] name = "omnigraph-compiler" -version = "0.3.0" +version = "0.3.1" dependencies = [ "ahash", "arrow-array", @@ -4637,7 +4637,7 @@ dependencies = [ [[package]] name = "omnigraph-engine" -version = "0.3.0" +version = "0.3.1" dependencies = [ "arrow-array", "arrow-cast", @@ -4646,6 +4646,7 @@ dependencies = [ "arrow-select", "async-trait", "base64", + "chrono", "fail", "futures", "lance", @@ -4674,7 +4675,7 @@ dependencies = [ [[package]] name = "omnigraph-server" -version = "0.3.0" +version = "0.3.1" dependencies = [ "async-trait", "aws-config", diff --git a/Cargo.toml b/Cargo.toml index 1e7129d..1766b07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ lance-table = "4.0.0" ulid = "1" futures = "0.3" async-trait = "0.1" +chrono = { version = "0.4", default-features = false, features = ["clock"] } pest = "2" pest_derive = "2" thiserror = "2" diff --git a/crates/omnigraph-cli/Cargo.toml b/crates/omnigraph-cli/Cargo.toml index dd30eca..0813d12 100644 --- a/crates/omnigraph-cli/Cargo.toml +++ b/crates/omnigraph-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-cli" -version = "0.3.0" +version = "0.3.1" edition = "2024" description = "CLI for the Omnigraph graph database." license = "MIT" @@ -13,9 +13,9 @@ name = "omnigraph" path = "src/main.rs" [dependencies] -omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.3.0" } -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.0" } -omnigraph-server = { path = "../omnigraph-server", version = "0.3.0" } +omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.3.1" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" } +omnigraph-server = { path = "../omnigraph-server", version = "0.3.1" } clap = { workspace = true } color-eyre = { workspace = true } serde = { workspace = true } diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index c29cac6..03d1d14 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -214,6 +214,39 @@ enum Command { #[command(subcommand)] command: PolicyCommand, }, + /// Compact small Lance fragments in every table of the repo + Optimize { + /// Repo URI + uri: Option, + #[arg(long)] + target: Option, + #[arg(long)] + config: Option, + #[arg(long)] + json: bool, + }, + /// Remove old Lance versions from every table of the repo (destructive) + Cleanup { + /// Repo URI + uri: Option, + #[arg(long)] + target: Option, + #[arg(long)] + config: Option, + /// Number of recent versions to keep per table. Either `--keep` or + /// `--older-than` (or both) must be set. + #[arg(long)] + keep: Option, + /// Only remove versions older than this duration. Accepts Go-style + /// durations: `7d`, `24h`, `90m`. At least one of --keep / --older-than. + #[arg(long)] + older_than: Option, + /// Required to actually run; without it, prints what would be removed + #[arg(long)] + confirm: bool, + #[arg(long)] + json: bool, + }, } #[derive(Debug, Subcommand)] @@ -795,6 +828,31 @@ fn resolve_uri( config.resolve_target_uri(cli_uri, cli_target, config.cli_graph_name()) } +/// Parse a Go-style compact duration: `7d`, `24h`, `30m`, `90s`, or a plain +/// integer as seconds. Used by the `cleanup --older-than` flag. +fn parse_duration_arg(s: &str) -> Result { + let s = s.trim(); + if s.is_empty() { + bail!("duration is empty"); + } + let (num_part, unit) = match s.char_indices().rev().find(|(_, c)| c.is_ascii_alphabetic()) { + Some((i, _)) => (&s[..i + 1 - s[i..].chars().next().unwrap().len_utf8()], &s[i..]), + None => (s, ""), + }; + let n: u64 = num_part + .parse() + .map_err(|e| color_eyre::eyre::eyre!("invalid duration '{}': {}", s, e))?; + let secs = match unit { + "" | "s" => n, + "m" => n * 60, + "h" => n * 60 * 60, + "d" => n * 60 * 60 * 24, + "w" => n * 60 * 60 * 24 * 7, + _ => bail!("unknown duration unit '{}'. Supported: s, m, h, d, w", unit), + }; + Ok(std::time::Duration::from_secs(secs)) +} + fn resolve_local_uri( config: &OmnigraphConfig, cli_uri: Option, @@ -2465,6 +2523,111 @@ async fn main() -> Result<()> { print_policy_explain(&decision, &request); } }, + Command::Optimize { + uri, + target, + config, + json, + } => { + let config = load_cli_config(config.as_ref())?; + let uri = resolve_uri(&config, uri, target.as_deref())?; + let mut db = Omnigraph::open(&uri).await?; + let stats = db.optimize().await?; + if json { + let value = serde_json::json!({ + "uri": uri, + "tables": stats.iter().map(|s| serde_json::json!({ + "table_key": s.table_key, + "fragments_removed": s.fragments_removed, + "fragments_added": s.fragments_added, + "committed": s.committed, + })).collect::>(), + }); + print_json(&value)?; + } else { + println!("optimize {} — {} tables", uri, stats.len()); + for s in &stats { + if s.committed { + println!( + " {:<40} frags {} → {} ✓", + s.table_key, + s.fragments_removed + s.fragments_added - s.fragments_added, + s.fragments_added + ); + } else { + println!(" {:<40} no-op", s.table_key); + } + } + } + } + Command::Cleanup { + uri, + target, + config, + keep, + older_than, + confirm, + json, + } => { + let config = load_cli_config(config.as_ref())?; + let uri = resolve_uri(&config, uri, target.as_deref())?; + + let older_than_dur = older_than + .as_deref() + .map(parse_duration_arg) + .transpose()?; + + if keep.is_none() && older_than_dur.is_none() { + bail!("cleanup requires at least one of --keep or --older-than"); + } + + let policy_desc = match (keep, older_than_dur) { + (Some(k), Some(d)) => format!("keep {} versions, remove anything older than {:?}", k, d), + (Some(k), None) => format!("keep {} versions", k), + (None, Some(d)) => format!("remove anything older than {:?}", d), + _ => unreachable!(), + }; + + if !confirm { + eprintln!( + "cleanup is destructive — rerun with --confirm. Policy for {}: {}", + uri, policy_desc + ); + return Ok(()); + } + + let options = omnigraph::db::CleanupPolicyOptions { + keep_versions: keep, + older_than: older_than_dur, + }; + + let mut db = Omnigraph::open(&uri).await?; + let stats = db.cleanup(options).await?; + if json { + let value = serde_json::json!({ + "uri": uri, + "keep_versions": keep, + "older_than_secs": older_than_dur.map(|d| d.as_secs()), + "tables": stats.iter().map(|s| serde_json::json!({ + "table_key": s.table_key, + "bytes_removed": s.bytes_removed, + "old_versions_removed": s.old_versions_removed, + })).collect::>(), + }); + print_json(&value)?; + } else { + let total_bytes: u64 = stats.iter().map(|s| s.bytes_removed).sum(); + let total_versions: u64 = stats.iter().map(|s| s.old_versions_removed).sum(); + println!( + "cleanup {} ({}) — removed {} versions ({} bytes) across {} tables", + uri, + policy_desc, + total_versions, + total_bytes, + stats.len() + ); + } + } } Ok(()) } diff --git a/crates/omnigraph-compiler/Cargo.toml b/crates/omnigraph-compiler/Cargo.toml index f8aaf04..3a0af1e 100644 --- a/crates/omnigraph-compiler/Cargo.toml +++ b/crates/omnigraph-compiler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-compiler" -version = "0.3.0" +version = "0.3.1" edition = "2024" description = "Schema/query compiler for Omnigraph. Zero Lance dependency." license = "MIT" diff --git a/crates/omnigraph-server/Cargo.toml b/crates/omnigraph-server/Cargo.toml index f808c76..0f938a6 100644 --- a/crates/omnigraph-server/Cargo.toml +++ b/crates/omnigraph-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-server" -version = "0.3.0" +version = "0.3.1" edition = "2024" description = "HTTP server for the Omnigraph graph database." license = "MIT" @@ -19,8 +19,8 @@ default = [] aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"] [dependencies] -omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.3.0" } -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.0" } +omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.3.1" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" } axum = { workspace = true } clap = { workspace = true } color-eyre = { workspace = true } diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml index fdd520b..0e1bae9 100644 --- a/crates/omnigraph/Cargo.toml +++ b/crates/omnigraph/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-engine" -version = "0.3.0" +version = "0.3.1" edition = "2024" description = "Runtime engine for the Omnigraph graph database." license = "MIT" @@ -16,7 +16,7 @@ default = [] failpoints = ["dep:fail", "fail/failpoints"] [dependencies] -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.0" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" } lance = { workspace = true } lance-datafusion = { workspace = true } lance-file = { workspace = true } @@ -45,9 +45,10 @@ fail = { workspace = true, optional = true } time = { workspace = true } async-trait = { workspace = true } url = { workspace = true } +chrono = { workspace = true } [dev-dependencies] -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.0" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" } tokio = { workspace = true } lance-namespace-impls = { workspace = true } serial_test = "3" diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index a4f90d1..77c3d26 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -8,7 +8,10 @@ mod schema_state; pub use commit_graph::GraphCommit; pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId}; pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate}; -pub use omnigraph::{MergeOutcome, Omnigraph, SchemaApplyResult}; +pub use omnigraph::{ + CleanupPolicyOptions, MergeOutcome, Omnigraph, SchemaApplyResult, TableCleanupStats, + TableOptimizeStats, +}; pub(crate) use run_registry::is_internal_run_branch; pub use run_registry::{RunId, RunRecord, RunStatus}; diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 04e64c4..0f251ba 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -29,9 +29,12 @@ use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_u use crate::table_store::TableStore; mod export; +mod optimize; mod schema_apply; mod table_ops; +pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats}; + use super::commit_graph::GraphCommit; use super::manifest::{ ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone, @@ -440,6 +443,22 @@ impl Omnigraph { table_ops::ensure_indices_on(self, branch).await } + /// Compact small Lance fragments into fewer larger ones across every + /// node + edge table on `main`. See [`optimize`] for details. + pub async fn optimize(&mut self) -> Result> { + optimize::optimize_all_tables(self).await + } + + /// Remove Lance manifests (and the fragments they uniquely own) per the + /// given [`optimize::CleanupPolicyOptions`]. Destructive to version + /// history. See [`optimize`] for details. + pub async fn cleanup( + &mut self, + options: optimize::CleanupPolicyOptions, + ) -> Result> { + optimize::cleanup_all_tables(self, options).await + } + /// Read a blob from a node by its string ID and property name. /// /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`, diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs new file mode 100644 index 0000000..32889a0 --- /dev/null +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -0,0 +1,210 @@ +//! Lance compaction + version cleanup exposed at the graph level. +//! +//! Lance accumulates many small `.lance` fragment files per table over the +//! life of a repo: each `write`, `load`, and `change` op appends one or more +//! fragments and a new manifest. Over long timescales this hurts open times +//! and S3 object counts without improving anything. +//! +//! Two dials: +//! +//! * `optimize_all_tables` — Lance `compact_files` on every table. Rewrites +//! small fragments into fewer large ones. Non-destructive (creates a new +//! version; old fragments remain reachable via older manifest versions). +//! * `cleanup_all_tables` — Lance `cleanup_old_versions` on every table. +//! Removes manifests (and their unique fragments) older than the configured +//! retention. Destructive to version history — callers should gate this +//! behind an explicit confirm flag at the CLI layer. +//! +//! Both walk every node + edge table on the `main` branch. Run branches are +//! ephemeral by design (MR-670 / MR-674) so we do not optimize them. + +use std::time::Duration; + +use chrono::Utc; +use futures::stream::StreamExt; +use lance::dataset::cleanup::{CleanupPolicy, RemovalStats}; +use lance::dataset::optimize::{CompactionMetrics, CompactionOptions, compact_files}; + +use super::*; + +/// How many tables to optimize/cleanup concurrently. Each hits a separate +/// Lance dataset so there is no shared state; the bound is there to avoid +/// flooding the runtime and the S3 connection pool. +const DEFAULT_MAINT_CONCURRENCY: usize = 8; + +fn maint_concurrency() -> usize { + std::env::var("OMNIGRAPH_MAINTENANCE_CONCURRENCY") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|v| *v > 0) + .unwrap_or(DEFAULT_MAINT_CONCURRENCY) +} + +/// Retention knobs for [`cleanup_all_tables`]. At least one must be set or +/// nothing is cleaned. If both are set, Lance applies them as AND (a manifest +/// is kept if it satisfies either — i.e. only manifests older than BOTH the +/// time cutoff AND the version cutoff are removed). +#[derive(Debug, Clone, Default)] +pub struct CleanupPolicyOptions { + /// Keep this many most-recent versions per table. + pub keep_versions: Option, + /// Only remove versions older than this duration. + pub older_than: Option, +} + +/// Per-table outcome of `optimize_all_tables`. +#[derive(Debug, Clone)] +pub struct TableOptimizeStats { + pub table_key: String, + /// Number of source fragments that were rewritten by Lance. + pub fragments_removed: usize, + /// Number of new, larger fragments Lance produced. + pub fragments_added: usize, + /// Did this table get a new Lance manifest version from the compaction? + pub committed: bool, +} + +/// Per-table outcome of `cleanup_all_tables`. +#[derive(Debug, Clone)] +pub struct TableCleanupStats { + pub table_key: String, + pub bytes_removed: u64, + pub old_versions_removed: u64, +} + +/// Run Lance `compact_files` on every node + edge table on `main`. +/// Tables run in parallel (bounded concurrency). +pub async fn optimize_all_tables(db: &mut Omnigraph) -> Result> { + db.ensure_schema_state_valid().await?; + db.ensure_schema_apply_idle("optimize").await?; + + let resolved = db.resolved_branch_target(None).await?; + let snapshot = resolved.snapshot; + + let table_tasks: Vec<_> = all_table_keys(&db.catalog) + .into_iter() + .filter_map(|table_key| { + let entry = snapshot.entry(&table_key)?; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + Some((table_key, full_path)) + }) + .collect(); + + if table_tasks.is_empty() { + return Ok(Vec::new()); + } + + let concurrency = maint_concurrency().min(table_tasks.len()).max(1); + let table_store = &db.table_store; + + let stats: Vec> = futures::stream::iter(table_tasks.into_iter()) + .map(|(table_key, full_path)| async move { + let mut ds = table_store + .open_dataset_head_for_write(&table_key, &full_path, None) + .await?; + let version_before = ds.version().version; + let metrics: CompactionMetrics = + compact_files(&mut ds, CompactionOptions::default(), None) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let version_after = ds.version().version; + Ok(TableOptimizeStats { + table_key, + fragments_removed: metrics.fragments_removed, + fragments_added: metrics.fragments_added, + committed: version_after != version_before, + }) + }) + .buffer_unordered(concurrency) + .collect() + .await; + + stats.into_iter().collect() +} + +/// Run Lance `cleanup_old_versions` on every node + edge table on `main`, +/// using [`CleanupPolicyOptions`]. The latest manifest is always preserved +/// regardless (Lance invariant). +pub async fn cleanup_all_tables( + db: &mut Omnigraph, + options: CleanupPolicyOptions, +) -> Result> { + if options.keep_versions.is_none() && options.older_than.is_none() { + return Err(OmniError::manifest( + "cleanup requires at least one of keep_versions or older_than", + )); + } + + db.ensure_schema_state_valid().await?; + db.ensure_schema_apply_idle("cleanup").await?; + + let before_timestamp = options.older_than.map(|d| Utc::now() - d); + let keep_versions = options.keep_versions; + + let resolved = db.resolved_branch_target(None).await?; + let snapshot = resolved.snapshot; + + let table_tasks: Vec<_> = all_table_keys(&db.catalog) + .into_iter() + .filter_map(|table_key| { + let entry = snapshot.entry(&table_key)?; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + Some((table_key, full_path)) + }) + .collect(); + + if table_tasks.is_empty() { + return Ok(Vec::new()); + } + + let concurrency = maint_concurrency().min(table_tasks.len()).max(1); + let table_store = &db.table_store; + + let results: Vec> = futures::stream::iter(table_tasks.into_iter()) + .map(|(table_key, full_path)| async move { + let ds = table_store + .open_dataset_head_for_write(&table_key, &full_path, None) + .await?; + let before_version = keep_versions + .map(|n| ds.version().version.saturating_sub(n as u64)) + .filter(|v| *v > 0); + let policy = CleanupPolicy { + before_timestamp, + before_version, + delete_unverified: false, + error_if_tagged_old_versions: false, + clean_referenced_branches: false, + delete_rate_limit: None, + }; + let removed: RemovalStats = + lance::dataset::cleanup::cleanup_old_versions(&ds, policy) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + Ok(TableCleanupStats { + table_key, + bytes_removed: removed.bytes_removed, + old_versions_removed: removed.old_versions, + }) + }) + .buffer_unordered(concurrency) + .collect() + .await; + + results.into_iter().collect() +} + +fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec { + let mut keys: Vec = catalog + .node_types + .keys() + .map(|n| format!("node:{}", n)) + .chain( + catalog + .edge_types + .keys() + .map(|n| format!("edge:{}", n)), + ) + .collect(); + keys.sort(); + keys +} diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index bb7f5cc..0e29c2a 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -323,30 +323,38 @@ async fn load_jsonl_reader( } } - // Phase 2: Build per-type RecordBatches and write to Lance + // Phase 2: Build per-type RecordBatches and write to Lance. + // + // Writes to different tables are independent in Lance (each table has its + // own manifest + fragments), so we parallelize across types with a bounded + // concurrency limit. Serial writes against S3 were the dominant cost of + // load — batching and parallelizing per-type cuts wall time by roughly + // `LOAD_WRITE_CONCURRENCY`× for wide schemas (see MR-677). let mut updates = Vec::new(); let mut result = LoadResult::default(); let snapshot = db.snapshot_for_branch(branch).await?; - // Write nodes first (edges reference node IDs) + // Phase 2a: build and validate every node batch up front. Cheap and + // synchronous — surfaces validation errors before any S3 traffic. + let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> = + Vec::with_capacity(node_rows.len()); for (type_name, rows) in &node_rows { let node_type = &catalog.node_types[type_name]; let batch = build_node_batch(node_type, rows)?; - - // Validate value constraints before writing validate_value_constraints(&batch, node_type)?; - let loaded_count = batch.num_rows(); - let table_key = format!("node:{}", type_name); snapshot .entry(&table_key) .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count)); + } - let (state, table_branch) = - write_batch_to_dataset(db, branch, &table_key, batch, mode).await?; + // Phase 2b: write every node type concurrently, bounded. + let node_write_results = write_batches_concurrently(db, branch, mode, prepared_nodes).await?; + for (type_name, table_key, loaded_count, state, table_branch) in node_write_results { updates.push(crate::db::SubTableUpdate { table_key, table_version: state.version, @@ -354,7 +362,7 @@ async fn load_jsonl_reader( row_count: state.row_count, version_metadata: state.version_metadata, }); - result.nodes_loaded.insert(type_name.clone(), loaded_count); + result.nodes_loaded.insert(type_name, loaded_count); } // Phase 2b: Validate edge referential integrity — every src/dst must @@ -402,20 +410,23 @@ async fn load_jsonl_reader( } } - // Write edges + // Write edges (parallel per edge type, same pattern as nodes) + let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> = + Vec::with_capacity(edge_rows.len()); for (edge_name, rows) in &edge_rows { let edge_type = &catalog.edge_types[edge_name]; let batch = build_edge_batch(edge_type, rows)?; let loaded_count = batch.num_rows(); - let table_key = format!("edge:{}", edge_name); snapshot .entry(&table_key) .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count)); + } - let (state, table_branch) = - write_batch_to_dataset(db, branch, &table_key, batch, mode).await?; + let edge_write_results = write_batches_concurrently(db, branch, mode, prepared_edges).await?; + for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results { updates.push(crate::db::SubTableUpdate { table_key, table_version: state.version, @@ -423,7 +434,7 @@ async fn load_jsonl_reader( row_count: state.row_count, version_metadata: state.version_metadata, }); - result.edges_loaded.insert(edge_name.clone(), loaded_count); + result.edges_loaded.insert(edge_name, loaded_count); } // Phase 3: Validate edge cardinality constraints (before commit — invalid @@ -954,6 +965,64 @@ fn parse_date64_json_value(value: &JsonValue) -> Result> { } /// Write a batch to a Lance dataset, returning (new_version, total_row_count). +/// How many per-type Lance writes to run concurrently during a load. +/// +/// Each write is an independent S3 manifest + fragment write against a +/// different table. Ops within a single table must still be serial (Lance +/// OCC on the manifest), but cross-table writes have no shared state. +/// +/// 8 is a conservative default — enough to overlap S3 round-trip latency +/// across the typical 10-30 table schemas without flooding the runtime. +/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking. +const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8; + +fn load_write_concurrency() -> usize { + std::env::var("OMNIGRAPH_LOAD_CONCURRENCY") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|v| *v > 0) + .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY) +} + +/// Write a set of prepared `(type_name, table_key, batch, row_count)` tuples +/// concurrently. Returns results in original iteration order so callers can +/// zip them back to per-type metadata. +async fn write_batches_concurrently( + db: &Omnigraph, + branch: Option<&str>, + mode: LoadMode, + prepared: Vec<(String, String, RecordBatch, usize)>, +) -> Result< + Vec<( + String, + String, + usize, + crate::table_store::TableState, + Option, + )>, +> { + use futures::stream::StreamExt; + + if prepared.is_empty() { + return Ok(Vec::new()); + } + + let concurrency = load_write_concurrency().min(prepared.len()).max(1); + + futures::stream::iter(prepared.into_iter().map( + |(type_name, table_key, batch, loaded_count)| async move { + let (state, table_branch) = + write_batch_to_dataset(db, branch, &table_key, batch, mode).await?; + Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch)) + }, + )) + .buffered(concurrency) + .collect::>>() + .await + .into_iter() + .collect() +} + async fn write_batch_to_dataset( db: &Omnigraph, branch: Option<&str>, diff --git a/openapi.json b/openapi.json index edee87d..186e841 100644 --- a/openapi.json +++ b/openapi.json @@ -7,7 +7,7 @@ "name": "MIT", "identifier": "MIT" }, - "version": "0.3.0" + "version": "0.3.1" }, "paths": { "/branches": {