diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 60ebef3..0d9a1ff 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -1404,6 +1404,19 @@ async fn server_graphs_list( State(state): State, actor: Option>, ) -> std::result::Result, ApiError> { + Ok(Json( + do_graphs_list(&state, actor.as_ref().map(|Extension(a)| a)).await?, + )) +} + +/// Shared backend for `GET /graphs` and the MCP `graphs_list` tool + +/// `omnigraph://graphs` resource. Server-scoped: 405 in single mode (no +/// registry to enumerate), else the server-level `GraphList` Cedar gate then +/// the sorted registry list. +pub(crate) async fn do_graphs_list( + state: &AppState, + actor: Option<&ResolvedActor>, +) -> std::result::Result { // 405 in single mode — there's no registry to enumerate, and the // legacy URL surface didn't expose this endpoint. let registry = match state.routing() { @@ -1424,7 +1437,7 @@ async fn server_graphs_list( // is the right default (don't leak the registry until the operator // explicitly authorizes it). authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, state.server_policy.as_deref(), PolicyRequest { action: PolicyAction::GraphList, @@ -1442,7 +1455,7 @@ async fn server_graphs_list( }) .collect(); graphs.sort_by(|a, b| a.graph_id.cmp(&b.graph_id)); - Ok(Json(GraphListResponse { graphs })) + Ok(GraphListResponse { graphs }) } async fn server_openapi(State(state): State) -> Json { @@ -1823,8 +1836,24 @@ async fn server_snapshot( Query(query): Query, ) -> std::result::Result, ApiError> { let branch = query.branch.unwrap_or_else(|| "main".to_string()); + Ok(Json( + do_snapshot(&handle, actor.as_ref().map(|Extension(a)| a), branch).await?, + )) +} + +/// Shared backend for `GET /snapshot` and the MCP `snapshot` tool. +/// +/// The Cedar `Read` gate plus the engine snapshot read live here so the HTTP +/// handler and the MCP tool dispatch one path (MR-969 / RFC-003). Same shape +/// as `run_query`/`run_mutate`: takes the resolved actor and the args, never +/// an Axum extractor. +pub(crate) async fn do_snapshot( + handle: &Arc, + actor: Option<&ResolvedActor>, + branch: String, +) -> std::result::Result { authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::Read, @@ -1838,7 +1867,7 @@ async fn server_snapshot( .await .map_err(ApiError::from_omni)? }; - Ok(Json(snapshot_payload(&branch, &snapshot))) + Ok(snapshot_payload(&branch, &snapshot)) } /// Header values that flag a response as coming from a deprecated route @@ -2443,8 +2472,19 @@ async fn server_schema_get( Extension(handle): Extension>, actor: Option>, ) -> std::result::Result, ApiError> { + Ok(Json( + do_schema_get(&handle, actor.as_ref().map(|Extension(a)| a)).await?, + )) +} + +/// Shared backend for `GET /schema` and the MCP `schema_get` tool + +/// `omnigraph://schema` resource. Cedar `Read` gate then the schema source. +pub(crate) async fn do_schema_get( + handle: &Arc, + actor: Option<&ResolvedActor>, +) -> std::result::Result { authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::Read, @@ -2456,7 +2496,7 @@ async fn server_schema_get( let db = &handle.engine; db.schema_source().to_string() }; - Ok(Json(SchemaOutput { schema_source })) + Ok(SchemaOutput { schema_source }) } #[utoipa::path( @@ -2486,15 +2526,34 @@ async fn server_schema_apply( actor: Option>, Json(request): Json, ) -> std::result::Result, ApiError> { + Ok(Json( + do_schema_apply( + &state, + &handle, + actor.as_ref().map(|Extension(a)| a), + &request.schema_source, + request.allow_data_loss, + ) + .await?, + )) +} + +/// Shared backend for `POST /schema/apply` and the MCP `schema_apply` tool. +/// Cedar `SchemaApply` gate, per-actor admission, then the catalog-checked +/// engine apply. +pub(crate) async fn do_schema_apply( + state: &AppState, + handle: &Arc, + actor: Option<&ResolvedActor>, + schema_source: &str, + allow_data_loss: bool, +) -> std::result::Result { let actor_arc = actor - .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.actor_id)) + .map(|a| Arc::clone(&a.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor - .as_ref() - .map(|Extension(actor)| actor.actor_id.as_ref()); + let actor_id = actor.map(|a| a.actor_id.as_ref()); authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::SchemaApply, @@ -2502,7 +2561,7 @@ async fn server_schema_apply( target_branch: Some("main".to_string()), }, )?; - let est_bytes = request.schema_source.len() as u64; + let est_bytes = schema_source.len() as u64; let _admission = state .workload .try_admit(&actor_arc, est_bytes) @@ -2518,9 +2577,9 @@ async fn server_schema_apply( // HTTP-layer authorize_request just made above. PR #3 collapses // the redundancy. db.apply_schema_as_with_catalog_check( - &request.schema_source, + schema_source, omnigraph::db::SchemaApplyOptions { - allow_data_loss: request.allow_data_loss, + allow_data_loss, }, actor_id, |catalog| { @@ -2533,7 +2592,7 @@ async fn server_schema_apply( .await .map_err(ApiError::from_omni)? }; - Ok(Json(schema_apply_output(handle.uri.as_str(), result))) + Ok(schema_apply_output(handle.uri.as_str(), result)) } #[utoipa::path( @@ -2567,13 +2626,37 @@ async fn server_ingest( let branch = request.branch.unwrap_or_else(|| "main".to_string()); let from = request.from.unwrap_or_else(|| "main".to_string()); let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge); + Ok(Json( + do_ingest( + &state, + &handle, + actor.as_ref().map(|Extension(a)| a), + &request.data, + mode, + branch, + from, + ) + .await?, + )) +} + +/// Shared backend for `POST /ingest` and the MCP `ingest` tool. `branch`, +/// `from`, and `mode` are already defaulted by the caller. Gates `Change` +/// (plus `BranchCreate` when ingesting into a not-yet-existing branch), then +/// per-actor admission, then the engine ingest. +pub(crate) async fn do_ingest( + state: &AppState, + handle: &Arc, + actor: Option<&ResolvedActor>, + data: &str, + mode: omnigraph::loader::LoadMode, + branch: String, + from: String, +) -> std::result::Result { let actor_arc = actor - .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.actor_id)) + .map(|a| Arc::clone(&a.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor - .as_ref() - .map(|Extension(actor)| actor.actor_id.as_ref()); + let actor_id = actor.map(|a| a.actor_id.as_ref()); let branch_exists = { let db = &handle.engine; @@ -2586,7 +2669,7 @@ async fn server_ingest( if !branch_exists { authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::BranchCreate, @@ -2596,7 +2679,7 @@ async fn server_ingest( )?; } authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::Change, @@ -2604,7 +2687,7 @@ async fn server_ingest( target_branch: None, }, )?; - let est_bytes = request.data.len() as u64; + let est_bytes = data.len() as u64; let _admission = state .workload .try_admit(&actor_arc, est_bytes) @@ -2612,16 +2695,16 @@ async fn server_ingest( let result = { let db = &handle.engine; - db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id) + db.ingest_as(&branch, Some(&from), data, mode, actor_id) .await .map_err(ApiError::from_omni)? }; - Ok(Json(ingest_output( + Ok(ingest_output( handle.uri.as_str(), &result, actor_id.map(str::to_string), - ))) + )) } #[utoipa::path( @@ -2643,8 +2726,19 @@ async fn server_branch_list( Extension(handle): Extension>, actor: Option>, ) -> std::result::Result, ApiError> { + Ok(Json( + do_branches_list(&handle, actor.as_ref().map(|Extension(a)| a)).await?, + )) +} + +/// Shared backend for `GET /branches` and the MCP `branches_list` tool + +/// `omnigraph://branches` resource. Cedar `Read` gate then the sorted list. +pub(crate) async fn do_branches_list( + handle: &Arc, + actor: Option<&ResolvedActor>, +) -> std::result::Result { authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::Read, @@ -2657,7 +2751,7 @@ async fn server_branch_list( db.branch_list().await.map_err(ApiError::from_omni)? }; branches.sort(); - Ok(Json(BranchListOutput { branches })) + Ok(BranchListOutput { branches }) } #[utoipa::path( @@ -2688,17 +2782,37 @@ async fn server_branch_create( Json(request): Json, ) -> std::result::Result, ApiError> { let from = request.from.unwrap_or_else(|| "main".to_string()); + Ok(Json( + do_branch_create( + &state, + &handle, + actor.as_ref().map(|Extension(a)| a), + request.name, + from, + ) + .await?, + )) +} + +/// Shared backend for `POST /branches` and the MCP `branches_create` tool. +/// Cedar `BranchCreate` gate, admission, then the engine fork. +pub(crate) async fn do_branch_create( + state: &AppState, + handle: &Arc, + actor: Option<&ResolvedActor>, + name: String, + from: String, +) -> std::result::Result { let actor_arc = actor - .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.actor_id)) + .map(|a| Arc::clone(&a.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::BranchCreate, branch: Some(from.clone()), - target_branch: Some(request.name.clone()), + target_branch: Some(name.clone()), }, )?; // Branch metadata only — small constant bytes estimate. The Lance @@ -2712,18 +2826,18 @@ async fn server_branch_create( let db = &handle.engine; db.branch_create_from_as( ReadTarget::branch(&from), - &request.name, - actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()), + &name, + actor.map(|a| a.actor_id.as_ref()), ) .await .map_err(ApiError::from_omni)?; } - Ok(Json(BranchCreateOutput { + Ok(BranchCreateOutput { uri: handle.uri.clone(), from, - name: request.name, - actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()), - })) + name, + actor_id: actor.map(|a| a.actor_id.as_ref().to_string()), + }) } /// Path-param shape for [`server_branch_delete`]. Named-field @@ -2768,15 +2882,32 @@ async fn server_branch_delete( actor: Option>, Path(BranchPath { branch }): Path, ) -> std::result::Result, ApiError> { + Ok(Json( + do_branch_delete( + &state, + &handle, + actor.as_ref().map(|Extension(a)| a), + branch, + ) + .await?, + )) +} + +/// Shared backend for `DELETE /branches/{branch}` and the MCP +/// `branches_delete` tool. Cedar `BranchDelete` gate, admission, then the +/// engine delete. +pub(crate) async fn do_branch_delete( + state: &AppState, + handle: &Arc, + actor: Option<&ResolvedActor>, + branch: String, +) -> std::result::Result { let actor_arc = actor - .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.actor_id)) + .map(|a| Arc::clone(&a.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor - .as_ref() - .map(|Extension(actor)| actor.actor_id.as_ref()); + let actor_id = actor.map(|a| a.actor_id.as_ref()); authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::BranchDelete, @@ -2795,11 +2926,11 @@ async fn server_branch_delete( .await .map_err(ApiError::from_omni)?; } - Ok(Json(BranchDeleteOutput { + Ok(BranchDeleteOutput { uri: handle.uri.clone(), name: branch, actor_id: actor_id.map(str::to_string), - })) + }) } #[utoipa::path( @@ -2831,19 +2962,37 @@ async fn server_branch_merge( Json(request): Json, ) -> std::result::Result, ApiError> { let target = request.target.unwrap_or_else(|| "main".to_string()); + Ok(Json( + do_branch_merge( + &state, + &handle, + actor.as_ref().map(|Extension(a)| a), + request.source, + target, + ) + .await?, + )) +} + +/// Shared backend for `POST /branches/merge` and the MCP `branches_merge` +/// tool. Cedar `BranchMerge` gate, admission, then the engine merge. +pub(crate) async fn do_branch_merge( + state: &AppState, + handle: &Arc, + actor: Option<&ResolvedActor>, + source: String, + target: String, +) -> std::result::Result { let actor_arc = actor - .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.actor_id)) + .map(|a| Arc::clone(&a.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor - .as_ref() - .map(|Extension(actor)| actor.actor_id.as_ref()); + let actor_id = actor.map(|a| a.actor_id.as_ref()); authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::BranchMerge, - branch: Some(request.source.clone()), + branch: Some(source.clone()), target_branch: Some(target.clone()), }, )?; @@ -2856,16 +3005,16 @@ async fn server_branch_merge( .map_err(ApiError::from_workload_reject)?; let outcome = { let db = &handle.engine; - db.branch_merge_as(&request.source, &target, actor_id) + db.branch_merge_as(&source, &target, actor_id) .await .map_err(ApiError::from_omni)? }; - Ok(Json(BranchMergeOutput { - source: request.source, + Ok(BranchMergeOutput { + source, target, outcome: outcome.into(), actor_id: actor_id.map(str::to_string), - })) + }) } #[utoipa::path( @@ -2890,24 +3039,36 @@ async fn server_commit_list( actor: Option>, Query(query): Query, ) -> std::result::Result, ApiError> { + Ok(Json( + do_commits_list(&handle, actor.as_ref().map(|Extension(a)| a), query.branch).await?, + )) +} + +/// Shared backend for `GET /commits` and the MCP `commits_list` tool. Cedar +/// `Read` gate (branch-scoped to the optional filter) then the commit list. +pub(crate) async fn do_commits_list( + handle: &Arc, + actor: Option<&ResolvedActor>, + branch: Option, +) -> std::result::Result { authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::Read, - branch: query.branch.clone(), + branch: branch.clone(), target_branch: None, }, )?; let commits = { let db = &handle.engine; - db.list_commits(query.branch.as_deref()) + db.list_commits(branch.as_deref()) .await .map_err(ApiError::from_omni)? }; - Ok(Json(CommitListOutput { + Ok(CommitListOutput { commits: commits.iter().map(api::commit_output).collect(), - })) + }) } /// Path-param shape for [`server_commit_show`]. See [`BranchPath`] @@ -2943,8 +3104,20 @@ async fn server_commit_show( actor: Option>, Path(CommitPath { commit_id }): Path, ) -> std::result::Result, ApiError> { + Ok(Json( + do_commit_show(&handle, actor.as_ref().map(|Extension(a)| a), &commit_id).await?, + )) +} + +/// Shared backend for `GET /commits/{id}` and the MCP `commits_get` tool. +/// Cedar `Read` gate then the single commit. +pub(crate) async fn do_commit_show( + handle: &Arc, + actor: Option<&ResolvedActor>, + commit_id: &str, +) -> std::result::Result { authorize_request( - actor.as_ref().map(|Extension(actor)| actor), + actor, handle.policy.as_deref(), PolicyRequest { action: PolicyAction::Read, @@ -2954,11 +3127,11 @@ async fn server_commit_show( )?; let commit = { let db = &handle.engine; - db.get_commit(&commit_id) + db.get_commit(commit_id) .await .map_err(ApiError::from_omni)? }; - Ok(Json(api::commit_output(&commit))) + Ok(api::commit_output(&commit)) } fn read_target_from_request(branch: Option, snapshot: Option) -> ReadTarget {