mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-21 02:28:07 +02:00
* test(engine): reproduce empty-table Vector @index aborting schema apply
A Vector (IVF) index trains k-means centroids over the column, so Lance
cannot build it on 0 vectors ("Creating empty vector indices with
train=False is not yet implemented"). schema apply reconciles a table's
whole index set whenever any @index on it changes, so adding an unrelated
scalar @index materializes the dormant empty vector index and aborts the
entire migration (all-or-nothing).
This regression test inits a 0-row Doc with a Vector @index, adds a scalar
@index, and asserts the apply succeeds (then loads one embedded row and
asserts the deferred index materializes). It fails today at the apply step
with the vector-index abort; the fix lands in the next commit.
Refs dev-graph iss-empty-vector-index-schema-apply, iss-848.
* fix(engine): defer Vector @index on an empty table instead of aborting schema apply
build_indices_on_dataset_for_catalog materialized a declared Vector @index
unconditionally. On a 0-row table Lance cannot train the IVF index
("Creating empty vector indices with train=False is not yet implemented"),
so any later migration that touches the table (e.g. adding an unrelated
scalar @index, which reconciles the table's whole index set) aborted the
entire migration on the dormant vector index — all-or-nothing.
Guard the vector arm with a row-count check, matching the guard
ensure_indices_for_branch and the branch-merge rebuild already use: an
untrainable column becomes a pending index that a later ensure_indices /
optimize materializes once the table has rows. Reads stay correct meanwhile
(vector search degrades to a brute-force scan).
Stop-gap: the residual rows-present-but-vectors-null window and the full
decoupling (intent recorded at apply, an idempotent coverage reconciler)
are dev-graph iss-848. Turns the green half of the regression test added in
the previous commit.
Refs dev-graph iss-empty-vector-index-schema-apply, iss-848, iss-687.
* docs(invariants): record the logical-contract-over-physical-state principle
The bug class behind the empty-table vector-index abort (and the schema-apply
vs optimize version drift) is one shape: a physical operation allowed to fail
a logical one. Several hard invariants (2, 5, 7, 13) and deny-list items are
already instances of this, but the unifying rule was never written down.
Add it to docs/dev/invariants.md as a "Governing principle" section above the
hard invariants, naming which invariants and deny-list items instantiate it
and the smell to watch for (a logical operation gated on a physical fact).
Add a one-line always-on rule (7) in AGENTS.md so it stays in working memory,
with the qualifier that genuine logical conflicts still fail loudly — the
licence to lag covers physical convergence, not correctness.
Audience-neutral: no private ticket refs. check-agents-md.sh passes.
* test(engine): index build must tolerate rows with null vectors (load-before-embed)
Loading rows whose vector column is null into a `Vector @index` table fails
today: build_indices (reached via the loader's prepare_updates_for_commit)
calls create_vector_index, and Lance's IVF KMeans errors "cannot train 1
centroids with 0 vectors". The same abort hits ensure_indices/optimize/schema
apply/merge, since they all funnel through build_indices_on_dataset_for_catalog.
This test loads two null-embedding rows and calls ensure_indices; it must not
abort (the untrainable vector column is deferred, sibling indexes still build).
Fails today at the load step; fixed in the next commit.
Refs dev-graph iss-848, iss-empty-vector-index-schema-apply.
* fix(engine): defer unbuildable index columns instead of aborting the write path
build_indices_on_dataset_for_catalog is the chokepoint every write path funnels
through (load/mutate via prepare_updates_for_commit, schema apply, ensure_indices,
optimize, branch merge). Its vector arm called create_vector_index
unconditionally, so a column with no trainable vectors yet — an empty table, or
rows loaded before `embed` populates them — aborted the whole operation with
Lance's IVF KMeans error.
Fault-isolate the vector build: on failure, record the column as a PendingIndex
(table, column, reason), log it, and continue building the sibling indexes; a
later ensure_indices/optimize materializes it once the column is trainable, and
reads use brute-force meanwhile. Manifest/CAS/IO errors at the publish boundary
still propagate. Isolating at the single chokepoint realizes the governing
principle (physical index state never fails a logical operation) for every write
path, and supersedes the earlier symptomatic count_rows==0 stop-gap (removed) —
closing the residual rows-present-but-vectors-null window it left open.
Surfacing pending index status rather than failing is the database norm
(Postgres indisvalid, LanceDB list_indices). ensure_indices and the build_indices
wrappers now return Vec<PendingIndex>; optimize surfaces it in a later commit.
Refs dev-graph iss-848, iss-951 (vector index stays inline-commit until lance#6666).
* test(engine): index-only schema apply must not touch table data
Adding an @index to an existing column should be a pure metadata change once
index materialization moves to the reconciler (iss-848): the apply records the
intent in the catalog/IR but builds nothing inline, so the table's manifest
version is unchanged. Today the indexed_tables block builds the index inline
and bumps the version (4 -> 5). Fixed in the next commit.
Refs dev-graph iss-848.
* fix(engine): schema apply records index intent only; index-only apply is metadata
Schema apply no longer builds indexes inline. The four build_indices calls
(added/renamed/rewritten/index-only tables) are removed; the @index/@key intent
is already persisted in the catalog/IR the apply writes, and the physical index
is materialized off the critical path by ensure_indices/optimize (iss-848).
Concretely:
- AddConstraint (an @index addition — every other added constraint plans as
UnsupportedChange) becomes a pure metadata step alongside the metadata-only
steps: it touches no table data, so the table version is unchanged.
- added/renamed/rewritten tables still write their data; only the trailing
index build is gone. The rewritten table's coverage is restored later by
optimize_indices.
- recovery_pins drops index-only tables (they no longer advance Lance HEAD) and
keeps rewritten tables; their post_commit_pin = expected+1 is now exact (one
rewrite commit), strengthening recovery classification.
- the now-orphaned Omnigraph::build_indices_on_dataset_for_catalog wrapper is
removed.
A migration can no longer abort on an index build, for any index type at any
cardinality. Turns the green half of index_only_constraint_apply_touches_no_table_data.
Refs dev-graph iss-848.
* test(engine): optimize must converge a declared-but-unbuilt index
After iss-848, adding an @index post-data is a metadata-only apply that defers
the physical build, so the column is declared-indexed but unbuilt (reads scan).
`optimize` — the operator's cron reconciler — must materialize it. Today optimize
only maintains coverage of EXISTING indexes (optimize_indices) and never creates
missing ones, so the rank BTREE stays Degraded after optimize. Fixed next commit.
Refs dev-graph iss-848.
* fix(engine): optimize materializes declared-but-unbuilt indexes (the reconciler)
`omnigraph optimize` is the operator's cron reconciler. It already compacts and
folds new fragments into EXISTING indexes (optimize_indices); now it also builds
declared-but-missing indexes, so the indexes schema apply / load defer (iss-848)
converge on the next optimize.
Done inside optimize_one_table (not by composing the all-tables ensure_indices,
which is drift-blind and would re-publish the uncovered HEAD>manifest drift that
optimize deliberately skips): after the per-table drift/blob skips and under the
queue + Optimize sidecar already held, a needs_index_create gate (reusing
needs_index_work_node/edge — "declared index missing AND row_count > 0", so empty
tables stay no-ops) admits index-only work, and Phase B builds the missing index
over the just-compacted layout via the build chokepoint. An untrainable vector
column fault-isolates into the new TableOptimizeStats.pending_indexes (the
list_indices/indisvalid analog operators read), not a failure. committed now
reflects index commits, so the existing post-publish cache invalidation covers
them. LanceDB's optimize only maintains existing indexes; creating
declared-but-missing ones is the L2 behavior omnigraph's declarative @index needs.
Turns the green half of optimize_materializes_index_declared_but_unbuilt.
Refs dev-graph iss-848.
* docs: index materialization is deferred to the reconciler (iss-848)
Update the index-lifecycle docs to reflect the new contract: @index/@key
declares intent and the physical index is derived state that never fails a
logical operation. Schema apply builds nothing (records intent only);
load/mutate build inline through one chokepoint that defers an untrainable
Vector column as pending; optimize/ensure_indices is the reconciler that
creates declared-but-missing indexes and maintains coverage, reporting
still-pending columns.
Touches: dev/invariants.md (truth-matrix Index-lifecycle row), AGENTS.md
(capability matrix), user/search/indexes.md (L2 orchestration), user/operations/
maintenance.md (optimize reconciler bullet), dev/testing.md (new tests).
* test(server): schema_apply_route_can_add_index reflects deferred index build
iss-848 made schema apply record @index intent without building the physical
index inline. The route test asserted the index count increased after apply;
on an empty graph it now stays unchanged (the build is deferred to
ensure_indices/optimize). Assert the new contract: apply succeeds and the
physical index count is unchanged.
* fix(engine): precheck vector trainability — don't pin or swallow (PR review)
Two issues Cursor Bugbot caught in the chokepoint fault-isolation:
1. (HIGH) Pending vector pins roll back siblings. needs_index_work_node counted
a missing vector index as work whenever the table had rows, so a column with
no trainable vectors got pinned in the EnsureIndices recovery sidecar — but
the build deferred it (zero commit). On a crash before manifest publish the
classifier sees NoMovement and the all-or-nothing decision (recovery.rs
decide()) rolls back the WHOLE sidecar, undoing a sibling table's committed
index work.
2. (MED) Vector build swallowed fatal errors. The match arm converted every
create_vector_index error into a deferred PendingIndex, hiding genuine
I/O/manifest/Lance failures as "pending".
Fix both with one trainability precheck (vector_column_trainable: >=1 non-null
vector, the ivf_flat(1) minimum) used identically by needs_index_work_node and
the build arm: an untrainable column is never counted as work (so never pinned —
no zero-commit pin) and never attempted (so it can't fail); only a trainable
column is built, and then any error PROPAGATES (stays fatal). The deferred
column is still recorded as a PendingIndex with a clear reason.
Refs dev-graph iss-848.
* feat(cli): surface pending index column + reason in optimize output (PR review)
Codex (P2): pending_indexes was documented as visible in `optimize --json` but
the CLI projection never emitted it — operators would lose the only signal that
optimize has deferred index work. Greptile (P2): the stat dropped the reason, so
operators saw which column was stuck, not why.
Carry the reason: TableOptimizeStats.pending_indexes is now Vec<PendingIndex>
(column + reason), and `omnigraph optimize --json` emits {column, reason} per
pending index; human output prints a "↳ index pending on '<col>': <reason>" line.
Refs dev-graph iss-848.
* test: align CLI index-add test with deferred build; cover post-rename reconcile
- schema_apply_json_adds_index_for_existing_property (cli_schema_config.rs): the
CLI analog of the server test — asserted the index count grew after apply;
under iss-848 the apply defers the build, so the count is unchanged on an
empty graph. Assert the deferred contract. (The only full-suite failure.)
- optimize_materializes_index_after_type_rename (maintenance.rs, new): covers
the gap Greptile flagged — a RenameType writes the renamed table with rows but
no indexes (inline build removed in Commit B); assert the rank index is
Degraded post-rename and Indexed after optimize reconciles it.
Refs dev-graph iss-848.
* test(engine): in-source apply tests reflect deferred index materialization
The two db::omnigraph in-source unit tests asserted the old "schema apply builds
/ preserves indexes inline" behavior (the only remaining full-suite failures):
- test_apply_schema_defers_index_then_reconciler_builds_it (was
test_apply_schema_adds_index_for_existing_property): apply records the @index
intent but builds nothing; assert the BTREE on `age` is absent after apply and
present after ensure_indices. (Uses `age`, unindexed in TEST_SCHEMA — `name
@key` is already FTS-indexed at seed.)
- test_apply_schema_rewrite_defers_index_then_reconciler_restores (was
test_apply_schema_rewrite_preserves_existing_indices): an AddProperty rewrite
no longer rebuilds indexes inline; assert ensure_indices restores id BTREE +
name FTS after the rewrite.
Verified by grep that these + the server/CLI tests are the complete set of
"apply builds an index" assertions; all other index-presence tests run after
load/ensure_indices/primitives, which still build.
Refs dev-graph iss-848.
* fix(engine): optimize always reports pending indexes, not only on create-work (PR review)
Cursor Bugbot (MED): pending_indexes was filled only when needs_index_create was
true, but the vector trainability precheck makes needs_index_work_node exclude an
untrainable Vector column. So a table whose sole missing index is untrainable, but
which optimize still compacts or reindexes, returned an empty pending_indexes —
contradicting the documented operator contract for deferred columns.
Run the (idempotent) build chokepoint unconditionally once past the no-op gate,
rather than gating it on needs_index_create. It skips existing indexes, builds
any buildable missing one, and reports an untrainable column as pending whether
the table entered for compaction, reindex, or index creation. needs_index_create
still gates the no-op decision (so an index-only table still enters the path).
Refs dev-graph iss-848.
* test(engine): reframe staged-BTREE-failure failpoint onto the reconciler path
ensure_indices_stage_btree_failure_leaves_existing_tables_writable fired
`ensure_indices.post_stage_pre_commit_btree` and expected `apply_schema` (adding
a type) to fail mid-BTREE-build. iss-848 removed apply's inline index build, so
that apply now succeeds and the test's unwrap_err panicked — it exercised a
removed code path.
Reframe onto where BTREE builds happen now: seed Person, add an `@index` on
`age` (apply records intent, defers the build), then `ensure_indices` builds the
deferred BTREE and the failpoint fires between stage and commit. Person's HEAD
is unchanged (no drift) and its EnsureIndices sidecar pins NoMovement; a write to
a different, unpinned table (Company) is unaffected (mutations/loads heal
roll-forward and proceed, unlike optimize/repair which refuse on a pending
sidecar). Preserves the original coverage (staged-index stage failure leaves
other tables writable, no drift) in the new architecture.
Refs dev-graph iss-848.
* feat(server): converge deferred indexes promptly after schema apply (iss-848)
Schema apply records @index intent but defers the physical build. On a
long-lived server, spawn a detached best-effort ensure_indices after a
successful apply so the indexes converge promptly instead of waiting for the
operator's next optimize. Fire-and-forget: it never blocks or fails the apply
response, and a failure is logged (the index still converges on the next
optimize). Guarded on result.applied. The CLI is one-shot, so it has no
equivalent; its convergence path is the optimize cadence.
handle.engine is already an Arc, so the spawn takes an owned clone. Convergence
itself is covered by the engine ensure_indices/optimize tests; the existing
empty-graph schema-apply route tests confirm the response is unaffected (the
spawn is a read-only no-op on an empty table).
Refs dev-graph iss-848.
* docs(maintenance): list pending_indexes in optimize per-table stats (consistency)
838 lines
28 KiB
Rust
838 lines
28 KiB
Rust
//! Schema read/apply routes: migrations over HTTP, drift, gating.
|
|
//! Moved verbatim from tests/server.rs in the modularization.
|
|
|
|
use std::fs;
|
|
|
|
use axum::body::Body;
|
|
use axum::http::{Method, Request, StatusCode};
|
|
use lance::index::DatasetIndexExt;
|
|
use omnigraph::db::{Omnigraph, ReadTarget};
|
|
use omnigraph::loader::LoadMode;
|
|
use omnigraph_server::api::{
|
|
ChangeRequest, ErrorOutput, ReadRequest, SchemaApplyRequest, SchemaOutput,
|
|
};
|
|
use omnigraph_server::{AppState, build_app};
|
|
use serde_json::json;
|
|
|
|
|
|
mod support;
|
|
use support::*;
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_route_updates_graph_for_authorized_admin() {
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
let schema = additive_schema_with_nickname();
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: schema,
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
let graph = graph_path(temp.path());
|
|
let reopened = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
assert!(
|
|
reopened.catalog().node_types["Person"]
|
|
.properties
|
|
.contains_key("nickname")
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_apply_route_rejects_stored_query_breakage_before_publish() {
|
|
let (temp, app) = app_with_stored_queries(
|
|
&[("find_person", FIND_PERSON_GQ, true)],
|
|
&[("act-ragnor", "admin-token")],
|
|
STORED_QUERY_SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: renamed_age_schema(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
assert_eq!(status, StatusCode::BAD_REQUEST, "body: {payload}");
|
|
let message = payload["error"].as_str().unwrap_or_default();
|
|
assert!(
|
|
message.contains("find_person") && message.contains("schema check"),
|
|
"registry breakage should name the stored query; body: {payload}"
|
|
);
|
|
|
|
let reopened = Omnigraph::open(graph_path(temp.path()).to_str().unwrap())
|
|
.await
|
|
.unwrap();
|
|
let person = &reopened.catalog().node_types["Person"];
|
|
assert!(person.properties.contains_key("age"));
|
|
assert!(!person.properties.contains_key("years"));
|
|
|
|
let (invoke_status, invoke_body) = json_response(
|
|
&app,
|
|
invoke_request(
|
|
"find_person",
|
|
"admin-token",
|
|
json!({ "params": { "name": "Alice" } }),
|
|
),
|
|
)
|
|
.await;
|
|
assert_eq!(invoke_status, StatusCode::OK, "body: {invoke_body}");
|
|
assert_eq!(invoke_body["row_count"], 1);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_apply_route_noop_keeps_valid_stored_query_registry() {
|
|
let (_temp, app) = app_with_stored_queries(
|
|
&[("find_person", FIND_PERSON_GQ, true)],
|
|
&[("act-ragnor", "admin-token")],
|
|
STORED_QUERY_SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
assert_eq!(status, StatusCode::OK, "body: {payload}");
|
|
assert_eq!(payload["applied"], false);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_route_requires_schema_apply_policy_permission() {
|
|
let (_temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
POLICY_YAML,
|
|
)
|
|
.await;
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: additive_schema_with_nickname(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
|
|
assert_eq!(status, StatusCode::FORBIDDEN);
|
|
assert_eq!(
|
|
payload["code"],
|
|
serde_json::to_value(omnigraph_server::api::ErrorCode::Forbidden).unwrap()
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_route_requires_bearer_token_when_policy_enabled() {
|
|
let (_temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: additive_schema_with_nickname(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
|
|
assert_eq!(status, StatusCode::UNAUTHORIZED);
|
|
assert_eq!(
|
|
payload["code"],
|
|
serde_json::to_value(omnigraph_server::api::ErrorCode::Unauthorized).unwrap()
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_route_can_rename_type() {
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: renamed_person_schema(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
let graph = graph_path(temp.path());
|
|
let reopened = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
let snapshot = reopened
|
|
.snapshot_of(ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap();
|
|
assert!(snapshot.entry("node:Human").is_some());
|
|
assert!(snapshot.entry("node:Person").is_none());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_route_can_rename_property() {
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: renamed_age_schema(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
let graph = graph_path(temp.path());
|
|
let reopened = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
let person = &reopened.catalog().node_types["Person"];
|
|
assert!(person.properties.contains_key("years"));
|
|
assert!(!person.properties.contains_key("age"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_route_can_add_index() {
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
let graph = graph_path(temp.path());
|
|
let before_index_count = {
|
|
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
|
|
let dataset = snapshot.open("node:Person").await.unwrap();
|
|
dataset.load_indices().await.unwrap().len()
|
|
};
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: indexed_name_schema(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
// iss-848: the /schema/apply route accepts the index-add and applies it as a
|
|
// metadata change — it records the `@index` intent in the catalog/IR but does
|
|
// NOT build the physical index inline (the build is deferred to
|
|
// ensure_indices/optimize; on this empty table nothing would build anyway).
|
|
// So the physical index count is unchanged by the apply.
|
|
let reopened = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
let snapshot = reopened
|
|
.snapshot_of(ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap();
|
|
let dataset = snapshot.open("node:Person").await.unwrap();
|
|
let after_index_count = dataset.load_indices().await.unwrap().len();
|
|
assert_eq!(
|
|
after_index_count, before_index_count,
|
|
"schema apply records @index intent but defers the physical build (iss-848)"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_route_rejects_unsupported_plan() {
|
|
let (_temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: unsupported_schema_change(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
|
|
assert_eq!(status, StatusCode::BAD_REQUEST);
|
|
assert_eq!(
|
|
payload["code"],
|
|
serde_json::to_value(omnigraph_server::api::ErrorCode::BadRequest).unwrap()
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_route_rejects_when_non_main_branch_exists() {
|
|
let temp = init_graph_with_schema(&fs::read_to_string(fixture("test.pg")).unwrap()).await;
|
|
let graph = graph_path(temp.path());
|
|
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
db.branch_create("feature").await.unwrap();
|
|
drop(db);
|
|
|
|
let policy_path = temp.path().join("policy.yaml");
|
|
fs::write(&policy_path, SCHEMA_APPLY_POLICY_YAML).unwrap();
|
|
let state = AppState::open_with_bearer_tokens_and_policy(
|
|
graph.to_string_lossy().to_string(),
|
|
vec![("act-ragnor".to_string(), "admin-token".to_string())],
|
|
Some(&policy_path),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
let app = build_app(state);
|
|
|
|
let request = Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: additive_schema_with_nickname(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap();
|
|
let (status, payload) = json_response(&app, request).await;
|
|
|
|
assert_eq!(status, StatusCode::CONFLICT);
|
|
assert_eq!(
|
|
payload["code"],
|
|
serde_json::to_value(omnigraph_server::api::ErrorCode::Conflict).unwrap()
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_drift_returns_conflict_for_snapshot_read_and_change() {
|
|
let (temp, app) = app_for_loaded_graph().await;
|
|
let graph = graph_path(temp.path());
|
|
fs::write(graph.join("_schema.pg"), drifted_test_schema()).unwrap();
|
|
|
|
let (snapshot_status, snapshot_body) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.uri("/snapshot?branch=main")
|
|
.method(Method::GET)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
let snapshot_error: ErrorOutput = serde_json::from_value(snapshot_body).unwrap();
|
|
assert_eq!(snapshot_status, StatusCode::CONFLICT);
|
|
assert_eq!(
|
|
snapshot_error.code,
|
|
Some(omnigraph_server::api::ErrorCode::Conflict)
|
|
);
|
|
assert!(
|
|
snapshot_error
|
|
.error
|
|
.contains("schema evolution is locked down in phase 1")
|
|
);
|
|
|
|
let read = ReadRequest {
|
|
query_source: fs::read_to_string(fixture("test.gq")).unwrap(),
|
|
query_name: Some("get_person".to_string()),
|
|
params: Some(json!({ "name": "Alice" })),
|
|
branch: Some("main".to_string()),
|
|
snapshot: None,
|
|
};
|
|
let (read_status, read_body) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.uri("/read")
|
|
.method(Method::POST)
|
|
.header("content-type", "application/json")
|
|
.body(Body::from(serde_json::to_vec(&read).unwrap()))
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
let read_error: ErrorOutput = serde_json::from_value(read_body).unwrap();
|
|
assert_eq!(read_status, StatusCode::CONFLICT);
|
|
assert_eq!(
|
|
read_error.code,
|
|
Some(omnigraph_server::api::ErrorCode::Conflict)
|
|
);
|
|
assert!(
|
|
read_error
|
|
.error
|
|
.contains("schema evolution is locked down in phase 1")
|
|
);
|
|
|
|
let change = ChangeRequest {
|
|
query: MUTATION_QUERIES.to_string(),
|
|
name: Some("insert_person".to_string()),
|
|
params: Some(json!({ "name": "Mina", "age": 28 })),
|
|
branch: Some("main".to_string()),
|
|
};
|
|
let (change_status, change_body) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.uri("/change")
|
|
.method(Method::POST)
|
|
.header("content-type", "application/json")
|
|
.body(Body::from(serde_json::to_vec(&change).unwrap()))
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
let change_error: ErrorOutput = serde_json::from_value(change_body).unwrap();
|
|
assert_eq!(change_status, StatusCode::CONFLICT);
|
|
assert_eq!(
|
|
change_error.code,
|
|
Some(omnigraph_server::api::ErrorCode::Conflict)
|
|
);
|
|
assert!(
|
|
change_error
|
|
.error
|
|
.contains("schema evolution is locked down in phase 1")
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_route_returns_current_source() {
|
|
let (_temp, app) = app_for_loaded_graph().await;
|
|
let (status, body) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.uri("/schema")
|
|
.method(Method::GET)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
|
|
assert_eq!(status, StatusCode::OK);
|
|
let output: SchemaOutput = serde_json::from_value(body).unwrap();
|
|
assert!(output.schema_source.contains("node Person"));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_route_requires_bearer_token_when_auth_configured() {
|
|
let (_temp, app) = app_for_loaded_graph_with_auth("demo-token").await;
|
|
|
|
let (missing_status, missing_body) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.uri("/schema")
|
|
.method(Method::GET)
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
let missing_error: ErrorOutput = serde_json::from_value(missing_body).unwrap();
|
|
assert_eq!(missing_status, StatusCode::UNAUTHORIZED);
|
|
assert_eq!(
|
|
missing_error.code,
|
|
Some(omnigraph_server::api::ErrorCode::Unauthorized)
|
|
);
|
|
|
|
let (ok_status, ok_body) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.uri("/schema")
|
|
.method(Method::GET)
|
|
.header("authorization", "Bearer demo-token")
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
assert_eq!(ok_status, StatusCode::OK);
|
|
let output: SchemaOutput = serde_json::from_value(ok_body).unwrap();
|
|
assert!(!output.schema_source.is_empty());
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_route_denied_when_actor_lacks_read_permission() {
|
|
let temp = init_loaded_graph().await;
|
|
let graph = graph_path(temp.path());
|
|
let policy_path = temp.path().join("policy.yaml");
|
|
// Policy grants branch_create only — no read action for act-bruno.
|
|
fs::write(&policy_path, INGEST_CREATE_ONLY_POLICY_YAML).unwrap();
|
|
let state = AppState::open_with_bearer_tokens_and_policy(
|
|
graph.to_string_lossy().to_string(),
|
|
vec![("act-bruno".to_string(), "team-token".to_string())],
|
|
Some(&policy_path),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
let app = build_app(state);
|
|
|
|
let (status, body) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.uri("/schema")
|
|
.method(Method::GET)
|
|
.header("authorization", "Bearer team-token")
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
let error: ErrorOutput = serde_json::from_value(body).unwrap();
|
|
assert_eq!(status, StatusCode::FORBIDDEN);
|
|
assert_eq!(
|
|
error.code,
|
|
Some(omnigraph_server::api::ErrorCode::Forbidden)
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_apply_route_soft_drops_property_via_http() {
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
// Load a row that has the column we're about to drop.
|
|
let graph = graph_path(temp.path());
|
|
{
|
|
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
db.load(
|
|
"main",
|
|
r#"{"type":"Person","data":{"name":"PreDrop","age":42}}"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
let pre_version = manifest_dataset_version(&graph).await;
|
|
|
|
let (status, payload) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: schema_without_age(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
|
|
// Catalog reflects the drop: `age` is gone from the live schema.
|
|
let reopened = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
assert!(
|
|
!reopened.catalog().node_types["Person"]
|
|
.properties
|
|
.contains_key("age"),
|
|
"catalog should not contain `age` after drop"
|
|
);
|
|
|
|
// Soft drop preserves the prior version — `age` is still readable
|
|
// via time travel to the pre-drop manifest version. Mirrors the
|
|
// SDK-side assertion in `apply_schema_drops_a_nullable_property_softly_preserves_prior_version`.
|
|
let pre_drop_snapshot = reopened.snapshot_at_version(pre_version).await.unwrap();
|
|
let pre_drop_ds = pre_drop_snapshot.open("node:Person").await.unwrap();
|
|
let pre_drop_fields = pre_drop_ds
|
|
.schema()
|
|
.fields
|
|
.iter()
|
|
.map(|f| f.name.clone())
|
|
.collect::<Vec<_>>();
|
|
assert!(
|
|
pre_drop_fields.iter().any(|f| f == "age"),
|
|
"soft drop should leave the pre-drop dataset's `age` column \
|
|
time-travel-reachable; got fields {pre_drop_fields:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_apply_route_soft_drops_node_type_via_http() {
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
let graph = graph_path(temp.path());
|
|
|
|
let (status, payload) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: schema_without_company(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
|
|
let reopened = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
assert!(
|
|
!reopened.catalog().node_types.contains_key("Company"),
|
|
"catalog should not contain `Company` after drop"
|
|
);
|
|
assert!(
|
|
!reopened.catalog().edge_types.contains_key("WorksAt"),
|
|
"catalog should not contain `WorksAt` after cascade"
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_apply_route_hard_drops_property_with_allow_data_loss() {
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
let graph = graph_path(temp.path());
|
|
{
|
|
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
db.load(
|
|
"main",
|
|
r#"{"type":"Person","data":{"name":"PreDropHard","age":50}}"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Apply with allow_data_loss=true → Hard mode promotion.
|
|
let (status, payload) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: schema_without_age(),
|
|
allow_data_loss: true,
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
|
|
// Catalog reflects the drop.
|
|
let reopened = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
assert!(
|
|
!reopened.catalog().node_types["Person"]
|
|
.properties
|
|
.contains_key("age"),
|
|
"catalog should not contain `age` after Hard drop"
|
|
);
|
|
// Plan steps should show DropMode::Hard for property drops.
|
|
let steps = payload["steps"].as_array().expect("steps array");
|
|
let drop_step = steps
|
|
.iter()
|
|
.find(|s| s["kind"] == "drop_property")
|
|
.expect("plan should include drop_property step");
|
|
let mode = &drop_step["mode"];
|
|
assert_eq!(
|
|
mode, "hard",
|
|
"expected hard mode under allow_data_loss=true"
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_apply_route_keeps_drops_soft_without_flag() {
|
|
// Symmetric to the Hard test: same schema change, but no
|
|
// allow_data_loss flag → drops stay Soft (prior column data
|
|
// remains time-travel-reachable). Pins the default semantics
|
|
// against accidental Hard promotion.
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
let graph = graph_path(temp.path());
|
|
|
|
let (status, payload) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: schema_without_age(),
|
|
allow_data_loss: false,
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
|
|
let steps = payload["steps"].as_array().expect("steps array");
|
|
let drop_step = steps
|
|
.iter()
|
|
.find(|s| s["kind"] == "drop_property")
|
|
.expect("plan should include drop_property step");
|
|
let mode = &drop_step["mode"];
|
|
assert_eq!(mode, "soft", "expected soft mode without allow_data_loss");
|
|
let _ = graph;
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn schema_apply_route_additive_property_preserves_existing_rows() {
|
|
// SDK suite covers rename and drop data preservation. Additive
|
|
// AddProperty wasn't pinned with a row-count check anywhere.
|
|
// Load N rows, apply schema adding nullable property, verify
|
|
// every row is still readable and the new column is null.
|
|
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
|
|
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
|
&[("act-ragnor", "admin-token")],
|
|
SCHEMA_APPLY_POLICY_YAML,
|
|
)
|
|
.await;
|
|
let graph = graph_path(temp.path());
|
|
|
|
// Standard fixture data: 4 Persons + 1 Company. Load it.
|
|
let pre_count = {
|
|
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
db.load(
|
|
"main",
|
|
&fs::read_to_string(fixture("test.jsonl")).unwrap(),
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
let snap = db
|
|
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap();
|
|
snap.entry("node:Person").expect("Person").row_count
|
|
};
|
|
assert!(pre_count > 0, "fixture should have loaded Person rows");
|
|
|
|
let (status, payload) = json_response(
|
|
&app,
|
|
Request::builder()
|
|
.method(Method::POST)
|
|
.uri("/schema/apply")
|
|
.header("content-type", "application/json")
|
|
.header("authorization", "Bearer admin-token")
|
|
.body(Body::from(
|
|
serde_json::to_vec(&SchemaApplyRequest {
|
|
schema_source: additive_schema_with_nickname(),
|
|
..Default::default()
|
|
})
|
|
.unwrap(),
|
|
))
|
|
.unwrap(),
|
|
)
|
|
.await;
|
|
assert_eq!(status, StatusCode::OK);
|
|
assert_eq!(payload["applied"], true);
|
|
|
|
// Row count preserved.
|
|
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
|
|
let snap = db
|
|
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap();
|
|
let post_count = snap.entry("node:Person").expect("Person").row_count;
|
|
assert_eq!(
|
|
post_count, pre_count,
|
|
"AddProperty should preserve row count",
|
|
);
|
|
}
|