Merge branch 'main' into ragnorc/omnigraph-mcp-crate

Bring the MCP feature branch up to date with main (14 commits). One
conflict — compiler/parser.rs: main's `NanoError` → `CompilerError` rename
vs this branch's `@mcp` / per-param `@description` parser additions; resolved
by keeping the new parsing under the renamed error type. The CLI `queries list`
change (#280, surfacing `@description`/`@instruction`) auto-merged with this
branch's `mcp_expose`/`tool_name` columns.
This commit is contained in:
Ragnor Comerford 2026-06-19 21:59:14 +02:00
commit fbf455a250
No known key found for this signature in database
110 changed files with 6396 additions and 2511 deletions

View file

@ -2,9 +2,9 @@ pub mod api;
mod handlers;
mod mcp;
mod settings;
pub use settings::{load_server_settings, classify_server_runtime_state, ServerRuntimeState};
use settings::*;
use handlers::*;
use settings::*;
pub use settings::{ServerRuntimeState, classify_server_runtime_state, load_server_settings};
pub mod auth;
pub mod graph_id;
pub mod identity;
@ -30,10 +30,10 @@ use api::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest,
InvokeStoredQueryResponse, QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest,
SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
schema_apply_output, snapshot_payload,
HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest, InvokeStoredQueryResponse,
QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest, SchemaApplyOutput,
SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output, schema_apply_output,
snapshot_payload,
};
pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
use axum::body::{Body, Bytes};
@ -167,6 +167,10 @@ pub struct ServerConfig {
/// who set up auth and forgot the policy file would otherwise ship
/// the illusion of protection.
pub allow_unauthenticated: bool,
/// Operator opt-in for fail-fast cluster boot. By default, graph-local
/// startup failures quarantine that graph and healthy graphs still serve.
/// When true, any quarantined or failed graph aborts startup.
pub require_all_graphs: bool,
}
/// What `load_server_settings` produces. RFC-011 cluster-only: the
@ -316,7 +320,14 @@ impl AppState {
) -> Self {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let per_graph_policy = policy_engine.map(Arc::new);
Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None)
Self::build_single_mode(
uri,
db,
bearer_tokens,
per_graph_policy,
Arc::new(workload),
None,
)
}
/// Like `new_single`, but attaches a pre-validated stored-query
@ -433,13 +444,8 @@ impl AppState {
bearer_tokens: Vec<(String, String)>,
policy_file: Option<&PathBuf>,
) -> Result<Self> {
Self::open_single_with_queries(
uri,
bearer_tokens,
policy_file,
QueryRegistry::default(),
)
.await
Self::open_single_with_queries(uri, bearer_tokens, policy_file, QueryRegistry::default())
.await
}
/// Single-mode boot with a stored-query registry: open the engine,
@ -522,8 +528,7 @@ impl AppState {
// reserved id `default` — both the registry key and the URL
// segment (`/graphs/default/...`).
let uri = normalize_root_uri(&uri).unwrap_or(uri);
let graph_id =
GraphId::try_from("default").expect("'default' is a valid GraphId");
let graph_id = GraphId::try_from("default").expect("'default' is a valid GraphId");
let key = GraphKey::cluster(graph_id);
let handle = Arc::new(GraphHandle {
key,
@ -950,15 +955,21 @@ pub fn build_app(state: AppState) -> Router {
// flagged and their responses include RFC 9745 Deprecation +
// RFC 8288 Link headers. Suppress the call-site warning for the
// route registration itself.
.route("/read", post({
#[allow(deprecated)]
server_read
}))
.route(
"/read",
post({
#[allow(deprecated)]
server_read
}),
)
.route("/query", post(server_query))
.route("/change", post({
#[allow(deprecated)]
server_change
}))
.route(
"/change",
post({
#[allow(deprecated)]
server_change
}),
)
.route("/mutate", post(server_mutate))
.route("/queries", get(server_list_queries))
.route("/queries/{name}", post(server_invoke_query))
@ -1080,7 +1091,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
config = %config_path.display(),
"serving omnigraph"
);
open_multi_graph_state(graphs, tokens, server_policy.as_ref(), config_path).await?
open_multi_graph_state(
graphs,
tokens,
server_policy.as_ref(),
config_path,
config.require_all_graphs,
)
.await?
}
};
@ -1109,9 +1127,9 @@ fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result<PolicyEngi
}
/// Parallel open of every graph in the startup config, with bounded
/// concurrency (`buffer_unordered(4)`). Fail-fast — the first open error
/// aborts startup; other in-flight opens are dropped (their `Omnigraph`
/// instances close cleanly via Arc drop).
/// concurrency (`buffer_unordered(4)`). Graph-specific open failures
/// quarantine that graph; startup succeeds as long as at least one graph
/// opens.
///
/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this
/// trades startup latency for a small amount of concurrent S3 / Lance
@ -1121,8 +1139,9 @@ pub async fn open_multi_graph_state(
tokens: Vec<(String, String)>,
server_policy_source: Option<&PolicySource>,
config_path: PathBuf,
require_all_graphs: bool,
) -> Result<AppState> {
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
if graphs.is_empty() {
bail!("multi-graph mode requires at least one graph in the `graphs:` map");
@ -1134,21 +1153,48 @@ pub async fn open_multi_graph_state(
// `Omnigraph::Server::"root"` entity at evaluation time.
let server_policy = match server_policy_source {
Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?),
Some(PolicySource::Inline(source)) => {
Some(PolicyEngine::load_server_from_source(source)?)
}
Some(PolicySource::Inline(source)) => Some(PolicyEngine::load_server_from_source(source)?),
None => None,
};
// `try_collect` propagates the first error eagerly, dropping every
// in-flight open. `buffer_unordered + collect::<Vec<_>>` would drain
// the stream before checking errors — incorrect for the docstring's
// "fail-fast" claim and wasteful on S3-backed graphs.
let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
.map(|cfg| async move { open_single_graph(cfg).await })
let configured_graphs = graphs.len();
let results = futures::stream::iter(graphs.into_iter())
.map(|cfg| async move {
let graph_id = cfg.graph_id.clone();
open_single_graph(cfg).await.map_err(|err| (graph_id, err))
})
.buffer_unordered(4)
.try_collect()
.await?;
.collect::<Vec<_>>()
.await;
let mut handles = Vec::new();
let mut failed = 0usize;
for result in results {
match result {
Ok(handle) => handles.push(handle),
Err((graph_id, err)) => {
failed += 1;
warn!(
graph_id = %graph_id,
error = %err,
"graph quarantined during startup"
);
}
}
}
if require_all_graphs && failed > 0 {
bail!(
"strict multi-graph startup requires every graph to open ({} configured, {} failed)",
configured_graphs,
failed
);
}
if handles.is_empty() {
bail!(
"no healthy graphs opened from multi-graph startup config ({} configured, {} failed)",
configured_graphs,
failed
);
}
let workload = workload::WorkloadController::from_env();
let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))

View file

@ -22,6 +22,11 @@ struct Cli {
/// Equivalent to setting `OMNIGRAPH_UNAUTHENTICATED=1`.
#[arg(long)]
unauthenticated: bool,
/// Fail startup if any applied graph is quarantined or fails to open.
/// By default, graph-local failures are logged and healthy graphs still
/// serve. Equivalent to setting `OMNIGRAPH_REQUIRE_ALL_GRAPHS=1`.
#[arg(long)]
require_all_graphs: bool,
}
#[tokio::main]
@ -30,7 +35,12 @@ async fn main() -> Result<()> {
init_tracing();
let cli = Cli::parse();
let settings: ServerConfig =
load_server_settings(cli.cluster.as_ref(), cli.bind, cli.unauthenticated).await?;
let settings: ServerConfig = load_server_settings(
cli.cluster.as_ref(),
cli.bind,
cli.unauthenticated,
cli.require_all_graphs,
)
.await?;
serve(settings).await
}

View file

@ -12,6 +12,7 @@ pub(crate) async fn load_cluster_settings(
cluster_dir: &PathBuf,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
cli_require_all_graphs: bool,
) -> Result<ServerConfig> {
// `--cluster` accepts either a config directory (the ledger location is
// resolved through cluster.yaml's `storage:` key) or a storage-root URI
@ -28,11 +29,45 @@ pub(crate) async fn load_cluster_settings(
.map_err(|diagnostics| {
let details = diagnostics
.iter()
.map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
.map(|diagnostic| {
format!(
"[{}] {}: {}",
diagnostic.code, diagnostic.path, diagnostic.message
)
})
.collect::<Vec<_>>()
.join("\n ");
eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display())
eyre!(
"the cluster at '{}' is not ready to serve:\n {details}",
cluster_dir.display()
)
})?;
for diagnostic in &snapshot.diagnostics {
warn!(
code = %diagnostic.code,
path = %diagnostic.path,
message = %diagnostic.message,
"cluster startup diagnostic"
);
}
let env_require_all_graphs = env_flag("OMNIGRAPH_REQUIRE_ALL_GRAPHS");
let require_all_graphs = cli_require_all_graphs || env_require_all_graphs;
if require_all_graphs && !snapshot.diagnostics.is_empty() {
let details = snapshot
.diagnostics
.iter()
.map(|diagnostic| {
format!(
"[{}] {}: {}",
diagnostic.code, diagnostic.path, diagnostic.message
)
})
.collect::<Vec<_>>()
.join("\n ");
bail!(
"strict cluster boot requires every applied graph to be ready; startup diagnostics:\n {details}"
);
}
// Bindings -> Cedar slots. The serving pipeline loads one bundle per
// graph plus one server-level bundle; stacked bundles per scope are a
@ -69,6 +104,7 @@ pub(crate) async fn load_cluster_settings(
}
let mut graphs = Vec::new();
let mut skipped_graphs = Vec::new();
for graph in &snapshot.graphs {
let specs: Vec<queries::RegistrySpec> = snapshot
.queries
@ -82,40 +118,75 @@ pub(crate) async fn load_cluster_settings(
// spec carries only identity + source.
})
.collect();
let registry = QueryRegistry::from_specs(specs).map_err(|errors| {
let details = errors
.iter()
.map(|error| error.to_string())
.collect::<Vec<_>>()
.join("\n ");
eyre!(
"stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart"
)
})?;
let registry = match QueryRegistry::from_specs(specs) {
Ok(registry) => registry,
Err(errors) => {
let details = errors
.iter()
.map(|error| error.to_string())
.collect::<Vec<_>>()
.join("\n ");
warn!(
graph_id = %graph.graph_id,
errors = %details,
"graph quarantined because stored queries failed to parse"
);
skipped_graphs.push(format!(
"{}: stored queries failed to parse: {details}",
graph.graph_id
));
continue;
}
};
let embedding = match graph
.embedding
.as_ref()
.map(|profile| {
profile.resolve().map_err(|err| {
eyre!("embedding provider for graph '{}': {err}", graph.graph_id)
})
})
.transpose()
{
Ok(embedding) => embedding,
Err(err) => {
warn!(
graph_id = %graph.graph_id,
error = %err,
"graph quarantined because embedding provider configuration failed"
);
skipped_graphs.push(format!("{}: {err}", graph.graph_id));
continue;
}
};
graphs.push(GraphStartupConfig {
graph_id: graph.graph_id.clone(),
uri: graph.root.to_string_lossy().to_string(),
policy: graph_policies.get(&graph.graph_id).cloned(),
embedding: graph
.embedding
.as_ref()
.map(|profile| {
profile.resolve().map_err(|err| {
eyre!("embedding provider for graph '{}': {err}", graph.graph_id)
})
})
.transpose()?,
embedding,
queries: registry,
});
}
if graphs.is_empty() {
let skipped = skipped_graphs.join(", ");
bail!(
"the cluster at '{}' has no healthy graphs to serve{}",
cluster_dir.display(),
if skipped.is_empty() {
String::new()
} else {
format!(" (quarantined: {skipped})")
}
);
}
if require_all_graphs && !skipped_graphs.is_empty() {
bail!(
"strict cluster boot requires every graph to build startup settings (quarantined: {})",
skipped_graphs.join(", ")
);
}
let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
.ok()
.map(|v| {
let trimmed = v.trim();
!trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
})
.unwrap_or(false);
let env_unauth = env_flag("OMNIGRAPH_UNAUTHENTICATED");
Ok(ServerConfig {
mode: ServerConfigMode::Multi {
@ -125,6 +196,7 @@ pub(crate) async fn load_cluster_settings(
},
bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()),
allow_unauthenticated: cli_allow_unauthenticated || env_unauth,
require_all_graphs,
})
}
@ -136,6 +208,7 @@ pub async fn load_server_settings(
cli_cluster: Option<&PathBuf>,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
cli_require_all_graphs: bool,
) -> Result<ServerConfig> {
let Some(cluster_dir) = cli_cluster else {
bail!(
@ -145,7 +218,23 @@ pub async fn load_server_settings(
was removed in RFC-011."
);
};
load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await
load_cluster_settings(
cluster_dir,
cli_bind,
cli_allow_unauthenticated,
cli_require_all_graphs,
)
.await
}
fn env_flag(name: &str) -> bool {
std::env::var(name)
.ok()
.map(|v| {
let trimmed = v.trim();
!trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
})
.unwrap_or(false)
}
/// MR-723 server runtime state, classified from the three-state matrix
@ -238,7 +327,9 @@ pub(crate) fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)
.wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
}
pub(crate) fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
pub(crate) fn validate_bearer_tokens(
entries: Vec<(String, String)>,
) -> Result<Vec<(String, String)>> {
let mut seen_actors = HashSet::new();
let mut seen_tokens = HashSet::new();
let mut normalized = Vec::with_capacity(entries.len());
@ -299,11 +390,18 @@ mod tests {
/// as 404 without also masking a 401/500. Pins each outcome.
#[test]
fn authorize_splits_decision_from_operational_error() {
use super::{Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, authorize};
use super::{
Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor,
authorize,
};
use std::sync::Arc;
fn req(action: PolicyAction) -> PolicyRequest {
PolicyRequest { action, branch: None, target_branch: None }
PolicyRequest {
action,
branch: None,
target_branch: None,
}
}
let actor = ResolvedActor::cluster_static(Arc::from("act-alice"));
@ -343,7 +441,11 @@ mod tests {
authorize(
Some(&actor),
Some(&engine),
PolicyRequest { action: PolicyAction::Read, branch: Some("main".to_string()), target_branch: None },
PolicyRequest {
action: PolicyAction::Read,
branch: Some("main".to_string()),
target_branch: None
},
)
.unwrap(),
Authz::Allowed
@ -352,11 +454,17 @@ mod tests {
match authorize(
Some(&actor),
Some(&engine),
PolicyRequest { action: PolicyAction::Change, branch: Some("main".to_string()), target_branch: None },
PolicyRequest {
action: PolicyAction::Change,
branch: Some("main".to_string()),
target_branch: None,
},
)
.unwrap()
{
Authz::Denied(message) => assert!(!message.is_empty(), "a deny carries its decision message"),
Authz::Denied(message) => {
assert!(!message.is_empty(), "a deny carries its decision message")
}
Authz::Allowed => panic!("change must be denied: only read is allowed"),
}
// Policy installed but no actor → operational failure (`Err`), NOT a
@ -393,8 +501,7 @@ mod tests {
};
// Empty registry → nothing attached, no error.
let empty =
super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
let empty = super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
assert!(empty.is_none());
// A query that type-checks → attached.
@ -403,7 +510,11 @@ mod tests {
"query find_user() { match { $u: User } return { $u.name } }",
)])
.unwrap();
assert!(super::validate_and_attach(ok, &catalog, "g").unwrap().is_some());
assert!(
super::validate_and_attach(ok, &catalog, "g")
.unwrap()
.is_some()
);
// A query referencing a type the schema lacks → boot refusal that
// names both the graph label and the offending query.
@ -416,7 +527,10 @@ mod tests {
let msg = err.to_string();
assert!(msg.contains("graph-x"), "labels the graph: {msg}");
assert!(msg.contains("ghost"), "names the query: {msg}");
assert!(msg.contains("schema check"), "mentions the schema check: {msg}");
assert!(
msg.contains("schema check"),
"mentions the schema check: {msg}"
);
}
#[test]
@ -447,7 +561,7 @@ mod tests {
async fn server_settings_require_cluster_boot_source() {
// RFC-011 cluster-only: with no --cluster the server refuses to
// start and names the cluster-required remedy.
let error = super::load_server_settings(None, None, false)
let error = super::load_server_settings(None, None, false, false)
.await
.unwrap_err();
assert!(
@ -530,6 +644,7 @@ mod tests {
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,
require_all_graphs: false,
};
let result = serve(config).await;
let err = result
@ -582,6 +697,7 @@ mod tests {
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,
require_all_graphs: false,
};
let result = serve(config).await;
let err =

View file

@ -13,7 +13,6 @@ use serde_json::Value;
use serial_test::serial;
use tower::ServiceExt;
mod support;
use support::*;
@ -414,7 +413,7 @@ async fn cluster_boot_serves_applied_state() {
assert!(server_policy.is_none());
let state =
omnigraph_server::open_multi_graph_state(graphs, Vec::new(), None, config_path)
omnigraph_server::open_multi_graph_state(graphs, Vec::new(), None, config_path, false)
.await
.unwrap();
let app = build_app(state);
@ -424,7 +423,10 @@ async fn cluster_boot_serves_applied_state() {
// GET /graphs refuses even in cluster mode.
let (status, body) = json_response(
&app,
Request::builder().uri("/graphs").body(Body::empty()).unwrap(),
Request::builder()
.uri("/graphs")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::FORBIDDEN, "{body}");
@ -460,6 +462,115 @@ async fn cluster_boot_serves_applied_state() {
assert_eq!(status, StatusCode::OK, "{body}");
}
#[tokio::test]
async fn cluster_boot_quarantines_graph_open_failures() {
let temp = tempfile::tempdir().unwrap();
let schema = "\nnode Person {\n name: String @key\n}\n";
let good_uri = temp.path().join("good.omni");
Omnigraph::init(good_uri.to_string_lossy().as_ref(), schema)
.await
.unwrap();
let bad_uri = temp.path().join("missing.omni");
let server_policy = omnigraph_server::PolicySource::Inline(
r#"
version: 1
kind: server
groups:
admins: [act-admin]
rules:
- id: admins-list-graphs
allow:
actors: { group: admins }
actions: [graph_list]
"#
.to_string(),
);
let graphs = vec![
omnigraph_server::GraphStartupConfig {
graph_id: "broken".to_string(),
uri: bad_uri.to_string_lossy().to_string(),
policy: None,
embedding: None,
queries: stored_query_registry(&[]),
},
omnigraph_server::GraphStartupConfig {
graph_id: "good".to_string(),
uri: good_uri.to_string_lossy().to_string(),
policy: None,
embedding: None,
queries: stored_query_registry(&[]),
},
];
let strict_err = match omnigraph_server::open_multi_graph_state(
graphs.clone(),
vec![("act-admin".to_string(), "admin-token".to_string())],
Some(&server_policy),
temp.path().join("cluster.yaml"),
true,
)
.await
{
Ok(_) => panic!("strict startup should reject a failed graph open"),
Err(err) => err,
};
assert!(
strict_err
.to_string()
.contains("strict multi-graph startup requires every graph to open"),
"{strict_err}"
);
let state = omnigraph_server::open_multi_graph_state(
graphs,
vec![("act-admin".to_string(), "admin-token".to_string())],
Some(&server_policy),
temp.path().join("cluster.yaml"),
false,
)
.await
.unwrap();
let mut ready: Vec<_> = state
.routing()
.registry
.list()
.iter()
.map(|handle| handle.key.graph_id.as_str().to_string())
.collect();
ready.sort();
assert_eq!(ready, vec!["good"]);
let app = build_app(state);
let (status, body) = json_response(
&app,
Request::builder()
.uri("/graphs")
.header("authorization", "Bearer admin-token")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::OK, "{body}");
assert_eq!(
body["graphs"]
.as_array()
.unwrap()
.iter()
.map(|graph| graph["graph_id"].as_str().unwrap())
.collect::<Vec<_>>(),
vec!["good"]
);
let (status, body) = json_response(
&app,
Request::builder()
.uri("/graphs/broken/queries")
.header("authorization", "Bearer admin-token")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::NOT_FOUND, "{body}");
}
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn cluster_boot_injects_embedding_provider_config() {
@ -555,6 +666,7 @@ graphs:
Vec::new(),
server_policy.as_ref(),
config_path,
false,
)
.await
.unwrap();
@ -665,7 +777,10 @@ async fn cluster_boot_wires_policy_bindings_into_cedar_slots() {
.unwrap();
fs::write(
temp.path().join("cluster.policy.yaml"),
permit_all_policy_yaml(&["default"]).replace("protected_branches: [main]\n", "protected_branches: [main]\nkind: server\n"),
permit_all_policy_yaml(&["default"]).replace(
"protected_branches: [main]\n",
"protected_branches: [main]\nkind: server\n",
),
)
.unwrap();
fs::write(
@ -719,7 +834,7 @@ graphs:
async fn cluster_boot_refusals() {
// RFC-011 cluster-only: with no --cluster, boot refuses with the
// cluster-required remedy.
let err = omnigraph_server::load_server_settings(None, None, true)
let err = omnigraph_server::load_server_settings(None, None, true, false)
.await
.unwrap_err();
assert!(err.to_string().contains("boots from a cluster"), "{err}");
@ -729,7 +844,12 @@ async fn cluster_boot_refusals() {
// Tampered catalog blob refuses boot with the remedy.
let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person");
let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path();
let blob = fs::read_dir(&blob_dir)
.unwrap()
.next()
.unwrap()
.unwrap()
.path();
fs::write(&blob, "tampered").unwrap();
let err = cluster_settings(&dir).await.unwrap_err();
assert!(

View file

@ -11,7 +11,6 @@ use omnigraph_server::api::ReadRequest;
use omnigraph_server::{AppState, build_app};
use serde_json::json;
mod support;
use support::*;
@ -137,6 +136,7 @@ async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() {
Some(&std::path::PathBuf::from(&root)),
None,
true,
false,
)
.await
.unwrap();
@ -153,6 +153,7 @@ async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() {
Vec::new(),
server_policy.as_ref(),
config_path,
false,
)
.await
.unwrap();
@ -170,7 +171,9 @@ async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() {
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let value: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(value["rows"][0]["p.name"], "Ada", "{value}");
}

View file

@ -370,6 +370,47 @@ async fn list_queries_requires_invoke_query() {
assert!(names.contains(&"find_person"), "invoker sees the exposed query: {names:?}");
}
#[tokio::test(flavor = "multi_thread")]
async fn list_queries_surfaces_query_description_and_instruction() {
// E2e for the query-level `.gq` surface: `@description`/`@instruction` on
// a stored query declaration are carried through to clients via the typed
// `QueryCatalogEntry` fields over `GET /queries`. A query without them
// omits both fields (serde `skip_serializing_if = "Option::is_none"`).
let described = "query described($name: String) \
@description(\"Find a person by exact name.\") \
@instruction(\"Use for exact lookups; prefer search for fuzzy matches.\") \
{ match { $p: Person { name: $name } } return { $p.age } }";
let (_temp, app) = app_with_stored_queries(
&[
("described", described, true),
("bare", "query bare() { match { $p: Person } return { $p.name } }", true),
],
&[("act-invoke", "t-invoke")],
INVOKE_POLICY_YAML,
)
.await;
let (status, body) = json_response(&app, get_request(&g("/queries"), "t-invoke")).await;
assert_eq!(status, StatusCode::OK, "body: {body}");
let entries = body["queries"].as_array().unwrap();
let described = entries.iter().find(|q| q["name"] == "described").unwrap();
assert_eq!(
described["description"], "Find a person by exact name.",
"query @description surfaces over GET /queries: {described}"
);
assert_eq!(
described["instruction"],
"Use for exact lookups; prefer search for fuzzy matches.",
"query @instruction surfaces over GET /queries: {described}"
);
let bare = entries.iter().find(|q| q["name"] == "bare").unwrap();
assert!(
bare.get("description").is_none() && bare.get("instruction").is_none(),
"a query without the annotations omits both fields: {bare}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn list_queries_is_empty_when_no_registry() {
let (_temp, app) = app_for_loaded_graph_with_auth("demo-token").await;

View file

@ -15,15 +15,12 @@ use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::OmniError;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_policy::{PolicyChecker, PolicyEngine};
use omnigraph_server::api::{
BranchCreateRequest, BranchMergeRequest, ChangeRequest, ReadRequest,
};
use omnigraph_server::api::{BranchCreateRequest, BranchMergeRequest, ChangeRequest, ReadRequest};
use omnigraph_server::queries::{QueryRegistry, RegistrySpec};
use omnigraph_server::{AppState, build_app};
use serde_json::{Value, json};
use tower::ServiceExt;
pub const MUTATION_QUERIES: &str = r#"
query insert_person($name: String, $age: I32) {
insert Person { name: $name, age: $age }
@ -1212,6 +1209,8 @@ graphs:
temp
}
pub async fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result<omnigraph_server::ServerConfig> {
omnigraph_server::load_server_settings(Some(&dir.to_path_buf()), None, true).await
pub async fn cluster_settings(
dir: &Path,
) -> color_eyre::eyre::Result<omnigraph_server::ServerConfig> {
omnigraph_server::load_server_settings(Some(&dir.to_path_buf()), None, true, false).await
}