feat: canonical POST /load, deprecate /ingest (RFC-009 Phase 5) (#222)

* feat(server): canonical POST /load, deprecate /ingest (RFC-009 Phase 5)

The CLI's non-deprecated `load` verb rode the deprecated `/ingest` route, so
`/ingest`'s eventual removal would silently break it. Add a canonical `/load`,
mirroring the shipped `/mutate`↔`/change` and `/query`↔`/read` pattern.

- Extract `server_ingest`'s body into a shared `run_ingest` (branch-exists /
  fork-if-`from`, Cedar auth, admission, `load_as`, `IngestOutput` mapping).
- `server_load` (canonical) → `run_ingest`, `Json<IngestOutput>`.
- `server_ingest` (deprecated) → `run_ingest` + `#[deprecated]` + RFC 9745/8288
  `Deprecation: true` / `Link: </load>; rel="successor-version"` headers.
- Router mounts `/load` (same 32 MB body limit) beside `/ingest`; OpenAPI
  `paths(...)` gains `server_load` and flags `server_ingest` deprecated.

`/load` reuses `IngestRequest`/`IngestOutput`, exactly as canonical `/mutate`
reuses `Change*` — a DTO rename is a separate, larger change (out of scope).

openapi.json regenerated. Tests: openapi `/load` present + not deprecated,
`/ingest` deprecated, `/load` bearer-secured; data_routes `/load` happy path +
`/ingest` deprecation headers. Existing `/ingest` route tests stay green (the
shim is unchanged). Docs: server.md endpoint table; RFC-009 Phase 5 marked
landed (incl. the hand-mount-vs-utoipa-axum registration finding).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(cli): point remote load at /load (RFC-009 Phase 5)

`GraphClient::load`'s remote arm now POSTs to the canonical `/load` route
instead of the deprecated `/ingest`; the deprecated `ingest` verb keeps
riding `/ingest`. `parity_load` exercises `/load` on the remote arm (its
documented flip); the matrix exclusions comment is updated.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-06-14 03:32:16 +03:00 committed by GitHub
parent 6144bb18d6
commit 8726ca92ec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 325 additions and 57 deletions

View file

@ -304,10 +304,13 @@ impl GraphClient {
token,
} => {
let data = std::fs::read_to_string(data)?;
// RFC-009 Phase 5: the canonical `load` verb targets the
// canonical `/load` route (the deprecated `ingest` verb below
// still rides `/ingest`).
let output = remote_json::<IngestOutput>(
http,
Method::POST,
remote_url(base_url, "/ingest"),
remote_url(base_url, "/load"),
Some(serde_json::to_value(IngestRequest {
branch: Some(branch.to_string()),
from: from.map(ToOwned::to_owned),

View file

@ -265,9 +265,9 @@ fn parity_errors_share_exit_codes() {
//
// - `graphs list`: server-only today; becomes Both-capability when the
// embedded arm enumerates the cluster catalog (RFC-009 open Q3, answered).
// - `ingest`: deprecated alias of load; the remote `load` arm itself rides
// the deprecated /ingest route today (RFC-009 Phase 5 flips it to /load —
// this matrix's `parity_load` row is where that flip becomes visible).
// - `ingest`: deprecated alias of load; its remote arm rides the deprecated
// /ingest route. The canonical `load` verb targets `/load` (RFC-009 Phase 5,
// landed) — `parity_load` exercises it on the remote arm.
// - `init`, `optimize`, `repair`, `cleanup`, `cluster *`: storage-plane by
// design (must work with the server down); Phase 4 declares this.
#[allow(dead_code)]

View file

@ -1183,46 +1183,22 @@ pub(crate) async fn server_schema_apply(
Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
}
#[utoipa::path(
post,
path = "/ingest",
tag = "mutations",
operation_id = "ingest",
request_body = IngestRequest,
responses(
(status = 200, description = "Ingest results", body = IngestOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Bulk-load NDJSON data into a branch.
///
/// `data` is NDJSON with one record per line. `mode` controls behavior on
/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
/// `overwrite` replaces table contents. Branch creation is opt-in by
/// presence of `from`: with `from` set, a missing `branch` is created from
/// it; without `from`, `branch` must already exist — a missing branch is a
/// 404, never an implicit fork. **Destructive** when `mode` is `overwrite`
/// or when the load produces conflicting writes.
pub(crate) async fn server_ingest(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
/// Shared body for `POST /load` (canonical) and `POST /ingest` (deprecated):
/// branch-exists / fork-if-`from` check, Cedar authorization, admission, the
/// bulk `load_as`, and the `IngestOutput` mapping.
async fn run_ingest(
state: AppState,
handle: Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
request: IngestRequest,
) -> std::result::Result<IngestOutput, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let from = request.from;
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.map(|actor| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
let actor_id = actor.map(|actor| actor.actor_id.as_ref());
let branch_exists = {
let db = &handle.engine;
@ -1244,7 +1220,7 @@ pub(crate) async fn server_ingest(
)));
}
Some(from) => authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
@ -1255,7 +1231,7 @@ pub(crate) async fn server_ingest(
}
}
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Change,
@ -1276,12 +1252,98 @@ pub(crate) async fn server_ingest(
.map_err(ApiError::from_omni)?
};
Ok(Json(ingest_output(
Ok(ingest_output(
handle.uri.as_str(),
&result,
mode,
actor_id.map(str::to_string),
)))
))
}
#[utoipa::path(
    post,
    path = "/load",
    tag = "mutations",
    operation_id = "load",
    request_body = IngestRequest,
    responses(
        (status = 200, description = "Load results", body = IngestOutput),
        (status = 400, description = "Bad request", body = ErrorOutput),
        (status = 401, description = "Unauthorized", body = ErrorOutput),
        (status = 403, description = "Forbidden", body = ErrorOutput),
        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
    ),
    security(("bearer_token" = [])),
)]
/// Bulk-load NDJSON data into a branch (canonical load endpoint).
///
/// `data` is NDJSON with one record per line. `mode` controls behavior on
/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
/// `overwrite` replaces table contents. Branch creation is opt-in by
/// presence of `from`: with `from` set, a missing `branch` is created from
/// it; without `from`, `branch` must already exist — a missing branch is a
/// 404, never an implicit fork. **Destructive** when `mode` is `overwrite`
/// or when the load produces conflicting writes.
///
/// The legacy `POST /ingest` route has identical semantics and is kept as a
/// deprecated alias.
pub(crate) async fn server_load(
    State(state): State<AppState>,
    Extension(handle): Extension<Arc<GraphHandle>>,
    actor: Option<Extension<ResolvedActor>>,
    Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
    // Peel the axum `Extension` wrapper so the shared body sees a plain
    // `Option<&ResolvedActor>`.
    let resolved_actor = actor.as_ref().map(|Extension(actor)| actor);
    // All of the work lives in `run_ingest`; this handler only adapts the
    // extractors on the way in and wraps the output as JSON on the way out.
    let output = run_ingest(state, handle, resolved_actor, request).await?;
    Ok(Json(output))
}
#[utoipa::path(
    post,
    path = "/ingest",
    tag = "mutations",
    operation_id = "ingest",
    request_body = IngestRequest,
    responses(
        (status = 200, description = "Load results (response includes `Deprecation: true` + `Link: </load>; rel=\"successor-version\"`)", body = IngestOutput),
        (status = 400, description = "Bad request", body = ErrorOutput),
        (status = 401, description = "Unauthorized", body = ErrorOutput),
        (status = 403, description = "Forbidden", body = ErrorOutput),
        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
    ),
    security(("bearer_token" = [])),
)]
#[deprecated(note = "use POST /load instead; /ingest is kept indefinitely for back-compat")]
/// **Deprecated** — use [`POST /load`](#tag/mutations/operation/load) instead.
///
/// Bulk-load NDJSON data into a branch. Behavior is unchanged; the route is
/// kept indefinitely for back-compat. New integrations should target
/// `POST /load`, which has identical semantics. Responses from this route
/// include `Deprecation: true` and `Link: </load>; rel="successor-version"`
/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the signal.
pub(crate) async fn server_ingest(
    State(state): State<AppState>,
    Extension(handle): Extension<Arc<GraphHandle>>,
    actor: Option<Extension<ResolvedActor>>,
    Json(request): Json<IngestRequest>,
) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<IngestOutput>), ApiError> {
    // Peel the axum `Extension` wrapper so the shared body sees a plain
    // `Option<&ResolvedActor>`.
    let resolved_actor = actor.as_ref().map(|Extension(actor)| actor);
    // Same shared body as the canonical `/load` handler. An error returns
    // early via `?`, so the deprecation headers below are attached to
    // successful responses only.
    let output = run_ingest(state, handle, resolved_actor, request).await?;
    let headers = deprecation_headers("</load>; rel=\"successor-version\"");
    Ok((headers, Json(output)))
}
#[utoipa::path(

View file

@ -107,7 +107,10 @@ fn hash_bearer_token(token: &str) -> BearerTokenHash {
handlers::server_invoke_query,
handlers::server_schema_apply,
handlers::server_schema_get,
handlers::server_ingest,
handlers::server_load,
// deprecated; the #[deprecated] attribute on the handler surfaces as
// `deprecated: true` on the OpenAPI operation.
#[allow(deprecated)] handlers::server_ingest,
handlers::server_branch_list,
handlers::server_branch_create,
handlers::server_branch_delete,
@ -934,9 +937,20 @@ pub fn build_app(state: AppState) -> Router {
.route("/queries/{name}", post(server_invoke_query))
.route("/schema", get(server_schema_get))
.route("/schema/apply", post(server_schema_apply))
.route(
"/load",
post(server_load).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
)
// /ingest is the deprecated alias of /load; its handler carries
// #[deprecated] (OpenAPI operation flagged) and emits RFC 9745
// Deprecation + RFC 8288 Link headers. Suppress the call-site warning.
.route(
"/ingest",
post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
post({
#[allow(deprecated)]
server_ingest
})
.layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
)
.route(
"/branches",

View file

@ -620,6 +620,83 @@ async fn change_endpoint_emits_deprecation_headers() {
);
}
#[tokio::test(flavor = "multi_thread")]
async fn load_endpoint_loads_into_existing_branch() {
    // RFC-009 Phase 5: `/load` is the canonical bulk-load route. It accepts
    // the same request body as `/ingest` but must not carry a deprecation
    // header.
    let (_temp, app) = app_for_loaded_graph().await;

    // Serialize the request up front so the HTTP builder below stays flat.
    let payload = serde_json::to_vec(&IngestRequest {
        branch: Some("main".to_string()),
        from: None,
        mode: Some(LoadMode::Merge),
        data: r#"{"type":"Person","data":{"name":"Loaded","age":7}}"#.to_string(),
    })
    .unwrap();
    let http_request = Request::builder()
        .uri("/load")
        .method(Method::POST)
        .header("content-type", "application/json")
        .body(Body::from(payload))
        .unwrap();

    let response = app.clone().oneshot(http_request).await.unwrap();

    assert_eq!(response.status(), StatusCode::OK);
    assert!(
        response.headers().get("deprecation").is_none(),
        "POST /load must not advertise itself as deprecated"
    );

    // The response body reports the target branch and the table written to.
    let raw = to_bytes(response.into_body(), usize::MAX).await.unwrap();
    let parsed: Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(parsed["branch"], "main");
    assert_eq!(parsed["tables"][0]["table_key"], "node:Person");
}
#[tokio::test(flavor = "multi_thread")]
async fn ingest_endpoint_emits_deprecation_headers() {
    // RFC-009 Phase 5: `/ingest` is the deprecated alias of `/load`. At
    // runtime it must signal that with `Deprecation: true` (RFC 9745) and
    // `Link: </load>; rel="successor-version"` (RFC 8288). The OpenAPI flag
    // is checked separately by `openapi_ingest_is_deprecated` in
    // tests/openapi.rs.
    let (_temp, app) = app_for_loaded_graph().await;

    // Serialize the request up front so the HTTP builder below stays flat.
    let payload = serde_json::to_vec(&IngestRequest {
        branch: Some("main".to_string()),
        from: None,
        mode: Some(LoadMode::Merge),
        data: r#"{"type":"Person","data":{"name":"Legacyer","age":33}}"#.to_string(),
    })
    .unwrap();
    let http_request = Request::builder()
        .uri("/ingest")
        .method(Method::POST)
        .header("content-type", "application/json")
        .body(Body::from(payload))
        .unwrap();

    let response = app.clone().oneshot(http_request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    // Read both headers as text; a missing or non-UTF-8 value becomes `None`
    // and fails the comparison below.
    let headers = response.headers();
    let deprecation = headers.get("deprecation").and_then(|v| v.to_str().ok());
    let link = headers.get("link").and_then(|v| v.to_str().ok());

    assert_eq!(
        deprecation,
        Some("true"),
        "POST /ingest must advertise `Deprecation: true` (RFC 9745)"
    );
    assert_eq!(
        link,
        Some("</load>; rel=\"successor-version\""),
        "POST /ingest must point at /load via `Link` rel=successor-version (RFC 8288)"
    );
}
#[tokio::test(flavor = "multi_thread")]
async fn read_endpoint_emits_deprecation_headers() {
// `/read` is kept indefinitely for byte-stable back-compat but flagged

View file

@ -172,6 +172,7 @@ const EXPECTED_PATHS: &[&str] = &[
"/queries/{name}",
"/schema",
"/schema/apply",
"/load",
"/ingest",
"/branches",
"/branches/{branch}",
@ -300,6 +301,32 @@ fn openapi_ingest_is_post() {
assert!(doc["paths"]["/ingest"]["post"].is_object());
}
#[test]
fn openapi_load_is_not_deprecated() {
    // RFC-009 Phase 5: `/load` is the canonical bulk-load endpoint, so its
    // POST operation must exist and must not be flagged deprecated.
    let doc = openapi_json();
    let operation = &doc["paths"]["/load"]["post"];
    assert!(operation.is_object());

    // Only a literal `"deprecated": true` counts; a missing key or any
    // non-boolean value is treated as "not deprecated".
    let flagged = matches!(
        operation.get("deprecated"),
        Some(serde_json::Value::Bool(true))
    );
    assert!(
        !flagged,
        "/load is the canonical load endpoint and must not be deprecated"
    );
}
#[test]
fn openapi_ingest_is_deprecated() {
    // RFC-009 Phase 5: with `/load` canonical, the `/ingest` POST operation
    // must carry `"deprecated": true` in the generated document.
    let doc = openapi_json();
    let flag = &doc["paths"]["/ingest"]["post"]["deprecated"];
    let expected = serde_json::Value::Bool(true);
    assert_eq!(
        flag, &expected,
        "/ingest must be flagged deprecated now that /load is canonical"
    );
}
#[test]
fn openapi_branches_supports_get_and_post() {
let doc = openapi_json();
@ -705,6 +732,7 @@ fn protected_endpoints_reference_bearer_token_security() {
("/schema/apply", "post"),
("/queries", "get"),
("/queries/{name}", "post"),
("/load", "post"),
("/ingest", "post"),
("/export", "post"),
("/snapshot", "get"),

View file

@ -161,15 +161,20 @@ and cluster commands must work with the server down) explicit in code.
"Server" targets include operator-config named servers (RFC-007), not only
literal `http(s)://` URIs.
### Phase 5 — Route alignment
### Phase 5 — Route alignment (landed)
Add a canonical `/load` endpoint (the handler already exists behind the
`/ingest` shim); point `RemoteClient` at it; keep `/ingest` on its existing
deprecation path. While here, check whether the server uses `utoipa-axum`'s
router-coupled registration (`OpenApiRouter`/`routes!`); if it hand-mounts
routes beside `#[utoipa::path]` annotations, prefer migrating registration so
path annotations and mount points are the same declaration (the modularization
already hit one orphaned-attribute incident of exactly this class).
Added a canonical `POST /load` (shared `run_ingest` body; the deprecated
`/ingest` is now a thin alias carrying `#[deprecated]` + RFC 9745/8288
`Deprecation`/`Link: </load>` headers, exactly mirroring `/mutate`↔`/change`)
and pointed the CLI's remote `load` arm at it; `/ingest` stays on its
deprecation path. `/load` reuses `IngestRequest`/`IngestOutput` (as canonical
`/mutate` reuses `Change*`); a DTO rename is a separate change.
Registration finding: the server **hand-mounts** routes (`.route(...)`) beside a
manual `#[openapi(paths(...))]` list, not `utoipa-axum`'s `OpenApiRouter`/
`routes!`. This PR followed the existing manual pattern (one `.route` + one
`paths(...)` entry + the `#[utoipa::path]` annotation) rather than migrating
registration — the migration is a worthwhile but orthogonal cleanup, deferred.
## Non-goals

View file

@ -56,7 +56,8 @@ Per-graph endpoints — same body shape across modes; URLs differ:
| POST | `/queries/{name}` | `/graphs/{id}/queries/{name}` | bearer + `invoke_query` (+ `change` for a stored mutation) | invoke a named query from the `queries:` registry; deny == 404 | `server_invoke_query` |
| GET | `/schema` | `/graphs/{id}/schema` | bearer + `read` | get current `.pg` source | `server_schema_get` |
| POST | `/schema/apply` | `/graphs/{id}/schema/apply` | bearer + `schema_apply` (target=`main`) | migrate | `server_schema_apply` |
| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (only when `from` is set and the branch is created) + `change` | bulk load; branch creation is opt-in via `from` — without it a missing `branch` is a 404, never an implicit fork | `server_ingest` (32 MB body limit) |
| POST | `/load` | `/graphs/{id}/load` | bearer + `branch_create` (only when `from` is set and the branch is created) + `change` | bulk load (canonical); branch creation is opt-in via `from` — without it a missing `branch` is a 404, never an implicit fork | `server_load` (32 MB body limit) |
| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (only when `from` is set and the branch is created) + `change` | **deprecated** alias of `/load` (carries `Deprecation: true` + `Link: </load>; rel="successor-version"`) | `server_ingest` (32 MB body limit) |
| GET | `/branches` | `/graphs/{id}/branches` | bearer + `read` | list branches | `server_branch_list` |
| POST | `/branches` | `/graphs/{id}/branches` | bearer + `branch_create` | create | `server_branch_create` |
| DELETE | `/branches/{branch}` | `/graphs/{id}/branches/{branch}` | bearer + `branch_delete` | delete | `server_branch_delete` |

View file

@ -670,8 +670,8 @@
"tags": [
"mutations"
],
"summary": "Bulk-load NDJSON data into a branch.",
"description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. Branch creation is opt-in by\npresence of `from`: with `from` set, a missing `branch` is created from\nit; without `from`, `branch` must already exist — a missing branch is a\n404, never an implicit fork. **Destructive** when `mode` is `overwrite`\nor when the load produces conflicting writes.",
"summary": "**Deprecated** — use [`POST /load`](#tag/mutations/operation/load) instead.",
"description": "Bulk-load NDJSON data into a branch. Behavior is unchanged; the route is\nkept indefinitely for back-compat. New integrations should target\n`POST /load`, which has identical semantics. Responses from this route\ninclude `Deprecation: true` and `Link: </load>; rel=\"successor-version\"`\nheaders per RFC 9745 / RFC 8288 so SDKs and proxies can surface the signal.",
"operationId": "ingest",
"requestBody": {
"content": {
@ -685,7 +685,85 @@
},
"responses": {
"200": {
"description": "Ingest results",
"description": "Load results (response includes `Deprecation: true` + `Link: </load>; rel=\"successor-version\"`)",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/IngestOutput"
}
}
}
},
"400": {
"description": "Bad request",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
},
"401": {
"description": "Unauthorized",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
},
"403": {
"description": "Forbidden",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
},
"429": {
"description": "Per-actor admission cap exceeded; honor `Retry-After` header",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
}
},
"deprecated": true,
"security": [
{
"bearer_token": []
}
]
}
},
"/load": {
"post": {
"tags": [
"mutations"
],
"summary": "Bulk-load NDJSON data into a branch (canonical load endpoint).",
"description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. Branch creation is opt-in by\npresence of `from`: with `from` set, a missing `branch` is created from\nit; without `from`, `branch` must already exist — a missing branch is a\n404, never an implicit fork. **Destructive** when `mode` is `overwrite`\nor when the load produces conflicting writes.\n\nThe legacy `POST /ingest` route has identical semantics and is kept as a\ndeprecated alias.",
"operationId": "load",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/IngestRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Load results",
"content": {
"application/json": {
"schema": {