refactor(server): extract do_* inner fns from server_* handlers (MR-969/RFC-003)

Mechanical and behavior-neutral. Each server_* HTTP handler becomes a thin
wrapper over a pub(crate) do_* fn that takes the resolved actor plus plain
args (the shape run_query/run_mutate already use), so the upcoming in-server
MCP tools can dispatch one path instead of duplicating the Cedar gate,
admission, and engine call.

11 fns: do_snapshot, do_schema_get, do_branches_list, do_commits_list,
do_commit_show, do_schema_apply, do_ingest, do_branch_create,
do_branch_delete, do_branch_merge, do_graphs_list. The Cedar PolicyRequest,
admission ordering (gate then try_admit), actor/anonymous derivation, and
engine args stay identical; only the Json(...) wrap moves to the call site.

All 105 server tests pass.
This commit is contained in:
Ragnor Comerford 2026-06-08 15:26:18 +02:00
parent e62d9166fb
commit 69fae71870
No known key found for this signature in database

View file

@ -1404,6 +1404,19 @@ async fn server_graphs_list(
State(state): State<AppState>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<GraphListResponse>, ApiError> {
Ok(Json(
do_graphs_list(&state, actor.as_ref().map(|Extension(a)| a)).await?,
))
}
/// Shared backend for `GET /graphs` and the MCP `graphs_list` tool +
/// `omnigraph://graphs` resource. Server-scoped: 405 in single mode (no
/// registry to enumerate), else the server-level `GraphList` Cedar gate then
/// the sorted registry list.
pub(crate) async fn do_graphs_list(
state: &AppState,
actor: Option<&ResolvedActor>,
) -> std::result::Result<GraphListResponse, ApiError> {
// 405 in single mode — there's no registry to enumerate, and the
// legacy URL surface didn't expose this endpoint.
let registry = match state.routing() {
@ -1424,7 +1437,7 @@ async fn server_graphs_list(
// is the right default (don't leak the registry until the operator
// explicitly authorizes it).
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
state.server_policy.as_deref(),
PolicyRequest {
action: PolicyAction::GraphList,
@ -1442,7 +1455,7 @@ async fn server_graphs_list(
})
.collect();
graphs.sort_by(|a, b| a.graph_id.cmp(&b.graph_id));
Ok(Json(GraphListResponse { graphs }))
Ok(GraphListResponse { graphs })
}
async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
@ -1823,8 +1836,24 @@ async fn server_snapshot(
Query(query): Query<SnapshotQuery>,
) -> std::result::Result<Json<api::SnapshotOutput>, ApiError> {
let branch = query.branch.unwrap_or_else(|| "main".to_string());
Ok(Json(
do_snapshot(&handle, actor.as_ref().map(|Extension(a)| a), branch).await?,
))
}
/// Shared backend for `GET /snapshot` and the MCP `snapshot` tool.
///
/// The Cedar `Read` gate plus the engine snapshot read live here so the HTTP
/// handler and the MCP tool dispatch one path (MR-969 / RFC-003). Same shape
/// as `run_query`/`run_mutate`: takes the resolved actor and the args, never
/// an Axum extractor.
pub(crate) async fn do_snapshot(
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
branch: String,
) -> std::result::Result<api::SnapshotOutput, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
@ -1838,7 +1867,7 @@ async fn server_snapshot(
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(snapshot_payload(&branch, &snapshot)))
Ok(snapshot_payload(&branch, &snapshot))
}
/// Header values that flag a response as coming from a deprecated route
@ -2443,8 +2472,19 @@ async fn server_schema_get(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<SchemaOutput>, ApiError> {
Ok(Json(
do_schema_get(&handle, actor.as_ref().map(|Extension(a)| a)).await?,
))
}
/// Shared backend for `GET /schema` and the MCP `schema_get` tool +
/// `omnigraph://schema` resource. Cedar `Read` gate then the schema source.
pub(crate) async fn do_schema_get(
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
) -> std::result::Result<SchemaOutput, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
@ -2456,7 +2496,7 @@ async fn server_schema_get(
let db = &handle.engine;
db.schema_source().to_string()
};
Ok(Json(SchemaOutput { schema_source }))
Ok(SchemaOutput { schema_source })
}
#[utoipa::path(
@ -2486,15 +2526,34 @@ async fn server_schema_apply(
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<SchemaApplyRequest>,
) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
Ok(Json(
do_schema_apply(
&state,
&handle,
actor.as_ref().map(|Extension(a)| a),
&request.schema_source,
request.allow_data_loss,
)
.await?,
))
}
/// Shared backend for `POST /schema/apply` and the MCP `schema_apply` tool.
/// Cedar `SchemaApply` gate, per-actor admission, then the catalog-checked
/// engine apply.
pub(crate) async fn do_schema_apply(
state: &AppState,
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
schema_source: &str,
allow_data_loss: bool,
) -> std::result::Result<SchemaApplyOutput, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.map(|a| Arc::clone(&a.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
let actor_id = actor.map(|a| a.actor_id.as_ref());
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::SchemaApply,
@ -2502,7 +2561,7 @@ async fn server_schema_apply(
target_branch: Some("main".to_string()),
},
)?;
let est_bytes = request.schema_source.len() as u64;
let est_bytes = schema_source.len() as u64;
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
@ -2518,9 +2577,9 @@ async fn server_schema_apply(
// HTTP-layer authorize_request just made above. PR #3 collapses
// the redundancy.
db.apply_schema_as_with_catalog_check(
&request.schema_source,
schema_source,
omnigraph::db::SchemaApplyOptions {
allow_data_loss: request.allow_data_loss,
allow_data_loss,
},
actor_id,
|catalog| {
@ -2533,7 +2592,7 @@ async fn server_schema_apply(
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
Ok(schema_apply_output(handle.uri.as_str(), result))
}
#[utoipa::path(
@ -2567,13 +2626,37 @@ async fn server_ingest(
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let from = request.from.unwrap_or_else(|| "main".to_string());
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
Ok(Json(
do_ingest(
&state,
&handle,
actor.as_ref().map(|Extension(a)| a),
&request.data,
mode,
branch,
from,
)
.await?,
))
}
/// Shared backend for `POST /ingest` and the MCP `ingest` tool. `branch`,
/// `from`, and `mode` are already defaulted by the caller. Gates `Change`
/// (plus `BranchCreate` when ingesting into a not-yet-existing branch), then
/// per-actor admission, then the engine ingest.
pub(crate) async fn do_ingest(
state: &AppState,
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
data: &str,
mode: omnigraph::loader::LoadMode,
branch: String,
from: String,
) -> std::result::Result<IngestOutput, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.map(|a| Arc::clone(&a.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
let actor_id = actor.map(|a| a.actor_id.as_ref());
let branch_exists = {
let db = &handle.engine;
@ -2586,7 +2669,7 @@ async fn server_ingest(
if !branch_exists {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
@ -2596,7 +2679,7 @@ async fn server_ingest(
)?;
}
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Change,
@ -2604,7 +2687,7 @@ async fn server_ingest(
target_branch: None,
},
)?;
let est_bytes = request.data.len() as u64;
let est_bytes = data.len() as u64;
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
@ -2612,16 +2695,16 @@ async fn server_ingest(
let result = {
let db = &handle.engine;
db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
db.ingest_as(&branch, Some(&from), data, mode, actor_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(ingest_output(
Ok(ingest_output(
handle.uri.as_str(),
&result,
actor_id.map(str::to_string),
)))
))
}
#[utoipa::path(
@ -2643,8 +2726,19 @@ async fn server_branch_list(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<BranchListOutput>, ApiError> {
Ok(Json(
do_branches_list(&handle, actor.as_ref().map(|Extension(a)| a)).await?,
))
}
/// Shared backend for `GET /branches` and the MCP `branches_list` tool +
/// `omnigraph://branches` resource. Cedar `Read` gate then the sorted list.
pub(crate) async fn do_branches_list(
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
) -> std::result::Result<BranchListOutput, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
@ -2657,7 +2751,7 @@ async fn server_branch_list(
db.branch_list().await.map_err(ApiError::from_omni)?
};
branches.sort();
Ok(Json(BranchListOutput { branches }))
Ok(BranchListOutput { branches })
}
#[utoipa::path(
@ -2688,17 +2782,37 @@ async fn server_branch_create(
Json(request): Json<BranchCreateRequest>,
) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
let from = request.from.unwrap_or_else(|| "main".to_string());
Ok(Json(
do_branch_create(
&state,
&handle,
actor.as_ref().map(|Extension(a)| a),
request.name,
from,
)
.await?,
))
}
/// Shared backend for `POST /branches` and the MCP `branches_create` tool.
/// Cedar `BranchCreate` gate, admission, then the engine fork.
pub(crate) async fn do_branch_create(
state: &AppState,
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
name: String,
from: String,
) -> std::result::Result<BranchCreateOutput, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.map(|a| Arc::clone(&a.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
branch: Some(from.clone()),
target_branch: Some(request.name.clone()),
target_branch: Some(name.clone()),
},
)?;
// Branch metadata only — small constant bytes estimate. The Lance
@ -2712,18 +2826,18 @@ async fn server_branch_create(
let db = &handle.engine;
db.branch_create_from_as(
ReadTarget::branch(&from),
&request.name,
actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()),
&name,
actor.map(|a| a.actor_id.as_ref()),
)
.await
.map_err(ApiError::from_omni)?;
}
Ok(Json(BranchCreateOutput {
Ok(BranchCreateOutput {
uri: handle.uri.clone(),
from,
name: request.name,
actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()),
}))
name,
actor_id: actor.map(|a| a.actor_id.as_ref().to_string()),
})
}
/// Path-param shape for [`server_branch_delete`]. Named-field
@ -2768,15 +2882,32 @@ async fn server_branch_delete(
actor: Option<Extension<ResolvedActor>>,
Path(BranchPath { branch }): Path<BranchPath>,
) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
Ok(Json(
do_branch_delete(
&state,
&handle,
actor.as_ref().map(|Extension(a)| a),
branch,
)
.await?,
))
}
/// Shared backend for `DELETE /branches/{branch}` and the MCP
/// `branches_delete` tool. Cedar `BranchDelete` gate, admission, then the
/// engine delete.
pub(crate) async fn do_branch_delete(
state: &AppState,
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
branch: String,
) -> std::result::Result<BranchDeleteOutput, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.map(|a| Arc::clone(&a.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
let actor_id = actor.map(|a| a.actor_id.as_ref());
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchDelete,
@ -2795,11 +2926,11 @@ async fn server_branch_delete(
.await
.map_err(ApiError::from_omni)?;
}
Ok(Json(BranchDeleteOutput {
Ok(BranchDeleteOutput {
uri: handle.uri.clone(),
name: branch,
actor_id: actor_id.map(str::to_string),
}))
})
}
#[utoipa::path(
@ -2831,19 +2962,37 @@ async fn server_branch_merge(
Json(request): Json<BranchMergeRequest>,
) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
let target = request.target.unwrap_or_else(|| "main".to_string());
Ok(Json(
do_branch_merge(
&state,
&handle,
actor.as_ref().map(|Extension(a)| a),
request.source,
target,
)
.await?,
))
}
/// Shared backend for `POST /branches/merge` and the MCP `branches_merge`
/// tool. Cedar `BranchMerge` gate, admission, then the engine merge.
pub(crate) async fn do_branch_merge(
state: &AppState,
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
source: String,
target: String,
) -> std::result::Result<BranchMergeOutput, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.map(|a| Arc::clone(&a.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
let actor_id = actor.map(|a| a.actor_id.as_ref());
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchMerge,
branch: Some(request.source.clone()),
branch: Some(source.clone()),
target_branch: Some(target.clone()),
},
)?;
@ -2856,16 +3005,16 @@ async fn server_branch_merge(
.map_err(ApiError::from_workload_reject)?;
let outcome = {
let db = &handle.engine;
db.branch_merge_as(&request.source, &target, actor_id)
db.branch_merge_as(&source, &target, actor_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(BranchMergeOutput {
source: request.source,
Ok(BranchMergeOutput {
source,
target,
outcome: outcome.into(),
actor_id: actor_id.map(str::to_string),
}))
})
}
#[utoipa::path(
@ -2890,24 +3039,36 @@ async fn server_commit_list(
actor: Option<Extension<ResolvedActor>>,
Query(query): Query<CommitListQuery>,
) -> std::result::Result<Json<CommitListOutput>, ApiError> {
Ok(Json(
do_commits_list(&handle, actor.as_ref().map(|Extension(a)| a), query.branch).await?,
))
}
/// Shared backend for `GET /commits` and the MCP `commits_list` tool. Cedar
/// `Read` gate (branch-scoped to the optional filter) then the commit list.
pub(crate) async fn do_commits_list(
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
branch: Option<String>,
) -> std::result::Result<CommitListOutput, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
branch: query.branch.clone(),
branch: branch.clone(),
target_branch: None,
},
)?;
let commits = {
let db = &handle.engine;
db.list_commits(query.branch.as_deref())
db.list_commits(branch.as_deref())
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(CommitListOutput {
Ok(CommitListOutput {
commits: commits.iter().map(api::commit_output).collect(),
}))
})
}
/// Path-param shape for [`server_commit_show`]. See [`BranchPath`]
@ -2943,8 +3104,20 @@ async fn server_commit_show(
actor: Option<Extension<ResolvedActor>>,
Path(CommitPath { commit_id }): Path<CommitPath>,
) -> std::result::Result<Json<api::CommitOutput>, ApiError> {
Ok(Json(
do_commit_show(&handle, actor.as_ref().map(|Extension(a)| a), &commit_id).await?,
))
}
/// Shared backend for `GET /commits/{id}` and the MCP `commits_get` tool.
/// Cedar `Read` gate then the single commit.
pub(crate) async fn do_commit_show(
handle: &Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
commit_id: &str,
) -> std::result::Result<api::CommitOutput, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
@ -2954,11 +3127,11 @@ async fn server_commit_show(
)?;
let commit = {
let db = &handle.engine;
db.get_commit(&commit_id)
db.get_commit(commit_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(api::commit_output(&commit)))
Ok(api::commit_output(&commit))
}
fn read_target_from_request(branch: Option<String>, snapshot: Option<String>) -> ReadTarget {