mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-27 02:39:38 +02:00
mr-668: composite e2e tests, race fix, v0.7.0 release (PR 9/10)
PR 9 — the final integration PR for MR-668 multi-graph server work.
Closes the v0.7.0 release.
Composite lifecycle tests (closes gaps flagged in PR 7's coverage
review):
- `multi_graph_lifecycle_post_query_restart_persistence` — POST a
graph, query it via cluster route, reload the config from disk
and confirm `load_server_settings` sees the rewritten YAML.
Validates the "restart resolves orphans" failure-mode story.
- `per_graph_policy_enforced_on_post_created_graph` — POST a graph
with a per-graph policy attached, then send authenticated read
and change requests. Per-graph Cedar enforcement fires correctly
on a POST-created graph (engine-layer policy reinstalled via
`Omnigraph::with_policy` inside the create flow).
- `concurrent_post_graphs_distinct_ids_all_succeed` — 4 concurrent
POSTs with distinct graph_ids all return 201. Caught a real
race in `rewrite_atomic` (see below).
Race fix — `rewrite_atomic_with_modify`:
The first composite test surfaced a real bug. The old
`rewrite_atomic(path, new_config, expected_hash)` captured the
baseline hash OUTSIDE the flock, then called rewrite_atomic which
re-acquired it inside. Under concurrent writers:
- POST A: captures baseline H0, calls rewrite_atomic.
- POST B: captures baseline H0 too (before A's update lands).
- A: acquires flock, on-disk == H0, writes H1, releases.
- A: updates baseline H0 → H1.
- B: tries to acquire flock — waits.
- B: acquires flock. On-disk is now H1. Expected (captured
before A finished) is H0. MISMATCH → spurious Drift error.
Worse: even if the timing happens to align, B's `updated` config
was constructed from BYTES read before the flock. B writes a config
that doesn't include A's new graph — silent data loss.
The fix: new `config::rewrite_atomic_with_modify(path, baseline,
modify)` takes a closure. Inside the flock + baseline mutex:
1. Read on-disk bytes, hash, compare to baseline.
2. Parse on-disk YAML.
3. Call `modify(parsed)` to produce the new config — receives
fresh on-disk state, returns the modification.
4. Serialize + write + fsync + rename + update baseline.
Everything is read-modify-write under the same critical section.
Concurrent writers serialize cleanly. Test confirmed this is no
longer a race.
The old `rewrite_atomic(path, new_config, expected_hash)` API stays
for tests that don't need the read-modify-write shape; the POST
handler switches to the new shape.
Version bump v0.6.0 → v0.7.0:
- All 5 `crates/*/Cargo.toml` (compiler, engine, policy, cli, server)
plus their inter-crate `path` dep version constraints.
- `Cargo.lock` regenerated by `cargo build --workspace`.
- `AGENTS.md` "Version surveyed" line, capability matrix HTTP-server
row updated to mention multi-graph + cluster routes + atomic YAML
rewrite.
- `openapi.json` regenerated.
Docs:
- `docs/releases/v0.7.0.md` (new) — release notes with breaking
changes, new features, deferred items (DELETE, `delete_prefix`,
actor forwarding), and the single→multi migration recipe.
- `docs/user/server.md` — substantial section additions for the
two modes, mode inference, cluster endpoint table, management
endpoints, `omnigraph.yaml` ownership contract, `POST /graphs`
body shape + status codes.
- `docs/user/cli.md` — `omnigraph graphs list/create` section,
deferred-DELETE note.
- `docs/user/policy.md` — server-scoped Cedar actions
(`graph_create`, `graph_list`), per-graph vs server-level policy
composition, example server-level policy.
Workspace test pass: 573 tests green across all crates. Zero
failures. MR-731 spoof regression still pinned and passing across
the entire 10-PR series.
This commit closes MR-668. v0.7.0 is ready for tagging.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
75514b6cfd
commit
d11c18fb27
15 changed files with 632 additions and 77 deletions
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-cli"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
edition = "2024"
|
||||
description = "CLI for the Omnigraph graph database."
|
||||
license = "MIT"
|
||||
|
|
@ -13,10 +13,10 @@ name = "omnigraph"
|
|||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.0" }
|
||||
omnigraph-server = { path = "../omnigraph-server", version = "0.6.0" }
|
||||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.0" }
|
||||
omnigraph-server = { path = "../omnigraph-server", version = "0.7.0" }
|
||||
clap = { workspace = true }
|
||||
color-eyre = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-compiler"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
edition = "2024"
|
||||
description = "Schema/query compiler for Omnigraph. Zero Lance dependency."
|
||||
license = "MIT"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-policy"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
edition = "2024"
|
||||
description = "Policy / authorization layer for Omnigraph — Cedar-backed PolicyEngine, PolicyChecker trait, ResourceScope enum."
|
||||
license = "MIT"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-server"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
edition = "2024"
|
||||
description = "HTTP server for the Omnigraph graph database."
|
||||
license = "MIT"
|
||||
|
|
@ -19,9 +19,9 @@ default = []
|
|||
aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"]
|
||||
|
||||
[dependencies]
|
||||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.0" }
|
||||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.0" }
|
||||
axum = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
color-eyre = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -493,6 +493,77 @@ fn staging_path(path: &Path) -> PathBuf {
|
|||
PathBuf::from(s)
|
||||
}
|
||||
|
||||
/// Atomic read-modify-write of `omnigraph.yaml` (MR-668 PR 7 — race-fix
|
||||
/// from PR 9). Everything happens **inside** the `fcntl::flock` and the
|
||||
/// in-memory baseline mutex:
|
||||
/// 1. Acquire `LOCK_EX`.
|
||||
/// 2. Lock the in-memory baseline mutex.
|
||||
/// 3. Read the on-disk file, hash it.
|
||||
/// 4. Compare to the in-memory baseline; if mismatch → `Drift`.
|
||||
/// 5. Parse the on-disk YAML, hand the parsed config to `modify`.
|
||||
/// 6. Serialize the returned config, write `.tmp`, fsync, rename.
|
||||
/// 7. Update the in-memory baseline to the new file's hash.
|
||||
/// 8. Release flock + mutex.
|
||||
///
|
||||
/// The earlier `rewrite_atomic` captured the baseline OUTSIDE the
|
||||
/// flock, which created a race under concurrent writers: a second
|
||||
/// writer would see a stale baseline + the first writer's new on-disk
|
||||
/// hash, yielding a spurious `Drift` error. The `_with_modify` shape
|
||||
/// keeps the entire critical section atomic.
|
||||
///
|
||||
/// `modify` is a `FnOnce` so the caller can read mutable state into it
|
||||
/// (e.g. a `GraphCreateRequest`) without `Sync` requirements.
|
||||
pub fn rewrite_atomic_with_modify<F>(
|
||||
path: &Path,
|
||||
baseline: &std::sync::Mutex<[u8; 32]>,
|
||||
modify: F,
|
||||
) -> std::result::Result<(), RewriteAtomicError>
|
||||
where
|
||||
F: FnOnce(OmnigraphConfig) -> std::result::Result<OmnigraphConfig, RewriteAtomicError>,
|
||||
{
|
||||
let lock_file = fs::OpenOptions::new().read(true).write(true).open(path)?;
|
||||
lock_file.lock_exclusive()?;
|
||||
let _lock_guard = lock_file;
|
||||
|
||||
// Lock the in-memory baseline INSIDE the flock so concurrent writers
|
||||
// serialize on both: flock for cross-process safety, mutex for
|
||||
// in-process baseline updates. The mutex guard outlives the modify
|
||||
// step so the baseline can't move under our feet.
|
||||
let mut baseline_guard = baseline
|
||||
.lock()
|
||||
.expect("baseline mutex must not be poisoned");
|
||||
|
||||
let current_bytes = fs::read(path)?;
|
||||
let mut current_hash = [0u8; 32];
|
||||
current_hash.copy_from_slice(&Sha256::digest(¤t_bytes));
|
||||
if current_hash != *baseline_guard {
|
||||
return Err(RewriteAtomicError::Drift);
|
||||
}
|
||||
|
||||
// Parse the on-disk config (NOT a stale cached version) and hand
|
||||
// to `modify`. The closure can mutate freely; the result is what
|
||||
// we serialize and write.
|
||||
let current_config: OmnigraphConfig = serde_yaml::from_slice(¤t_bytes)?;
|
||||
let new_config = modify(current_config)?;
|
||||
let serialized = serde_yaml::to_string(&new_config)?;
|
||||
|
||||
let tmp_path = staging_path(path);
|
||||
fs::write(&tmp_path, &serialized)?;
|
||||
let tmp_file = fs::File::open(&tmp_path)?;
|
||||
tmp_file.sync_all()?;
|
||||
drop(tmp_file);
|
||||
fs::rename(&tmp_path, path)?;
|
||||
if let Some(parent) = path.parent() {
|
||||
let dir = fs::File::open(parent)?;
|
||||
dir.sync_all()?;
|
||||
}
|
||||
|
||||
let mut new_hash = [0u8; 32];
|
||||
new_hash.copy_from_slice(&Sha256::digest(serialized.as_bytes()));
|
||||
*baseline_guard = new_hash;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::fs;
|
||||
|
|
|
|||
|
|
@ -1428,49 +1428,33 @@ async fn server_graphs_create(
|
|||
))
|
||||
}
|
||||
|
||||
/// Load `omnigraph.yaml` from disk, add the new graph entry, write it
|
||||
/// back via `config::rewrite_atomic`, and update the in-memory baseline
|
||||
/// hash. Returns an `ApiError` mapped to the appropriate HTTP status
|
||||
/// (503 for drift, 500 for IO/serialize failures).
|
||||
/// Atomically rewrite `omnigraph.yaml` to add a new graph entry.
|
||||
/// Runs inside `tokio::task::spawn_blocking` (the flock is sync).
|
||||
///
|
||||
/// Runs inside `tokio::task::spawn_blocking` — `fs2::flock` is sync.
|
||||
/// Read-modify-write happens entirely under the flock + baseline
|
||||
/// mutex via `config::rewrite_atomic_with_modify` — concurrent
|
||||
/// writers serialize without spurious drift errors.
|
||||
fn rewrite_yaml_with_new_graph(
|
||||
config_path: &std::path::Path,
|
||||
config_hash: &Arc<std::sync::Mutex<[u8; 32]>>,
|
||||
graph_id: &str,
|
||||
new_target: config::TargetConfig,
|
||||
) -> std::result::Result<(), ApiError> {
|
||||
// Re-read the config file to construct the next state.
|
||||
let bytes = std::fs::read(config_path)
|
||||
.map_err(|err| ApiError::internal(format!("read omnigraph.yaml: {err}")))?;
|
||||
let mut updated: config::OmnigraphConfig = serde_yaml::from_slice(&bytes)
|
||||
.map_err(|err| ApiError::internal(format!("parse omnigraph.yaml: {err}")))?;
|
||||
updated.graphs.insert(graph_id.to_string(), new_target);
|
||||
|
||||
// Grab the current baseline hash for the drift check.
|
||||
let expected = *config_hash
|
||||
.lock()
|
||||
.expect("config_hash mutex must not be poisoned");
|
||||
let new_hash = config::rewrite_atomic(config_path, &updated, &expected).map_err(|err| {
|
||||
match err {
|
||||
config::RewriteAtomicError::Drift => ApiError {
|
||||
status: StatusCode::SERVICE_UNAVAILABLE,
|
||||
code: ErrorCode::Conflict,
|
||||
message: err.to_string(),
|
||||
merge_conflicts: Vec::new(),
|
||||
manifest_conflict: None,
|
||||
},
|
||||
other => ApiError::internal(other.to_string()),
|
||||
}
|
||||
})?;
|
||||
|
||||
// Update the baseline so the next POST sees this as the new "no
|
||||
// drift" reference. If we forgot this, every POST after the first
|
||||
// would 503.
|
||||
*config_hash
|
||||
.lock()
|
||||
.expect("config_hash mutex must not be poisoned") = new_hash;
|
||||
Ok(())
|
||||
let graph_id = graph_id.to_string();
|
||||
config::rewrite_atomic_with_modify(config_path, config_hash, move |mut config| {
|
||||
config.graphs.insert(graph_id, new_target);
|
||||
Ok(config)
|
||||
})
|
||||
.map_err(|err| match err {
|
||||
config::RewriteAtomicError::Drift => ApiError {
|
||||
status: StatusCode::SERVICE_UNAVAILABLE,
|
||||
code: ErrorCode::Conflict,
|
||||
message: err.to_string(),
|
||||
merge_conflicts: Vec::new(),
|
||||
manifest_conflict: None,
|
||||
},
|
||||
other => ApiError::internal(other.to_string()),
|
||||
})
|
||||
}
|
||||
|
||||
async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
|
||||
|
|
|
|||
|
|
@ -5244,4 +5244,271 @@ graphs:
|
|||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
// ─── PR 9: composite lifecycle tests ─────────────────────────────────
|
||||
//
|
||||
// These tests exercise PRs 1–8 in combination. Each test composes
|
||||
// multiple primitives (POST a graph, query it, restart, enforce
|
||||
// per-graph policy) into a single scenario. They're the closure
|
||||
// tests for the gaps I flagged in PR 7's coverage assessment —
|
||||
// not redundant with the per-PR tests because they catch
|
||||
// integration regressions that individual unit tests miss.
|
||||
|
||||
/// Post a graph, query it via cluster route, then re-load the
|
||||
/// config from disk and confirm `load_server_settings` sees the
|
||||
/// rewritten YAML (i.e. the server's `POST /graphs` actually
|
||||
/// persists). Validates that on restart, the new graph would be
|
||||
/// opened automatically by `serve()`'s multi-mode startup.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn multi_graph_lifecycle_post_query_restart_persistence() {
|
||||
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
|
||||
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
|
||||
|
||||
// 1. POST a new graph `beta`.
|
||||
let beta_uri = cfg_dir.path().join("beta.omni");
|
||||
let req = GraphCreateRequest {
|
||||
graph_id: "beta".to_string(),
|
||||
uri: beta_uri.to_string_lossy().to_string(),
|
||||
schema: GraphSchemaSpec {
|
||||
source: schema.clone(),
|
||||
},
|
||||
policy: None,
|
||||
};
|
||||
let (status, _) = post_graph(&app, &req, None).await;
|
||||
assert_eq!(status, StatusCode::CREATED);
|
||||
|
||||
// 2. Query the new graph via its cluster route.
|
||||
let snap = app
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri("/graphs/beta/snapshot?branch=main")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(snap.status(), StatusCode::OK);
|
||||
|
||||
// 3. "Restart": reload the config and confirm the rewritten
|
||||
// YAML carries the new graph through `load_server_settings`.
|
||||
// A real restart calls `open_multi_graph_state` next; we
|
||||
// stop short of opening Lance again (the per-PR tests
|
||||
// already cover that path) but assert the inferred
|
||||
// `ServerConfigMode::Multi` lists both graphs.
|
||||
let config_path = cfg_dir.path().join("omnigraph.yaml");
|
||||
let settings: ServerConfig =
|
||||
load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Multi { graphs, .. } => {
|
||||
let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect();
|
||||
assert_eq!(
|
||||
ids,
|
||||
vec!["alpha", "beta"],
|
||||
"rewritten YAML must include both graphs in BTreeMap order"
|
||||
);
|
||||
}
|
||||
_ => panic!("expected Multi mode after restart"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-graph Cedar policy is enforced for a graph created via POST.
|
||||
/// Closes the gap from PR 7's test coverage — the policy was loaded
|
||||
/// but never exercised end-to-end. This test sends an authenticated
|
||||
/// `change` request against a POST-created graph whose per-graph
|
||||
/// policy denies `change` for that actor.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn per_graph_policy_enforced_on_post_created_graph() {
|
||||
let (cfg_dir, _initial_app) = multi_mode_app_with_real_config(&[]).await;
|
||||
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
|
||||
let config_path = cfg_dir.path().join("omnigraph.yaml");
|
||||
let config_hash = omnigraph_server::config::hash_config_file(&config_path).unwrap();
|
||||
// Server-level policy: act-andrew can create graphs. Required
|
||||
// because requires_bearer_auth fires under MR-723 default-deny
|
||||
// once we configure tokens, and `GraphCreate != Read` would
|
||||
// otherwise 403 without a server policy.
|
||||
let server_policy_path = cfg_dir.path().join("server-policy.yaml");
|
||||
fs::write(
|
||||
&server_policy_path,
|
||||
r#"
|
||||
version: 1
|
||||
groups:
|
||||
admins: [act-andrew]
|
||||
rules:
|
||||
- id: admins-create
|
||||
allow:
|
||||
actors: { group: admins }
|
||||
actions: [graph_create, graph_list]
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let server_policy = omnigraph_policy::PolicyEngine::load(&server_policy_path, "server")
|
||||
.unwrap();
|
||||
let workload = omnigraph_server::workload::WorkloadController::from_env();
|
||||
let state = AppState::new_multi(
|
||||
vec![],
|
||||
vec![
|
||||
("act-andrew".to_string(), "andrew-token".to_string()),
|
||||
("act-bruno".to_string(), "bruno-token".to_string()),
|
||||
],
|
||||
Some(server_policy),
|
||||
workload,
|
||||
Some(config_path.clone()),
|
||||
Some(config_hash),
|
||||
)
|
||||
.expect("empty multi-mode registry must be constructible");
|
||||
let app = build_app(state);
|
||||
|
||||
// Per-graph policy file: only `act-andrew` may `change`.
|
||||
let beta_policy_path = cfg_dir.path().join("beta-policy.yaml");
|
||||
fs::write(
|
||||
&beta_policy_path,
|
||||
r#"
|
||||
version: 1
|
||||
groups:
|
||||
writers: [act-andrew]
|
||||
readers: [act-bruno]
|
||||
protected_branches: []
|
||||
rules:
|
||||
- id: writers-change
|
||||
allow:
|
||||
actors: { group: writers }
|
||||
actions: [read, change]
|
||||
branch_scope: any
|
||||
- id: readers-read
|
||||
allow:
|
||||
actors: { group: readers }
|
||||
actions: [read]
|
||||
branch_scope: any
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// POST `beta` with the per-graph policy attached.
|
||||
let beta_uri = cfg_dir.path().join("beta.omni");
|
||||
let req = GraphCreateRequest {
|
||||
graph_id: "beta".to_string(),
|
||||
uri: beta_uri.to_string_lossy().to_string(),
|
||||
schema: GraphSchemaSpec { source: schema },
|
||||
policy: Some(omnigraph_server::api::GraphPolicySpec {
|
||||
file: Some(beta_policy_path.to_string_lossy().to_string()),
|
||||
}),
|
||||
};
|
||||
let (status, body) = post_graph(&app, &req, Some("andrew-token")).await;
|
||||
assert_eq!(
|
||||
status,
|
||||
StatusCode::CREATED,
|
||||
"POST /graphs failed: {body}"
|
||||
);
|
||||
|
||||
// Authenticated `read` from a reader: 200.
|
||||
let read_resp = app
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri("/graphs/beta/snapshot?branch=main")
|
||||
.header("authorization", "Bearer bruno-token")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
read_resp.status(),
|
||||
StatusCode::OK,
|
||||
"act-bruno must be allowed read on beta"
|
||||
);
|
||||
|
||||
// Authenticated `change` from the reader (act-bruno) must 403:
|
||||
// beta-policy allows readers only `read`, not `change`.
|
||||
let change_body = serde_json::json!({
|
||||
"query_source": "query foo() { insert Person { name: \"X\" } }",
|
||||
"query_name": "foo",
|
||||
"branch": "main"
|
||||
});
|
||||
let change_resp = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/graphs/beta/change")
|
||||
.header("authorization", "Bearer bruno-token")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(serde_json::to_vec(&change_body).unwrap()))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
change_resp.status(),
|
||||
StatusCode::FORBIDDEN,
|
||||
"per-graph Cedar policy must deny `change` for act-bruno on beta"
|
||||
);
|
||||
}
|
||||
|
||||
/// Concurrent POST /graphs for DISTINCT graph_ids all succeed.
|
||||
/// The flock + drift detection serializes the YAML rewrite, but
|
||||
/// all writes are valid and the final YAML lists every graph.
|
||||
/// (Same-graph_id concurrency is already covered by the
|
||||
/// `concurrent_insert_same_key_exactly_one_succeeds` registry
|
||||
/// test plus the YAML drift-detection behavior.)
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn concurrent_post_graphs_distinct_ids_all_succeed() {
|
||||
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
|
||||
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
|
||||
const N: usize = 4;
|
||||
|
||||
let app = Arc::new(app);
|
||||
let barrier = Arc::new(tokio::sync::Barrier::new(N));
|
||||
let mut tasks = Vec::with_capacity(N);
|
||||
for i in 0..N {
|
||||
let app = Arc::clone(&app);
|
||||
let barrier = Arc::clone(&barrier);
|
||||
let dir = cfg_dir.path().to_path_buf();
|
||||
let schema = schema.clone();
|
||||
tasks.push(tokio::spawn(async move {
|
||||
barrier.wait().await;
|
||||
let id = format!("graph-{i}");
|
||||
let uri = dir.join(format!("{id}.omni"));
|
||||
let req = GraphCreateRequest {
|
||||
graph_id: id.clone(),
|
||||
uri: uri.to_string_lossy().to_string(),
|
||||
schema: GraphSchemaSpec { source: schema },
|
||||
policy: None,
|
||||
};
|
||||
let (status, _) = post_graph(&app, &req, None).await;
|
||||
(id, status)
|
||||
}));
|
||||
}
|
||||
|
||||
let mut succeeded = Vec::new();
|
||||
for t in tasks {
|
||||
let (id, status) = t.await.unwrap();
|
||||
assert_eq!(
|
||||
status,
|
||||
StatusCode::CREATED,
|
||||
"POST {id} must succeed under concurrent distinct-id POSTs"
|
||||
);
|
||||
succeeded.push(id);
|
||||
}
|
||||
|
||||
// Final registry has 1 (alpha) + N (graph-0..N-1) = N+1 graphs.
|
||||
let resp = (*app)
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri("/graphs")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
|
||||
let payload: Value = serde_json::from_slice(&body).unwrap();
|
||||
let graph_count = payload["graphs"].as_array().unwrap().len();
|
||||
assert_eq!(graph_count, N + 1);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "omnigraph-engine"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
edition = "2024"
|
||||
description = "Runtime engine for the Omnigraph graph database."
|
||||
license = "MIT"
|
||||
|
|
@ -16,8 +16,8 @@ default = []
|
|||
failpoints = ["dep:fail", "fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.0" }
|
||||
lance = { workspace = true }
|
||||
lance-datafusion = { workspace = true }
|
||||
datafusion = { workspace = true }
|
||||
|
|
@ -51,7 +51,7 @@ chrono = { workspace = true }
|
|||
arc-swap = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.0" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
|
||||
tokio = { workspace = true }
|
||||
lance-namespace-impls = { workspace = true }
|
||||
serial_test = "3"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue