diff --git a/crates/omnigraph-config/src/lib.rs b/crates/omnigraph-config/src/lib.rs index 3f9e444..0856ea9 100644 --- a/crates/omnigraph-config/src/lib.rs +++ b/crates/omnigraph-config/src/lib.rs @@ -22,25 +22,147 @@ pub struct ProjectConfig { } #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] pub struct TargetConfig { + /// Embedded base URI. Legacy spelling (`uri:`) for an object-store + /// location; populated post-load from `storage`/`server` so every existing + /// reader sees a resolved base URI. Empty until `storage`/`server`/`uri` + /// is provided and `normalize_graphs` runs. + #[serde(default)] pub uri: String, + /// Embedded storage location (string or block) — RFC-002 §2. Mutually + /// exclusive with `server`; the bare-string form desugars to `{ uri }`. + pub storage: Option, + /// Remote: the name of a `servers:` entry hosting this graph. Mutually + /// exclusive with `storage`/`uri`. + pub server: Option, + /// Remote: the graph's id on `server` (defaults to the entry key). + pub graph_id: Option, + /// Default branch for this graph (sub-state). + pub branch: Option, + /// Optional read-pinned snapshot version. + pub snapshot: Option, pub bearer_token_env: Option, - /// Per-graph Cedar policy file (MR-668). In single-graph mode this - /// field is unused — the top-level `policy.file` applies. In - /// multi-graph mode, each `graphs..policy.file` governs that - /// graph's HTTP-layer Cedar enforcement. + /// Per-graph Cedar policy file (MR-668). Valid only on embedded graphs; a + /// remote graph's policy is owned by the server it targets. #[serde(default)] pub policy: PolicySettings, - /// Per-graph stored-query registry: an inline `name -> entry` - /// map. Mirrors the per-graph `policy` shape — each - /// `graphs..queries` declares that graph's stored queries. Absent - /// (or empty) = no stored queries for the graph. v1 is inline-only; - /// an external `queries.yaml` manifest indirection is a deferred - /// convenience. + /// Per-graph stored-query registry (embedded graphs only; remote queries + /// are server-owned and discovered). #[serde(default)] pub queries: BTreeMap, } +/// Embedded storage location: a bare URI string, or a block carrying per-graph +/// object-store options. `region`/`endpoint` are parsed but **not yet threaded +/// to the engine** (that lands with the V2 remote/route work); `profile` is +/// intentionally absent (`AWS_PROFILE` is env-only in Lance and our store). +#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] +pub enum Storage { + Bare(String), + Block(StorageBlock), +} + +impl<'de> Deserialize<'de> for Storage { + /// String-or-block, hand-rolled rather than `#[serde(untagged)]` on the + /// deserialize side so a bad key in the block form surfaces the precise + /// `unknown field` error instead of a generic "did not match any variant" + /// (the honored-or-rejected DX rule, RFC-002 §1.2). + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + let value = serde_yaml::Value::deserialize(deserializer)?; + if let serde_yaml::Value::String(s) = value { + Ok(Storage::Bare(s)) + } else { + serde_yaml::from_value::(value) + .map(Storage::Block) + .map_err(serde::de::Error::custom) + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct StorageBlock { + pub uri: String, + pub region: Option, + pub endpoint: Option, +} + +impl Storage { + pub fn uri(&self) -> &str { + match self { + Storage::Bare(uri) => uri, + Storage::Block(block) => &block.uri, + } + } +} + +/// A named remote server — an endpoint a client targets. Secret-free: bearer +/// credentials are resolved out of band (the auth model is a later change). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ServerEntry { + pub endpoint: String, +} + +/// A resolved graph address (RFC-002 §1.1/§2) — replaces scheme-sniffing on a +/// `uri` string with a typed embedded-XOR-remote locator. +#[derive(Debug, Clone)] +pub enum GraphLocator { + Embedded { + /// Object-store URI, resolved against the config `base_dir`. + uri: String, + region: Option, + endpoint: Option, + branch: Option, + snapshot: Option, + policy_file: Option, + /// Cedar resource identity (graph name, or normalized URI for a + /// positional ``). + graph_id: String, + /// The selected graph name, if from `graphs:` (None for a positional). + selected: Option, + }, + Remote { + endpoint: String, + server: String, + graph_id: String, + branch: Option, + snapshot: Option, + }, +} + +impl GraphLocator { + pub fn is_remote(&self) -> bool { + matches!(self, GraphLocator::Remote { .. }) + } +} + +/// Scheme classification for a base URI: `http(s)://` is remote, everything +/// else (`file://`, `s3://`, a path) is embedded. The single place this sniff +/// survives, fed only the positional `` and legacy `uri:` entries. +fn uri_is_remote(uri: &str) -> bool { + uri.starts_with("http://") || uri.starts_with("https://") +} + +/// Split a legacy remote `uri:` into `(endpoint, graph_id)`: strip a trailing +/// `/graphs/{gid}` (the pre-typed-locator multi-graph hack) so the resolved +/// address is clean; otherwise the whole URI is the endpoint and `graph_id` +/// falls back to the entry key. +fn split_legacy_remote(uri: &str, key: &str) -> (String, String) { + let trimmed = uri.trim_end_matches('/'); + if let Some((endpoint, gid)) = trimmed.rsplit_once("/graphs/") { + if !gid.is_empty() && !gid.contains('/') { + return (endpoint.to_string(), gid.to_string()); + } + } + (trimmed.to_string(), key.to_string()) +} + #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize, ValueEnum)] #[serde(rename_all = "snake_case")] pub enum ReadOutputFormat { @@ -179,9 +301,18 @@ pub struct AliasConfig { } #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] pub struct OmnigraphConfig { + /// Schema version. Absent = legacy (lenient); `1` = the typed schema. The + /// loader rejects unsupported versions before parsing (`check_config_version`). + #[serde(default)] + pub version: Option, #[serde(default)] pub project: ProjectConfig, + /// Named remote servers (endpoints) referenced by remote graph entries + /// (`graphs..server`) and `server/graph_id` addressing — RFC-002 §1. + #[serde(default)] + pub servers: BTreeMap, #[serde(default, rename = "graphs")] pub graphs: BTreeMap, #[serde(default)] @@ -209,7 +340,9 @@ pub struct OmnigraphConfig { impl Default for OmnigraphConfig { fn default() -> Self { Self { + version: None, project: ProjectConfig::default(), + servers: BTreeMap::new(), graphs: BTreeMap::new(), server: ServerDefaults::default(), auth: AuthDefaults::default(), @@ -456,6 +589,147 @@ impl OmnigraphConfig { .ok_or_else(|| color_eyre::eyre::eyre!("alias '{}' not found", name)) } + /// Resolve a graph selection to a typed [`GraphLocator`] (RFC-002 §1): an + /// explicit positional `` (scheme-sniffed), else a local `graphs:` + /// alias, else a `server/graph_id` qualified name against `servers:`. A + /// bare selection falls back to `cli.graph`. + pub fn resolve_graph( + &self, + explicit_uri: Option<&str>, + name: Option<&str>, + ) -> Result { + if let Some(uri) = explicit_uri { + // Anonymous positional — the one surviving scheme sniff. + return Ok(if uri_is_remote(uri) { + let endpoint = uri.trim_end_matches('/').to_string(); + GraphLocator::Remote { + graph_id: endpoint.clone(), + server: "".to_string(), + endpoint, + branch: None, + snapshot: None, + } + } else { + let resolved = self.resolve_config_uri(uri); + GraphLocator::Embedded { + graph_id: graph_resource_id_for_selection(None, &resolved), + uri: resolved, + region: None, + endpoint: None, + branch: None, + snapshot: None, + policy_file: self.resolve_policy_file(), + selected: None, + } + }); + } + + let name = name.or_else(|| self.cli_graph_name()).ok_or_else(|| { + color_eyre::eyre::eyre!("URI must be provided via , --graph, or config") + })?; + + if let Some(entry) = self.graphs.get(name) { + return self.locator_from_entry(name, entry); + } + if let Some((server_name, graph_id)) = name.split_once('/') { + if let Some(server) = self.servers.get(server_name) { + return Ok(GraphLocator::Remote { + endpoint: server.endpoint.trim_end_matches('/').to_string(), + server: server_name.to_string(), + graph_id: graph_id.to_string(), + branch: None, + snapshot: None, + }); + } + } + bail!("graph '{}' not found in {}", name, DEFAULT_CONFIG_FILE) + } + + fn locator_from_entry(&self, name: &str, entry: &TargetConfig) -> Result { + let remote = entry.server.is_some() || uri_is_remote(&entry.uri); + if remote { + let (endpoint, graph_id, server) = match &entry.server { + Some(server) => ( + entry.uri.trim_end_matches('/').to_string(), + entry.graph_id.clone().unwrap_or_else(|| name.to_string()), + server.clone(), + ), + None => { + let (endpoint, graph_id) = split_legacy_remote(&entry.uri, name); + (endpoint, graph_id, name.to_string()) + } + }; + Ok(GraphLocator::Remote { + endpoint, + server, + graph_id, + branch: entry.branch.clone(), + snapshot: entry.snapshot, + }) + } else { + let (region, endpoint) = match &entry.storage { + Some(Storage::Block(block)) => (block.region.clone(), block.endpoint.clone()), + _ => (None, None), + }; + Ok(GraphLocator::Embedded { + uri: self.resolve_config_uri(&entry.uri), + region, + endpoint, + branch: entry.branch.clone(), + snapshot: entry.snapshot, + policy_file: self.resolve_target_policy_file(name), + graph_id: name.to_string(), + selected: Some(name.to_string()), + }) + } + } + + /// Fill each graph entry's `uri` from `storage`/`server` and validate the + /// locator shape (RFC-002 §1.2/§3), so existing `uri`-readers see a + /// resolved base URI and invalid combinations fail at load. + fn normalize_graphs(&mut self) -> Result<()> { + let endpoints: BTreeMap = self + .servers + .iter() + .map(|(name, entry)| (name.clone(), entry.endpoint.clone())) + .collect(); + for (key, entry) in self.graphs.iter_mut() { + match (entry.storage.is_some(), entry.server.is_some()) { + (true, true) => { + bail!("graph '{key}': set exactly one of `storage` or `server`, not both") + } + (true, false) => { + entry.uri = entry.storage.as_ref().unwrap().uri().to_string(); + } + (false, true) => { + if entry.policy.file.is_some() || !entry.queries.is_empty() { + bail!( + "graph '{key}': a remote graph (`server:`) cannot define `policy` or \ + `queries` — those are owned by the server it targets" + ); + } + let server = entry.server.as_deref().unwrap(); + let endpoint = endpoints.get(server).ok_or_else(|| { + color_eyre::eyre::eyre!( + "graph '{key}': server '{server}' is not defined in `servers:`" + ) + })?; + entry.uri = endpoint.clone(); + if entry.graph_id.is_none() { + entry.graph_id = Some(key.clone()); + } + } + (false, false) => { + if entry.uri.is_empty() { + bail!("graph '{key}': set `storage:` (embedded) or `server:` (remote)"); + } + // Legacy `uri:` — kept as-is; classified by scheme downstream. + } + } + } + Ok(()) + } + pub fn resolve_target_uri( &self, explicit_uri: Option, @@ -551,6 +825,8 @@ fn load_config_in(cwd: &Path, config_path: Option<&PathBuf>) -> Result 1 must be rejected: {err}" ); } + + fn load_yaml(yaml: &str) -> super::OmnigraphConfig { + let temp = tempdir().unwrap(); + fs::write(temp.path().join("omnigraph.yaml"), yaml).unwrap(); + load_config_in(temp.path(), None).unwrap() + } + + fn load_yaml_err(yaml: &str) -> String { + let temp = tempdir().unwrap(); + fs::write(temp.path().join("omnigraph.yaml"), yaml).unwrap(); + load_config_in(temp.path(), None).unwrap_err().to_string() + } + + #[test] + fn storage_bare_resolves_embedded() { + let config = load_yaml("version: 1\ngraphs:\n local:\n storage: ./demo.omni\n"); + match config.resolve_graph(None, Some("local")).unwrap() { + GraphLocator::Embedded { uri, selected, graph_id, .. } => { + assert!(uri.ends_with("demo.omni"), "uri: {uri}"); + assert_eq!(selected.as_deref(), Some("local")); + assert_eq!(graph_id, "local"); + } + other => panic!("expected embedded, got {other:?}"), + } + } + + #[test] + fn storage_block_carries_region_and_endpoint() { + let config = load_yaml( + "version: 1\ngraphs:\n prod:\n storage:\n uri: s3://b/prod.omni\n \ + region: eu-west-1\n endpoint: https://minio.local\n", + ); + match config.resolve_graph(None, Some("prod")).unwrap() { + GraphLocator::Embedded { uri, region, endpoint, .. } => { + assert_eq!(uri, "s3://b/prod.omni"); + assert_eq!(region.as_deref(), Some("eu-west-1")); + assert_eq!(endpoint.as_deref(), Some("https://minio.local")); + } + other => panic!("expected embedded, got {other:?}"), + } + } + + #[test] + fn remote_server_resolves_graph_id_defaulting_to_key() { + let config = load_yaml( + "version: 1\nservers:\n prod:\n endpoint: https://og.internal:8080\n\ + graphs:\n reports:\n server: prod\n", + ); + match config.resolve_graph(None, Some("reports")).unwrap() { + GraphLocator::Remote { endpoint, server, graph_id, .. } => { + assert_eq!(endpoint, "https://og.internal:8080"); + assert_eq!(server, "prod"); + assert_eq!(graph_id, "reports", "graph_id defaults to the entry key"); + } + other => panic!("expected remote, got {other:?}"), + } + } + + #[test] + fn qualified_server_slash_graph_resolves_without_an_alias() { + let config = load_yaml( + "version: 1\nservers:\n prod:\n endpoint: https://og.internal:8080\n", + ); + match config.resolve_graph(None, Some("prod/production")).unwrap() { + GraphLocator::Remote { endpoint, server, graph_id, .. } => { + assert_eq!(endpoint, "https://og.internal:8080"); + assert_eq!(server, "prod"); + assert_eq!(graph_id, "production"); + } + other => panic!("expected remote, got {other:?}"), + } + } + + #[test] + fn storage_xor_server_both_set_is_rejected() { + let err = load_yaml_err( + "version: 1\nservers:\n p:\n endpoint: https://h\n\ + graphs:\n bad:\n storage: s3://b/x\n server: p\n", + ); + assert!(err.contains("exactly one of `storage` or `server`"), "{err}"); + } + + #[test] + fn graph_with_neither_storage_nor_server_is_rejected() { + let err = load_yaml_err("version: 1\ngraphs:\n bad:\n branch: main\n"); + assert!(err.contains("set `storage:`") && err.contains("bad"), "{err}"); + } + + #[test] + fn remote_graph_with_policy_is_rejected() { + let err = load_yaml_err( + "version: 1\nservers:\n p:\n endpoint: https://h\n\ + graphs:\n g:\n server: p\n policy:\n file: ./p.yaml\n", + ); + assert!(err.contains("cannot define `policy`"), "{err}"); + } + + #[test] + fn remote_graph_referencing_unknown_server_is_rejected() { + let err = load_yaml_err("version: 1\ngraphs:\n g:\n server: ghost\n"); + assert!( + err.contains("ghost") && err.contains("not defined in `servers:`"), + "{err}" + ); + } + + #[test] + fn unknown_graph_name_errors() { + let config = load_yaml("version: 1\ngraphs:\n local:\n storage: ./demo.omni\n"); + let err = config.resolve_graph(None, Some("ghost")).unwrap_err().to_string(); + assert!(err.contains("ghost") && err.contains("not found"), "{err}"); + } + + #[test] + fn storage_block_rejects_unknown_field() { + // `profile` is intentionally unsupported; deny_unknown_fields catches it. + let err = load_yaml_err( + "version: 1\ngraphs:\n g:\n storage:\n uri: s3://b/x\n profile: default\n", + ); + assert!(err.to_lowercase().contains("profile") || err.contains("unknown field"), "{err}"); + } + + #[test] + fn legacy_remote_uri_splits_graphs_suffix() { + // No `version:` (legacy); a remote uri with the /graphs/{gid} hack splits. + let config = load_yaml("graphs:\n prod:\n uri: https://host:8080/graphs/production\n"); + match config.resolve_graph(None, Some("prod")).unwrap() { + GraphLocator::Remote { endpoint, graph_id, .. } => { + assert_eq!(endpoint, "https://host:8080"); + assert_eq!(graph_id, "production"); + } + other => panic!("expected remote, got {other:?}"), + } + } + + #[test] + fn legacy_embedded_uri_still_resolves_embedded() { + let config = load_yaml("graphs:\n local:\n uri: ./demo.omni\n"); + assert!(matches!( + config.resolve_graph(None, Some("local")).unwrap(), + GraphLocator::Embedded { .. } + )); + } }