feat(config): typed GraphLocator, servers map, resolve_graph (RFC-002 §1/§2)

Add the typed graph schema behind the version gate: per-graph storage: (string or block) XOR server:+graph_id:, a servers: map, and resolve_graph -> typed GraphLocator (local alias -> srv/gid qualified -> cli.graph default). uri stays a defaulted String filled by normalize_graphs from storage/server, so all existing .uri readers are unchanged and this commit is config-only (server reject-Remote is L5). deny_unknown_fields rejects typos; Storage has a hand-rolled string-or-block Deserialize for precise unknown-field errors. region/endpoint parsed but not yet engine-threaded (V2); GraphLocator.graph_id carried but unused on the wire until V2. 26 config tests pass; cargo test --workspace --locked green (per review).
This commit is contained in:
Ragnor Comerford 2026-06-03 15:48:06 +02:00
parent fd64f8abc4
commit b5690d5d8e
No known key found for this signature in database

View file

@ -22,25 +22,147 @@ pub struct ProjectConfig {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TargetConfig {
/// Embedded base URI. Legacy spelling (`uri:`) for an object-store
/// location; populated post-load from `storage`/`server` so every existing
/// reader sees a resolved base URI. Empty until `storage`/`server`/`uri`
/// is provided and `normalize_graphs` runs.
#[serde(default)]
pub uri: String,
/// Embedded storage location (string or block) — RFC-002 §2. Mutually
/// exclusive with `server`; the bare-string form desugars to `{ uri }`.
pub storage: Option<Storage>,
/// Remote: the name of a `servers:` entry hosting this graph. Mutually
/// exclusive with `storage`/`uri`.
pub server: Option<String>,
/// Remote: the graph's id on `server` (defaults to the entry key).
pub graph_id: Option<String>,
/// Default branch for this graph (sub-state).
pub branch: Option<String>,
/// Optional read-pinned snapshot version.
pub snapshot: Option<u64>,
pub bearer_token_env: Option<String>,
/// Per-graph Cedar policy file (MR-668). In single-graph mode this
/// field is unused — the top-level `policy.file` applies. In
/// multi-graph mode, each `graphs.<id>.policy.file` governs that
/// graph's HTTP-layer Cedar enforcement.
/// Per-graph Cedar policy file (MR-668). Valid only on embedded graphs; a
/// remote graph's policy is owned by the server it targets.
#[serde(default)]
pub policy: PolicySettings,
/// Per-graph stored-query registry: an inline `name -> entry`
/// map. Mirrors the per-graph `policy` shape — each
/// `graphs.<id>.queries` declares that graph's stored queries. Absent
/// (or empty) = no stored queries for the graph. v1 is inline-only;
/// an external `queries.yaml` manifest indirection is a deferred
/// convenience.
/// Per-graph stored-query registry (embedded graphs only; remote queries
/// are server-owned and discovered).
#[serde(default)]
pub queries: BTreeMap<String, QueryEntry>,
}
/// Embedded storage location: a bare URI string, or a block carrying per-graph
/// object-store options. `region`/`endpoint` are parsed but **not yet threaded
/// to the engine** (that lands with the V2 remote/route work); `profile` is
/// intentionally absent (`AWS_PROFILE` is env-only in Lance and our store).
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum Storage {
Bare(String),
Block(StorageBlock),
}
impl<'de> Deserialize<'de> for Storage {
/// String-or-block, hand-rolled rather than `#[serde(untagged)]` on the
/// deserialize side so a bad key in the block form surfaces the precise
/// `unknown field` error instead of a generic "did not match any variant"
/// (the honored-or-rejected DX rule, RFC-002 §1.2).
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = serde_yaml::Value::deserialize(deserializer)?;
if let serde_yaml::Value::String(s) = value {
Ok(Storage::Bare(s))
} else {
serde_yaml::from_value::<StorageBlock>(value)
.map(Storage::Block)
.map_err(serde::de::Error::custom)
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct StorageBlock {
pub uri: String,
pub region: Option<String>,
pub endpoint: Option<String>,
}
impl Storage {
pub fn uri(&self) -> &str {
match self {
Storage::Bare(uri) => uri,
Storage::Block(block) => &block.uri,
}
}
}
/// A named remote server — an endpoint a client targets. Secret-free: bearer
/// credentials are resolved out of band (the auth model is a later change).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ServerEntry {
pub endpoint: String,
}
/// A resolved graph address (RFC-002 §1.1/§2) — replaces scheme-sniffing on a
/// `uri` string with a typed embedded-XOR-remote locator.
#[derive(Debug, Clone)]
pub enum GraphLocator {
Embedded {
/// Object-store URI, resolved against the config `base_dir`.
uri: String,
region: Option<String>,
endpoint: Option<String>,
branch: Option<String>,
snapshot: Option<u64>,
policy_file: Option<PathBuf>,
/// Cedar resource identity (graph name, or normalized URI for a
/// positional `<URI>`).
graph_id: String,
/// The selected graph name, if from `graphs:` (None for a positional).
selected: Option<String>,
},
Remote {
endpoint: String,
server: String,
graph_id: String,
branch: Option<String>,
snapshot: Option<u64>,
},
}
impl GraphLocator {
pub fn is_remote(&self) -> bool {
matches!(self, GraphLocator::Remote { .. })
}
}
/// Scheme classification for a base URI: `http(s)://` is remote, everything
/// else (`file://`, `s3://`, a path) is embedded. The single place this sniff
/// survives, fed only the positional `<URI>` and legacy `uri:` entries.
fn uri_is_remote(uri: &str) -> bool {
uri.starts_with("http://") || uri.starts_with("https://")
}
/// Split a legacy remote `uri:` into `(endpoint, graph_id)`: strip a trailing
/// `/graphs/{gid}` (the pre-typed-locator multi-graph hack) so the resolved
/// address is clean; otherwise the whole URI is the endpoint and `graph_id`
/// falls back to the entry key.
fn split_legacy_remote(uri: &str, key: &str) -> (String, String) {
let trimmed = uri.trim_end_matches('/');
if let Some((endpoint, gid)) = trimmed.rsplit_once("/graphs/") {
if !gid.is_empty() && !gid.contains('/') {
return (endpoint.to_string(), gid.to_string());
}
}
(trimmed.to_string(), key.to_string())
}
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum ReadOutputFormat {
@ -179,9 +301,18 @@ pub struct AliasConfig {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct OmnigraphConfig {
/// Schema version. Absent = legacy (lenient); `1` = the typed schema. The
/// loader rejects unsupported versions before parsing (`check_config_version`).
#[serde(default)]
pub version: Option<u32>,
#[serde(default)]
pub project: ProjectConfig,
/// Named remote servers (endpoints) referenced by remote graph entries
/// (`graphs.<name>.server`) and `server/graph_id` addressing — RFC-002 §1.
#[serde(default)]
pub servers: BTreeMap<String, ServerEntry>,
#[serde(default, rename = "graphs")]
pub graphs: BTreeMap<String, TargetConfig>,
#[serde(default)]
@ -209,7 +340,9 @@ pub struct OmnigraphConfig {
impl Default for OmnigraphConfig {
fn default() -> Self {
Self {
version: None,
project: ProjectConfig::default(),
servers: BTreeMap::new(),
graphs: BTreeMap::new(),
server: ServerDefaults::default(),
auth: AuthDefaults::default(),
@ -456,6 +589,147 @@ impl OmnigraphConfig {
.ok_or_else(|| color_eyre::eyre::eyre!("alias '{}' not found", name))
}
/// Resolve a graph selection to a typed [`GraphLocator`] (RFC-002 §1): an
/// explicit positional `<URI>` (scheme-sniffed), else a local `graphs:`
/// alias, else a `server/graph_id` qualified name against `servers:`. A
/// bare selection falls back to `cli.graph`.
pub fn resolve_graph(
&self,
explicit_uri: Option<&str>,
name: Option<&str>,
) -> Result<GraphLocator> {
if let Some(uri) = explicit_uri {
// Anonymous positional <URI> — the one surviving scheme sniff.
return Ok(if uri_is_remote(uri) {
let endpoint = uri.trim_end_matches('/').to_string();
GraphLocator::Remote {
graph_id: endpoint.clone(),
server: "<positional>".to_string(),
endpoint,
branch: None,
snapshot: None,
}
} else {
let resolved = self.resolve_config_uri(uri);
GraphLocator::Embedded {
graph_id: graph_resource_id_for_selection(None, &resolved),
uri: resolved,
region: None,
endpoint: None,
branch: None,
snapshot: None,
policy_file: self.resolve_policy_file(),
selected: None,
}
});
}
let name = name.or_else(|| self.cli_graph_name()).ok_or_else(|| {
color_eyre::eyre::eyre!("URI must be provided via <URI>, --graph, or config")
})?;
if let Some(entry) = self.graphs.get(name) {
return self.locator_from_entry(name, entry);
}
if let Some((server_name, graph_id)) = name.split_once('/') {
if let Some(server) = self.servers.get(server_name) {
return Ok(GraphLocator::Remote {
endpoint: server.endpoint.trim_end_matches('/').to_string(),
server: server_name.to_string(),
graph_id: graph_id.to_string(),
branch: None,
snapshot: None,
});
}
}
bail!("graph '{}' not found in {}", name, DEFAULT_CONFIG_FILE)
}
fn locator_from_entry(&self, name: &str, entry: &TargetConfig) -> Result<GraphLocator> {
let remote = entry.server.is_some() || uri_is_remote(&entry.uri);
if remote {
let (endpoint, graph_id, server) = match &entry.server {
Some(server) => (
entry.uri.trim_end_matches('/').to_string(),
entry.graph_id.clone().unwrap_or_else(|| name.to_string()),
server.clone(),
),
None => {
let (endpoint, graph_id) = split_legacy_remote(&entry.uri, name);
(endpoint, graph_id, name.to_string())
}
};
Ok(GraphLocator::Remote {
endpoint,
server,
graph_id,
branch: entry.branch.clone(),
snapshot: entry.snapshot,
})
} else {
let (region, endpoint) = match &entry.storage {
Some(Storage::Block(block)) => (block.region.clone(), block.endpoint.clone()),
_ => (None, None),
};
Ok(GraphLocator::Embedded {
uri: self.resolve_config_uri(&entry.uri),
region,
endpoint,
branch: entry.branch.clone(),
snapshot: entry.snapshot,
policy_file: self.resolve_target_policy_file(name),
graph_id: name.to_string(),
selected: Some(name.to_string()),
})
}
}
/// Fill each graph entry's `uri` from `storage`/`server` and validate the
/// locator shape (RFC-002 §1.2/§3), so existing `uri`-readers see a
/// resolved base URI and invalid combinations fail at load.
fn normalize_graphs(&mut self) -> Result<()> {
let endpoints: BTreeMap<String, String> = self
.servers
.iter()
.map(|(name, entry)| (name.clone(), entry.endpoint.clone()))
.collect();
for (key, entry) in self.graphs.iter_mut() {
match (entry.storage.is_some(), entry.server.is_some()) {
(true, true) => {
bail!("graph '{key}': set exactly one of `storage` or `server`, not both")
}
(true, false) => {
entry.uri = entry.storage.as_ref().unwrap().uri().to_string();
}
(false, true) => {
if entry.policy.file.is_some() || !entry.queries.is_empty() {
bail!(
"graph '{key}': a remote graph (`server:`) cannot define `policy` or \
`queries` those are owned by the server it targets"
);
}
let server = entry.server.as_deref().unwrap();
let endpoint = endpoints.get(server).ok_or_else(|| {
color_eyre::eyre::eyre!(
"graph '{key}': server '{server}' is not defined in `servers:`"
)
})?;
entry.uri = endpoint.clone();
if entry.graph_id.is_none() {
entry.graph_id = Some(key.clone());
}
}
(false, false) => {
if entry.uri.is_empty() {
bail!("graph '{key}': set `storage:` (embedded) or `server:` (remote)");
}
// Legacy `uri:` — kept as-is; classified by scheme downstream.
}
}
}
Ok(())
}
pub fn resolve_target_uri(
&self,
explicit_uri: Option<String>,
@ -551,6 +825,8 @@ fn load_config_in(cwd: &Path, config_path: Option<&PathBuf>) -> Result<Omnigraph
cwd.to_path_buf()
};
config.normalize_graphs()?;
Ok(config)
}
@ -592,7 +868,8 @@ mod tests {
use tempfile::tempdir;
use super::{
ReadOutputFormat, TableCellLayout, graph_resource_id_for_selection, load_config_in,
GraphLocator, ReadOutputFormat, TableCellLayout, graph_resource_id_for_selection,
load_config_in,
};
#[test]
@ -1000,4 +1277,147 @@ cli:
"config version > 1 must be rejected: {err}"
);
}
fn load_yaml(yaml: &str) -> super::OmnigraphConfig {
let temp = tempdir().unwrap();
fs::write(temp.path().join("omnigraph.yaml"), yaml).unwrap();
load_config_in(temp.path(), None).unwrap()
}
fn load_yaml_err(yaml: &str) -> String {
let temp = tempdir().unwrap();
fs::write(temp.path().join("omnigraph.yaml"), yaml).unwrap();
load_config_in(temp.path(), None).unwrap_err().to_string()
}
#[test]
fn storage_bare_resolves_embedded() {
let config = load_yaml("version: 1\ngraphs:\n local:\n storage: ./demo.omni\n");
match config.resolve_graph(None, Some("local")).unwrap() {
GraphLocator::Embedded { uri, selected, graph_id, .. } => {
assert!(uri.ends_with("demo.omni"), "uri: {uri}");
assert_eq!(selected.as_deref(), Some("local"));
assert_eq!(graph_id, "local");
}
other => panic!("expected embedded, got {other:?}"),
}
}
#[test]
fn storage_block_carries_region_and_endpoint() {
let config = load_yaml(
"version: 1\ngraphs:\n prod:\n storage:\n uri: s3://b/prod.omni\n \
region: eu-west-1\n endpoint: https://minio.local\n",
);
match config.resolve_graph(None, Some("prod")).unwrap() {
GraphLocator::Embedded { uri, region, endpoint, .. } => {
assert_eq!(uri, "s3://b/prod.omni");
assert_eq!(region.as_deref(), Some("eu-west-1"));
assert_eq!(endpoint.as_deref(), Some("https://minio.local"));
}
other => panic!("expected embedded, got {other:?}"),
}
}
#[test]
fn remote_server_resolves_graph_id_defaulting_to_key() {
let config = load_yaml(
"version: 1\nservers:\n prod:\n endpoint: https://og.internal:8080\n\
graphs:\n reports:\n server: prod\n",
);
match config.resolve_graph(None, Some("reports")).unwrap() {
GraphLocator::Remote { endpoint, server, graph_id, .. } => {
assert_eq!(endpoint, "https://og.internal:8080");
assert_eq!(server, "prod");
assert_eq!(graph_id, "reports", "graph_id defaults to the entry key");
}
other => panic!("expected remote, got {other:?}"),
}
}
#[test]
fn qualified_server_slash_graph_resolves_without_an_alias() {
let config = load_yaml(
"version: 1\nservers:\n prod:\n endpoint: https://og.internal:8080\n",
);
match config.resolve_graph(None, Some("prod/production")).unwrap() {
GraphLocator::Remote { endpoint, server, graph_id, .. } => {
assert_eq!(endpoint, "https://og.internal:8080");
assert_eq!(server, "prod");
assert_eq!(graph_id, "production");
}
other => panic!("expected remote, got {other:?}"),
}
}
#[test]
fn storage_xor_server_both_set_is_rejected() {
let err = load_yaml_err(
"version: 1\nservers:\n p:\n endpoint: https://h\n\
graphs:\n bad:\n storage: s3://b/x\n server: p\n",
);
assert!(err.contains("exactly one of `storage` or `server`"), "{err}");
}
#[test]
fn graph_with_neither_storage_nor_server_is_rejected() {
let err = load_yaml_err("version: 1\ngraphs:\n bad:\n branch: main\n");
assert!(err.contains("set `storage:`") && err.contains("bad"), "{err}");
}
#[test]
fn remote_graph_with_policy_is_rejected() {
let err = load_yaml_err(
"version: 1\nservers:\n p:\n endpoint: https://h\n\
graphs:\n g:\n server: p\n policy:\n file: ./p.yaml\n",
);
assert!(err.contains("cannot define `policy`"), "{err}");
}
#[test]
fn remote_graph_referencing_unknown_server_is_rejected() {
let err = load_yaml_err("version: 1\ngraphs:\n g:\n server: ghost\n");
assert!(
err.contains("ghost") && err.contains("not defined in `servers:`"),
"{err}"
);
}
#[test]
fn unknown_graph_name_errors() {
let config = load_yaml("version: 1\ngraphs:\n local:\n storage: ./demo.omni\n");
let err = config.resolve_graph(None, Some("ghost")).unwrap_err().to_string();
assert!(err.contains("ghost") && err.contains("not found"), "{err}");
}
#[test]
fn storage_block_rejects_unknown_field() {
// `profile` is intentionally unsupported; deny_unknown_fields catches it.
let err = load_yaml_err(
"version: 1\ngraphs:\n g:\n storage:\n uri: s3://b/x\n profile: default\n",
);
assert!(err.to_lowercase().contains("profile") || err.contains("unknown field"), "{err}");
}
#[test]
fn legacy_remote_uri_splits_graphs_suffix() {
// No `version:` (legacy); a remote uri with the /graphs/{gid} hack splits.
let config = load_yaml("graphs:\n prod:\n uri: https://host:8080/graphs/production\n");
match config.resolve_graph(None, Some("prod")).unwrap() {
GraphLocator::Remote { endpoint, graph_id, .. } => {
assert_eq!(endpoint, "https://host:8080");
assert_eq!(graph_id, "production");
}
other => panic!("expected remote, got {other:?}"),
}
}
#[test]
fn legacy_embedded_uri_still_resolves_embedded() {
let config = load_yaml("graphs:\n local:\n uri: ./demo.omni\n");
assert!(matches!(
config.resolve_graph(None, Some("local")).unwrap(),
GraphLocator::Embedded { .. }
));
}
}