engine: convert write APIs from &mut self to &self (PR 2 Step C)

The interior-mutability primitives from Step B (catalog ArcSwap,
schema_source ArcSwap, coordinator Mutex, and RuntimeCache's existing
internal locking) make every Omnigraph engine write API safe to expose
under &self. This commit flips the public surface so the HTTP server
can hold Arc<Omnigraph> in PR 2 Step F instead of Arc<RwLock<Omnigraph>>.

Public API conversions:
- mutate, mutate_as
- ingest, ingest_as, ingest_file, ingest_file_as
- load, load_as, load_file
- branch_merge, branch_merge_as
- apply_schema
- ensure_indices, ensure_indices_on
- optimize

Inner functions converted in lockstep (their signatures must match the
new caller shape):
- mutate_with_current_actor, ingest_with_current_actor,
  load_direct_on_branch
- execute_named_mutation, execute_insert, execute_update,
  execute_delete, execute_delete_node, execute_delete_edge
- branch_merge_impl, branch_merge_on_current_target
- load_jsonl_reader
- schema_apply::{apply_schema, apply_schema_with_lock,
  acquire_schema_apply_lock, release_schema_apply_lock,
  ensure_schema_apply_idle}
- table_ops::{ensure_indices, ensure_indices_on,
  ensure_indices_for_branch, commit_prepared_updates,
  commit_prepared_updates_with_expected,
  commit_prepared_updates_on_branch,
  commit_prepared_updates_on_branch_with_expected,
  commit_manifest_updates, record_merge_commit,
  ensure_commit_graph_initialized, commit_updates_on_branch_with_expected}
- optimize::optimize_all_tables
- Omnigraph::commit_manifest_updates, record_merge_commit,
  commit_updates_on_branch_with_expected, ensure_commit_graph_initialized

The conversion is mechanical: callers that previously took `db: &mut
Omnigraph` now take `db: &Omnigraph`; every interior mutation goes
through the existing locks (coordinator.lock().await, store_catalog,
runtime_cache.invalidate_all). No new locks acquired, no new lock-order
hazards introduced.

102 lib tests + 24 runs + 30 branching + 63 end_to_end + 39 server
tests pass. Workspace compiles clean (1 warning on a now-redundant `mut`
binding in CLI; cleaned up in a follow-up). The remaining work in PR 2
is the AppState flip (Arc<RwLock<Omnigraph>> -> Arc<Omnigraph> +
WorkloadController), the revalidation perf optimization in commit_all,
and the WorkloadController itself.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-07 16:52:02 +02:00
parent 011f9b9610
commit d08c42c369
No known key found for this signature in database
7 changed files with 47 additions and 47 deletions

View file

@ -278,7 +278,7 @@ impl Omnigraph {
schema_apply::plan_schema(self, desired_schema_source).await
}
pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
schema_apply::apply_schema(self, desired_schema_source).await
}
@ -647,11 +647,11 @@ impl Omnigraph {
/// unbranched subtables keep inheriting `main`, while subtables inherited
/// from an ancestor branch are first forked into the active branch before
/// their index metadata is updated.
pub async fn ensure_indices(&mut self) -> Result<()> {
pub async fn ensure_indices(&self) -> Result<()> {
table_ops::ensure_indices(self).await
}
pub async fn ensure_indices_on(&mut self, branch: &str) -> Result<()> {
pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
table_ops::ensure_indices_on(self, branch).await
}
@ -674,7 +674,7 @@ impl Omnigraph {
/// Compact small Lance fragments into fewer larger ones across every
/// node + edge table on `main`. See [`optimize`] for details.
pub async fn optimize(&mut self) -> Result<Vec<optimize::TableOptimizeStats>> {
pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
optimize::optimize_all_tables(self).await
}
@ -1003,14 +1003,14 @@ impl Omnigraph {
}
pub(crate) async fn commit_manifest_updates(
&mut self,
&self,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
table_ops::commit_manifest_updates(self, updates).await
}
pub(crate) async fn record_merge_commit(
&mut self,
&self,
manifest_version: u64,
parent_commit_id: &str,
merged_parent_commit_id: &str,
@ -1027,7 +1027,7 @@ impl Omnigraph {
}
pub(crate) async fn commit_updates_on_branch_with_expected(
&mut self,
&self,
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
expected_table_versions: &std::collections::HashMap<String, u64>,
@ -1043,7 +1043,7 @@ impl Omnigraph {
.await
}
pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
table_ops::ensure_commit_graph_initialized(self).await
}

View file

@ -74,7 +74,7 @@ pub struct TableCleanupStats {
/// Run Lance `compact_files` on every node + edge table on `main`.
/// Tables run in parallel (bounded concurrency).
pub async fn optimize_all_tables(db: &mut Omnigraph) -> Result<Vec<TableOptimizeStats>> {
pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStats>> {
db.ensure_schema_state_valid().await?;
db.ensure_schema_apply_idle("optimize").await?;

View file

@ -12,7 +12,7 @@ pub(super) async fn plan_schema(
}
pub(super) async fn apply_schema(
db: &mut Omnigraph,
db: &Omnigraph,
desired_schema_source: &str,
) -> Result<SchemaApplyResult> {
acquire_schema_apply_lock(db).await?;
@ -27,7 +27,7 @@ pub(super) async fn apply_schema(
}
pub(super) async fn apply_schema_with_lock(
db: &mut Omnigraph,
db: &Omnigraph,
desired_schema_source: &str,
) -> Result<SchemaApplyResult> {
db.ensure_schema_state_valid().await?;
@ -536,7 +536,7 @@ pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) ->
ensure_schema_apply_not_locked(db, operation).await
}
pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> {
pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
db.ensure_schema_state_valid().await?;
db.refresh_coordinator_only().await?;
let branches = db.coordinator.lock().await.all_branches().await?;
@ -576,7 +576,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()>
Ok(())
}
pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()> {
pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> {
db.coordinator
.lock()
.await

View file

@ -21,12 +21,12 @@ pub(super) async fn graph_index_for_resolved(
db.runtime_cache.graph_index(resolved, &catalog).await
}
pub(super) async fn ensure_indices(db: &mut Omnigraph) -> Result<()> {
pub(super) async fn ensure_indices(db: &Omnigraph) -> Result<()> {
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
ensure_indices_for_branch(db, current_branch.as_deref()).await
}
pub(super) async fn ensure_indices_on(db: &mut Omnigraph, branch: &str) -> Result<()> {
pub(super) async fn ensure_indices_on(db: &Omnigraph, branch: &str) -> Result<()> {
let branch = normalize_branch_name(branch)?;
ensure_indices_for_branch(db, branch.as_deref()).await
}
@ -69,7 +69,7 @@ pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test(
}
pub(super) async fn ensure_indices_for_branch(
db: &mut Omnigraph,
db: &Omnigraph,
branch: Option<&str>,
) -> Result<()> {
db.ensure_schema_state_valid().await?;
@ -726,7 +726,7 @@ async fn prepare_updates_for_commit(
}
async fn commit_prepared_updates(
db: &mut Omnigraph,
db: &Omnigraph,
updates: &[crate::db::SubTableUpdate],
actor_id: Option<&str>,
) -> Result<u64> {
@ -743,7 +743,7 @@ async fn commit_prepared_updates(
}
async fn commit_prepared_updates_with_expected(
db: &mut Omnigraph,
db: &Omnigraph,
updates: &[crate::db::SubTableUpdate],
expected_table_versions: &std::collections::HashMap<String, u64>,
actor_id: Option<&str>,
@ -761,7 +761,7 @@ async fn commit_prepared_updates_with_expected(
}
pub(super) async fn commit_prepared_updates_on_branch(
db: &mut Omnigraph,
db: &Omnigraph,
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
actor_id: Option<&str>,
@ -788,7 +788,7 @@ pub(super) async fn commit_prepared_updates_on_branch(
}
pub(super) async fn commit_prepared_updates_on_branch_with_expected(
db: &mut Omnigraph,
db: &Omnigraph,
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
expected_table_versions: &std::collections::HashMap<String, u64>,
@ -835,14 +835,14 @@ pub(super) async fn commit_updates(
}
pub(super) async fn commit_manifest_updates(
db: &mut Omnigraph,
db: &Omnigraph,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
db.coordinator.lock().await.commit_manifest_updates(updates).await
}
pub(super) async fn record_merge_commit(
db: &mut Omnigraph,
db: &Omnigraph,
manifest_version: u64,
parent_commit_id: &str,
merged_parent_commit_id: &str,
@ -863,7 +863,7 @@ pub(super) async fn record_merge_commit(
/// `expected_table_versions` map asserts the manifest's pre-write per-table
/// versions; mismatches surface as `ManifestConflictDetails::ExpectedVersionMismatch`.
pub(super) async fn commit_updates_on_branch_with_expected(
db: &mut Omnigraph,
db: &Omnigraph,
branch: Option<&str>,
updates: &[crate::db::SubTableUpdate],
expected_table_versions: &std::collections::HashMap<String, u64>,
@ -881,7 +881,7 @@ pub(super) async fn commit_updates_on_branch_with_expected(
.await
}
pub(super) async fn ensure_commit_graph_initialized(db: &mut Omnigraph) -> Result<()> {
pub(super) async fn ensure_commit_graph_initialized(db: &Omnigraph) -> Result<()> {
db.coordinator.lock().await.ensure_commit_graph_initialized().await
}

View file

@ -1008,12 +1008,12 @@ async fn publish_rewritten_merge_table(
}
impl Omnigraph {
pub async fn branch_merge(&mut self, source: &str, target: &str) -> Result<MergeOutcome> {
pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
self.branch_merge_as(source, target, None).await
}
pub async fn branch_merge_as(
&mut self,
&self,
source: &str,
target: &str,
actor_id: Option<&str>,
@ -1023,7 +1023,7 @@ impl Omnigraph {
}
async fn branch_merge_impl(
&mut self,
&self,
source: &str,
target: &str,
actor_id: Option<&str>,
@ -1101,7 +1101,7 @@ impl Omnigraph {
}
async fn branch_merge_on_current_target(
&mut self,
&self,
base_snapshot: &Snapshot,
source_snapshot: &Snapshot,
target_head_commit_id: &str,

View file

@ -670,7 +670,7 @@ fn enforce_no_mixed_destructive_constructive(
impl Omnigraph {
pub async fn mutate(
&mut self,
&self,
branch: &str,
query_source: &str,
query_name: &str,
@ -681,7 +681,7 @@ impl Omnigraph {
}
pub async fn mutate_as(
&mut self,
&self,
branch: &str,
query_source: &str,
query_name: &str,
@ -693,7 +693,7 @@ impl Omnigraph {
}
async fn mutate_with_current_actor(
&mut self,
&self,
branch: &str,
query_source: &str,
query_name: &str,
@ -799,7 +799,7 @@ impl Omnigraph {
}
async fn execute_named_mutation(
&mut self,
&self,
query_source: &str,
query_name: &str,
params: &ParamMap,
@ -863,7 +863,7 @@ impl Omnigraph {
}
async fn execute_insert(
&mut self,
&self,
type_name: &str,
assignments: &[IRAssignment],
params: &ParamMap,
@ -977,7 +977,7 @@ impl Omnigraph {
}
async fn execute_update(
&mut self,
&self,
type_name: &str,
assignments: &[IRAssignment],
predicate: &IRMutationPredicate,
@ -1102,7 +1102,7 @@ impl Omnigraph {
}
async fn execute_delete(
&mut self,
&self,
type_name: &str,
predicate: &IRMutationPredicate,
params: &ParamMap,
@ -1120,7 +1120,7 @@ impl Omnigraph {
}
async fn execute_delete_node(
&mut self,
&self,
type_name: &str,
predicate: &IRMutationPredicate,
params: &ParamMap,
@ -1251,7 +1251,7 @@ impl Omnigraph {
}
async fn execute_delete_edge(
&mut self,
&self,
type_name: &str,
predicate: &IRMutationPredicate,
params: &ParamMap,

View file

@ -73,7 +73,7 @@ pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) ->
impl Omnigraph {
pub async fn ingest(
&mut self,
&self,
branch: &str,
from: Option<&str>,
data: &str,
@ -83,7 +83,7 @@ impl Omnigraph {
}
pub async fn ingest_as(
&mut self,
&self,
branch: &str,
from: Option<&str>,
data: &str,
@ -95,7 +95,7 @@ impl Omnigraph {
}
pub async fn ingest_file(
&mut self,
&self,
branch: &str,
from: Option<&str>,
path: &str,
@ -105,7 +105,7 @@ impl Omnigraph {
}
pub async fn ingest_file_as(
&mut self,
&self,
branch: &str,
from: Option<&str>,
path: &str,
@ -117,7 +117,7 @@ impl Omnigraph {
}
async fn ingest_with_current_actor(
&mut self,
&self,
branch: &str,
from: Option<&str>,
data: &str,
@ -149,12 +149,12 @@ impl Omnigraph {
})
}
pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
self.load_as(branch, data, mode, None).await
}
pub async fn load_as(
&mut self,
&self,
branch: &str,
data: &str,
mode: LoadMode,
@ -180,7 +180,7 @@ impl Omnigraph {
}
pub async fn load_file(
&mut self,
&self,
branch: &str,
path: &str,
mode: LoadMode,
@ -190,7 +190,7 @@ impl Omnigraph {
}
async fn load_direct_on_branch(
&mut self,
&self,
branch: Option<&str>,
data: &str,
mode: LoadMode,
@ -235,7 +235,7 @@ impl LoadResult {
}
async fn load_jsonl_reader<R: BufRead>(
db: &mut Omnigraph,
db: &Omnigraph,
branch: Option<&str>,
reader: R,
mode: LoadMode,