Support multi-statement mutations (insert + edge in one query)

Allow mutation queries to contain multiple sequential statements that
execute atomically within a single transactional run. This enables
patterns like inserting a node and its edges in one query:

    query add_and_link($name: String, $age: I32, $friend: String) {
        insert Person { name: $name, age: $age }
        insert Knows { from: $name, to: $friend }
    }

Changes span the full compiler-to-execution pipeline:
- Grammar: mutation_body = { mutation_stmt+ }
- AST: QueryDecl.mutations: Vec<Mutation>
- IR: MutationIR.ops: Vec<MutationOpIR>
- Execution: loop over ops, accumulate affected counts

Cross-statement visibility works because each statement's commit_updates
advances the manifest state, so subsequent statements see prior writes.
Atomicity comes from the existing run mechanism (begin_run/publish_run).

https://claude.ai/code/session_01E4VG2WXrZW8aeXFiqr8NwF
This commit is contained in:
Claude 2026-04-11 20:27:51 +00:00
parent a844e0ba68
commit d10f78530f
No known key found for this signature in database
9 changed files with 240 additions and 64 deletions

View file

@ -13,7 +13,7 @@ pub fn lower_query(
query: &QueryDecl,
type_ctx: &TypeContext,
) -> Result<QueryIR> {
if query.mutation.is_some() {
if !query.mutations.is_empty() {
return Err(crate::error::NanoError::Plan(
"cannot lower mutation query with read-query lowerer".to_string(),
));
@ -61,54 +61,67 @@ pub fn lower_query(
}
pub fn lower_mutation_query(query: &QueryDecl) -> Result<MutationIR> {
let mutation = query.mutation.as_ref().ok_or_else(|| {
crate::error::NanoError::Plan("query does not contain a mutation body".to_string())
})?;
if query.mutations.is_empty() {
return Err(crate::error::NanoError::Plan(
"query does not contain a mutation body".to_string(),
));
}
let param_names: HashSet<String> = query.params.iter().map(|p| p.name.clone()).collect();
let op = match mutation {
Mutation::Insert(insert) => MutationOpIR::Insert {
let ops = query
.mutations
.iter()
.map(|m| lower_single_mutation(m, &param_names))
.collect::<Result<Vec<_>>>()?;
Ok(MutationIR {
name: query.name.clone(),
params: query.params.clone(),
ops,
})
}
fn lower_single_mutation(
mutation: &Mutation,
param_names: &HashSet<String>,
) -> Result<MutationOpIR> {
match mutation {
Mutation::Insert(insert) => Ok(MutationOpIR::Insert {
type_name: insert.type_name.clone(),
assignments: insert
.assignments
.iter()
.map(|a| IRAssignment {
property: a.property.clone(),
value: lower_match_value(&a.value, &param_names),
value: lower_match_value(&a.value, param_names),
})
.collect(),
},
Mutation::Update(update) => MutationOpIR::Update {
}),
Mutation::Update(update) => Ok(MutationOpIR::Update {
type_name: update.type_name.clone(),
assignments: update
.assignments
.iter()
.map(|a| IRAssignment {
property: a.property.clone(),
value: lower_match_value(&a.value, &param_names),
value: lower_match_value(&a.value, param_names),
})
.collect(),
predicate: IRMutationPredicate {
property: update.predicate.property.clone(),
op: update.predicate.op,
value: lower_match_value(&update.predicate.value, &param_names),
value: lower_match_value(&update.predicate.value, param_names),
},
},
Mutation::Delete(delete) => MutationOpIR::Delete {
}),
Mutation::Delete(delete) => Ok(MutationOpIR::Delete {
type_name: delete.type_name.clone(),
predicate: IRMutationPredicate {
property: delete.predicate.property.clone(),
op: delete.predicate.op,
value: lower_match_value(&delete.predicate.value, &param_names),
value: lower_match_value(&delete.predicate.value, param_names),
},
},
};
Ok(MutationIR {
name: query.name.clone(),
params: query.params.clone(),
op,
})
}),
}
}
fn lower_clauses(
@ -543,7 +556,7 @@ query q($name: String, $age: I32) {
assert!(matches!(checked, CheckedQuery::Mutation(_)));
let ir = lower_mutation_query(&qf.queries[0]).unwrap();
match ir.op {
match &ir.ops[0] {
MutationOpIR::Update {
type_name,
assignments,
@ -636,7 +649,7 @@ query stamp() {
assert!(matches!(checked, CheckedQuery::Mutation(_)));
let ir = lower_mutation_query(&qf.queries[0]).unwrap();
match ir.op {
match &ir.ops[0] {
MutationOpIR::Update {
assignments,
predicate,
@ -654,4 +667,29 @@ query stamp() {
_ => panic!("expected update mutation op"),
}
}
#[test]
fn test_lower_multi_mutation() {
let catalog = setup();
let qf = parse_query(
r#"
query q($name: String, $age: I32, $friend: String) {
insert Person { name: $name, age: $age }
insert Knows { from: $name, to: $friend }
}
"#,
)
.unwrap();
let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
assert!(matches!(checked, CheckedQuery::Mutation(_)));
let ir = lower_mutation_query(&qf.queries[0]).unwrap();
assert_eq!(ir.ops.len(), 2);
assert!(
matches!(&ir.ops[0], MutationOpIR::Insert { type_name, .. } if type_name == "Person")
);
assert!(
matches!(&ir.ops[1], MutationOpIR::Insert { type_name, .. } if type_name == "Knows")
);
}
}

View file

@ -19,7 +19,7 @@ pub struct QueryIR {
pub struct MutationIR {
pub name: String,
pub params: Vec<Param>,
pub op: MutationOpIR,
pub ops: Vec<MutationOpIR>,
}
#[derive(Debug, Clone)]

View file

@ -15,7 +15,7 @@ pub struct QueryDecl {
pub return_clause: Vec<Projection>,
pub order_clause: Vec<Ordering>,
pub limit: Option<u64>,
pub mutation: Option<Mutation>,
pub mutations: Vec<Mutation>,
}
#[derive(Debug, Clone)]

View file

@ -55,7 +55,7 @@ fn parse_query_decl(pair: pest::iterators::Pair<Rule>) -> Result<QueryDecl> {
let mut return_clause = Vec::new();
let mut order_clause = Vec::new();
let mut limit = None;
let mut mutation = None;
let mut mutations = Vec::new();
for item in inner {
match item.as_rule() {
@ -134,11 +134,18 @@ fn parse_query_decl(pair: pest::iterators::Pair<Rule>) -> Result<QueryDecl> {
}
}
}
Rule::mutation_stmt => {
let stmt = body.into_inner().next().ok_or_else(|| {
NanoError::Parse("mutation statement cannot be empty".to_string())
})?;
mutation = Some(parse_mutation_stmt(stmt)?);
Rule::mutation_body => {
for mutation_pair in body.into_inner() {
if let Rule::mutation_stmt = mutation_pair.as_rule() {
let stmt =
mutation_pair.into_inner().next().ok_or_else(|| {
NanoError::Parse(
"mutation statement cannot be empty".to_string(),
)
})?;
mutations.push(parse_mutation_stmt(stmt)?);
}
}
}
_ => {}
}
@ -156,7 +163,7 @@ fn parse_query_decl(pair: pest::iterators::Pair<Rule>) -> Result<QueryDecl> {
return_clause,
order_clause,
limit,
mutation,
mutations,
})
}
@ -1265,7 +1272,7 @@ query add_person($name: String, $age: I32) {
"#;
let qf = parse_query(input).unwrap();
let q = &qf.queries[0];
match q.mutation.as_ref().expect("expected mutation") {
match q.mutations.first().expect("expected mutation") {
Mutation::Insert(ins) => {
assert_eq!(ins.type_name, "Person");
assert_eq!(ins.assignments.len(), 2);
@ -1285,7 +1292,7 @@ query set_age($name: String, $age: I32) {
"#;
let qf = parse_query(input).unwrap();
let q = &qf.queries[0];
match q.mutation.as_ref().expect("expected mutation") {
match q.mutations.first().expect("expected mutation") {
Mutation::Update(upd) => {
assert_eq!(upd.type_name, "Person");
assert_eq!(upd.assignments.len(), 1);
@ -1305,7 +1312,7 @@ query drop_person($name: String) {
"#;
let qf = parse_query(input).unwrap();
let q = &qf.queries[0];
match q.mutation.as_ref().expect("expected mutation") {
match q.mutations.first().expect("expected mutation") {
Mutation::Delete(del) => {
assert_eq!(del.type_name, "Person");
assert_eq!(del.predicate.property, "name");
@ -1372,7 +1379,7 @@ query stamp() {
"#,
)
.unwrap();
match mutation.queries[0].mutation.as_ref().unwrap() {
match mutation.queries[0].mutations.first().unwrap() {
Mutation::Update(update) => {
assert!(matches!(update.assignments[0].value, MatchValue::Now));
assert!(matches!(update.predicate.value, MatchValue::Now));
@ -1381,6 +1388,47 @@ query stamp() {
}
}
#[test]
fn test_parse_multi_mutation() {
let input = r#"
query add_and_link($name: String, $age: I32, $friend: String) {
insert Person { name: $name, age: $age }
insert Knows { from: $name, to: $friend }
}
"#;
let qf = parse_query(input).unwrap();
let q = &qf.queries[0];
assert_eq!(q.mutations.len(), 2);
assert!(matches!(&q.mutations[0], Mutation::Insert(ins) if ins.type_name == "Person"));
assert!(matches!(&q.mutations[1], Mutation::Insert(ins) if ins.type_name == "Knows"));
}
#[test]
fn test_parse_multi_mutation_mixed_ops() {
let input = r#"
query create_and_clean($name: String, $age: I32, $old: String) {
insert Person { name: $name, age: $age }
delete Person where name = $old
}
"#;
let qf = parse_query(input).unwrap();
let q = &qf.queries[0];
assert_eq!(q.mutations.len(), 2);
assert!(matches!(&q.mutations[0], Mutation::Insert(_)));
assert!(matches!(&q.mutations[1], Mutation::Delete(_)));
}
#[test]
fn test_parse_single_mutation_backward_compat() {
let input = r#"
query add($name: String, $age: I32) {
insert Person { name: $name, age: $age }
}
"#;
let qf = parse_query(input).unwrap();
assert_eq!(qf.queries[0].mutations.len(), 1);
}
#[test]
fn test_parse_list_literal() {
let input = r#"

View file

@ -16,7 +16,8 @@ query_annotation = { description_annotation | instruction_annotation }
description_annotation = { "@description" ~ "(" ~ string_lit ~ ")" }
instruction_annotation = { "@instruction" ~ "(" ~ string_lit ~ ")" }
query_body = { read_query_body | mutation_stmt }
query_body = { read_query_body | mutation_body }
mutation_body = { mutation_stmt+ }
read_query_body = {
match_clause
~ return_clause

View file

@ -58,7 +58,7 @@ impl ResolvedType {
#[derive(Debug, Clone)]
pub struct MutationTypeContext {
pub target_type: String,
pub target_types: Vec<String>,
}
#[derive(Debug, Clone)]
@ -68,16 +68,20 @@ pub enum CheckedQuery {
}
pub fn typecheck_query_decl(catalog: &Catalog, query: &QueryDecl) -> Result<CheckedQuery> {
if let Some(mutation) = &query.mutation {
let target_type = typecheck_mutation(catalog, mutation, &query.params)?;
Ok(CheckedQuery::Mutation(MutationTypeContext { target_type }))
if !query.mutations.is_empty() {
let mut target_types = Vec::with_capacity(query.mutations.len());
for mutation in &query.mutations {
let target_type = typecheck_mutation(catalog, mutation, &query.params)?;
target_types.push(target_type);
}
Ok(CheckedQuery::Mutation(MutationTypeContext { target_types }))
} else {
Ok(CheckedQuery::Read(typecheck_read_query(catalog, query)?))
}
}
pub fn typecheck_query(catalog: &Catalog, query: &QueryDecl) -> Result<TypeContext> {
if query.mutation.is_some() {
if !query.mutations.is_empty() {
return Err(NanoError::Type(
"mutation query cannot be typechecked with read-query API".to_string(),
));
@ -2557,7 +2561,7 @@ query add_person($name: String, $age: I32) {
.unwrap();
let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
match checked {
CheckedQuery::Mutation(ctx) => assert_eq!(ctx.target_type, "Person"),
CheckedQuery::Mutation(ctx) => assert_eq!(ctx.target_types[0], "Person"),
_ => panic!("expected mutation typecheck result"),
}
}
@ -2593,7 +2597,7 @@ query add_doc($slug: String, $body: String) {
.unwrap();
let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
match checked {
CheckedQuery::Mutation(ctx) => assert_eq!(ctx.target_type, "Doc"),
CheckedQuery::Mutation(ctx) => assert_eq!(ctx.target_types[0], "Doc"),
_ => panic!("expected mutation typecheck result"),
}
}
@ -2664,7 +2668,7 @@ query add_knows($from: String, $to: String) {
.unwrap();
let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
match checked {
CheckedQuery::Mutation(ctx) => assert_eq!(ctx.target_type, "Knows"),
CheckedQuery::Mutation(ctx) => assert_eq!(ctx.target_types[0], "Knows"),
_ => panic!("expected mutation typecheck result"),
}
}
@ -2699,7 +2703,7 @@ query del_knows($from: String) {
.unwrap();
let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
match checked {
CheckedQuery::Mutation(ctx) => assert_eq!(ctx.target_type, "Knows"),
CheckedQuery::Mutation(ctx) => assert_eq!(ctx.target_types[0], "Knows"),
_ => panic!("expected mutation typecheck result"),
}
}
@ -2719,6 +2723,43 @@ query upd_knows($from: String) {
assert!(err.to_string().contains("T16"));
}
#[test]
fn test_mutation_multi_insert_typecheck_ok() {
let catalog = setup();
let qf = parse_query(
r#"
query add_and_link($name: String, $age: I32, $friend: String) {
insert Person { name: $name, age: $age }
insert Knows { from: $name, to: $friend }
}
"#,
)
.unwrap();
let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
match checked {
CheckedQuery::Mutation(ctx) => {
assert_eq!(ctx.target_types, vec!["Person", "Knows"]);
}
_ => panic!("expected mutation typecheck result"),
}
}
#[test]
fn test_mutation_multi_second_stmt_error() {
let catalog = setup();
let qf = parse_query(
r#"
query bad($name: String, $age: I32) {
insert Person { name: $name, age: $age }
insert Unknown { foo: $name }
}
"#,
)
.unwrap();
let err = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap_err();
assert!(err.to_string().contains("T10"));
}
#[test]
fn test_now_expression_typechecks_as_datetime() {
let schema = parse_schema(

View file

@ -3335,24 +3335,30 @@ impl Omnigraph {
let ir = lower_mutation_query(&query_decl)?;
match &ir.op {
MutationOpIR::Insert {
type_name,
assignments,
} => self.execute_insert(type_name, assignments, params).await,
MutationOpIR::Update {
type_name,
assignments,
predicate,
} => {
self.execute_update(type_name, assignments, predicate, params)
.await
}
MutationOpIR::Delete {
type_name,
predicate,
} => self.execute_delete(type_name, predicate, params).await,
let mut total = MutationResult::default();
for op in &ir.ops {
let result = match op {
MutationOpIR::Insert {
type_name,
assignments,
} => self.execute_insert(type_name, assignments, params).await?,
MutationOpIR::Update {
type_name,
assignments,
predicate,
} => {
self.execute_update(type_name, assignments, predicate, params)
.await?
}
MutationOpIR::Delete {
type_name,
predicate,
} => self.execute_delete(type_name, predicate, params).await?,
};
total.affected_nodes += result.affected_nodes;
total.affected_edges += result.affected_edges;
}
Ok(total)
}
pub async fn branch_merge(&mut self, source: &str, target: &str) -> Result<MergeOutcome> {

View file

@ -644,6 +644,43 @@ async fn mutation_insert_edge() {
assert_eq!(names.value(0), "Alice");
}
#[tokio::test]
async fn mutation_multi_insert_node_and_edge() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// In one atomic mutation: insert Eve + edge Eve→Alice
let result = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(&[("$name", "Eve"), ("$friend", "Alice")], &[("$age", 22)]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
assert_eq!(result.affected_edges, 1);
// Verify traversal: Eve → Alice
let qr = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
&params(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
let batch = qr.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Alice");
}
#[tokio::test]
async fn mutation_update_node() {
let dir = tempfile::tempdir().unwrap();

View file

@ -35,6 +35,11 @@ query remove_person($name: String) {
query remove_friendship($from: String) {
delete Knows where from = $from
}
query insert_person_and_friend($name: String, $age: I32, $friend: String) {
insert Person { name: $name, age: $age }
insert Knows { from: $name, to: $friend }
}
"#;
/// Init a repo and load the standard test data.