diff --git a/Cargo.lock b/Cargo.lock index 2d7277e..1b1bd7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4588,6 +4588,7 @@ dependencies = [ "cedar-policy", "clap", "color-eyre", + "futures", "omnigraph", "omnigraph-compiler", "serde", diff --git a/README.md b/README.md index 369f73c..d2b7477 100644 --- a/README.md +++ b/README.md @@ -111,8 +111,8 @@ Core repo flow: omnigraph init --schema ./schema.pg ./repo.omni omnigraph load --data ./data.jsonl --mode overwrite ./repo.omni omnigraph snapshot ./repo.omni --branch main --json -omnigraph read ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}' -omnigraph change ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}' +omnigraph read --uri ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}' +omnigraph change --uri ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}' omnigraph branch create --uri ./repo.omni --from main feature-x omnigraph branch merge --uri ./repo.omni feature-x --into main ``` diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 9a74a7f..707cf9d 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -1,4 +1,5 @@ use std::fs; +use std::io::{self, Write}; use std::path::Path; use std::path::PathBuf; @@ -125,7 +126,7 @@ enum Command { config: Option, #[arg(long)] branch: Option, - #[arg(long)] + #[arg(long, hide = true)] jsonl: bool, #[arg(long = "type")] type_names: Vec, @@ -145,7 +146,10 @@ enum Command { /// Execute a read query against a branch or snapshot Read { /// Repo URI + #[arg(long)] uri: Option, + #[arg(hide = true)] + legacy_uri: Option, #[arg(long)] target: Option, #[arg(long)] @@ -172,7 +176,10 @@ enum Command { /// Execute a graph change query against a branch Change { /// Repo URI + #[arg(long)] uri: Option, + #[arg(hide = true)] + legacy_uri: Option, #[arg(long)] target: Option, #[arg(long)] @@ -347,6 +354,7 @@ enum CommitCommand { /// Show a graph commit Show { /// Repo URI + #[arg(long)] uri: Option, #[arg(long)] target: Option, @@ -428,6 +436,8 @@ struct LoadOutput<'a> { mode: &'a str, nodes_loaded: usize, edges_loaded: usize, + node_types_loaded: usize, + edge_types_loaded: usize, } #[derive(Debug, Serialize)] @@ -644,31 +654,6 @@ async fn remote_json( Ok(serde_json::from_str(&text)?) } -async fn remote_text( - client: &reqwest::Client, - method: Method, - url: String, - body: Option, - bearer_token: Option<&str>, -) -> Result { - let request = apply_bearer_token(client.request(method, url), bearer_token); - let request = if let Some(body) = body { - request.json(&body) - } else { - request - }; - let response = request.send().await?; - let status = response.status(); - let text = response.text().await?; - if !status.is_success() { - if let Ok(error) = serde_json::from_str::(&text) { - bail!(error.error); - } - bail!("server returned {}: {}", status, text); - } - Ok(text) -} - fn resolve_uri( config: &OmnigraphConfig, cli_uri: Option, @@ -784,14 +769,18 @@ fn print_load_human( mode: CliLoadMode, nodes_loaded: usize, edges_loaded: usize, + node_types_loaded: usize, + edge_types_loaded: usize, ) { println!( - "loaded {} on branch {} with {}: {} node types, {} edge types", + "loaded {} on branch {} with {}: {} nodes across {} node types, {} edges across {} edge types", uri, branch, mode.as_str(), nodes_loaded, - edges_loaded + node_types_loaded, + edges_loaded, + edge_types_loaded ); } @@ -1148,10 +1137,9 @@ fn resolve_alias<'a>( Ok(Some((alias_name, alias))) } -fn normalize_alias_args( +fn normalize_legacy_alias_uri( uri: Option, - target: Option<&str>, - default_target_present: bool, + target_available: bool, alias_name: Option<&str>, mut alias_args: Vec, ) -> (Option, Vec) { @@ -1159,12 +1147,7 @@ fn normalize_alias_args( return (None, alias_args); }; - if alias_name.is_some() - && (target.is_some() || default_target_present) - && !is_remote_uri(&candidate) - && !candidate.contains(std::path::MAIN_SEPARATOR) - && !Path::new(&candidate).exists() - { + if alias_name.is_some() && target_available { alias_args.insert(0, candidate); return (None, alias_args); } @@ -1392,36 +1375,53 @@ async fn execute_change_remote( .await } -async fn execute_export( +async fn execute_export_to_writer( uri: &str, branch: &str, type_names: &[String], table_keys: &[String], -) -> Result { + writer: &mut W, +) -> Result<()> { let db = Omnigraph::open(uri).await?; - Ok(db.export_jsonl(branch, type_names, table_keys).await?) + db.export_jsonl_to_writer(branch, type_names, table_keys, writer) + .await?; + writer.flush()?; + Ok(()) } -async fn execute_export_remote( +async fn execute_export_remote_to_writer( client: &reqwest::Client, uri: &str, branch: &str, type_names: &[String], table_keys: &[String], bearer_token: Option<&str>, -) -> Result { - remote_text( - client, - Method::POST, - remote_url(uri, "/export"), - Some(serde_json::to_value(ExportRequest { - branch: Some(branch.to_string()), - type_names: type_names.to_vec(), - table_keys: table_keys.to_vec(), - })?), + writer: &mut W, +) -> Result<()> { + let request = apply_bearer_token( + client.request(Method::POST, remote_url(uri, "/export")), bearer_token, ) - .await + .json(&ExportRequest { + branch: Some(branch.to_string()), + type_names: type_names.to_vec(), + table_keys: table_keys.to_vec(), + }); + let mut response = request.send().await?; + let status = response.status(); + if !status.is_success() { + let text = response.text().await?; + if let Ok(error) = serde_json::from_str::(&text) { + bail!(error.error); + } + bail!("server returned {}: {}", status, text); + } + + while let Some(chunk) = response.chunk().await? { + writer.write_all(&chunk)?; + } + writer.flush()?; + Ok(()) } #[tokio::main] @@ -1479,8 +1479,10 @@ async fn main() -> Result<()> { uri: &uri, branch: &branch, mode: mode.as_str(), - nodes_loaded: result.nodes_loaded.len(), - edges_loaded: result.edges_loaded.len(), + nodes_loaded: result.nodes_loaded.values().sum(), + edges_loaded: result.edges_loaded.values().sum(), + node_types_loaded: result.nodes_loaded.len(), + edge_types_loaded: result.edges_loaded.len(), }; if json { print_json(&payload)?; @@ -1491,6 +1493,8 @@ async fn main() -> Result<()> { mode, payload.nodes_loaded, payload.edges_loaded, + payload.node_types_loaded, + payload.edge_types_loaded, ); } } @@ -1831,7 +1835,7 @@ async fn main() -> Result<()> { target, config, branch, - jsonl: _, + jsonl, type_names, table_keys, } => { @@ -1840,20 +1844,27 @@ async fn main() -> Result<()> { resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let branch = resolve_branch(&config, branch, None, "main"); - let output = if is_remote_uri(&uri) { - execute_export_remote( + if jsonl { + eprintln!("warning: --jsonl is deprecated; `omnigraph export` always emits JSONL"); + } + + let stdout = io::stdout(); + let mut stdout = stdout.lock(); + if is_remote_uri(&uri) { + execute_export_remote_to_writer( &http_client, &uri, &branch, &type_names, &table_keys, bearer_token.as_deref(), + &mut stdout, ) - .await? + .await?; } else { - execute_export(&uri, &branch, &type_names, &table_keys).await? - }; - print!("{output}"); + execute_export_to_writer(&uri, &branch, &type_names, &table_keys, &mut stdout) + .await?; + } } Command::Run { command } => match command { RunCommand::List { @@ -1984,6 +1995,7 @@ async fn main() -> Result<()> { }, Command::Read { uri, + legacy_uri, target, config, alias, @@ -2004,13 +2016,14 @@ async fn main() -> Result<()> { let alias = resolve_alias(&config, alias.as_deref(), AliasCommand::Read)?; let alias_name = alias.as_ref().map(|(name, _)| *name); let alias_config = alias.as_ref().map(|(_, alias)| *alias); - let (uri, alias_args) = normalize_alias_args( - uri, - target.as_deref(), - config.cli_target_name().is_some(), - alias_name, - alias_args, - ); + let target_available = target.is_some() + || alias_config + .and_then(|alias| alias.target.as_deref()) + .is_some() + || config.cli_target_name().is_some(); + let (legacy_uri, alias_args) = + normalize_legacy_alias_uri(legacy_uri, target_available, alias_name, alias_args); + let uri = uri.or(legacy_uri); let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.target.as_deref())); @@ -2067,6 +2080,7 @@ async fn main() -> Result<()> { } Command::Change { uri, + legacy_uri, target, config, alias, @@ -2085,13 +2099,14 @@ async fn main() -> Result<()> { let alias = resolve_alias(&config, alias.as_deref(), AliasCommand::Change)?; let alias_name = alias.as_ref().map(|(name, _)| *name); let alias_config = alias.as_ref().map(|(_, alias)| *alias); - let (uri, alias_args) = normalize_alias_args( - uri, - target.as_deref(), - config.cli_target_name().is_some(), - alias_name, - alias_args, - ); + let target_available = target.is_some() + || alias_config + .and_then(|alias| alias.target.as_deref()) + .is_some() + || config.cli_target_name().is_some(); + let (legacy_uri, alias_args) = + normalize_legacy_alias_uri(legacy_uri, target_available, alias_name, alias_args); + let uri = uri.or(legacy_uri); let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.target.as_deref())); diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 62aa16a..d2d20d2 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -318,8 +318,10 @@ fn load_json_outputs_summary_for_main_branch() { assert_eq!(payload["branch"], "main"); assert_eq!(payload["mode"], "overwrite"); - assert_eq!(payload["nodes_loaded"], 2); - assert_eq!(payload["edges_loaded"], 2); + assert_eq!(payload["nodes_loaded"], 6); + assert_eq!(payload["edges_loaded"], 5); + assert_eq!(payload["node_types_loaded"], 2); + assert_eq!(payload["edge_types_loaded"], 2); } #[test] @@ -361,7 +363,7 @@ fn load_into_feature_branch_with_merge_mode_succeeds() { assert!(stdout.contains("branch feature")); assert!(stdout.contains("with merge")); - assert!(stdout.contains("1 node types")); + assert!(stdout.contains("1 nodes across 1 node types")); } #[test] @@ -629,6 +631,46 @@ fn read_alias_from_yaml_config_runs_with_kv_output() { assert!(stdout.contains("p.name: Alice")); } +#[test] +fn read_alias_uses_alias_target_without_cli_default_and_accepts_url_like_arg() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + let config = temp.path().join("omnigraph.yaml"); + let query = temp.path().join("aliases.gq"); + let data = temp.path().join("url-like.jsonl"); + init_repo(&repo); + write_jsonl( + &data, + r#"{"type":"Person","data":{"name":"https://example.com","age":30}}"#, + ); + output_success(cli().arg("load").arg("--data").arg(&data).arg(&repo)); + write_query_file( + &query, + &std::fs::read_to_string(fixture("test.gq")).unwrap(), + ); + write_config( + &config, + &format!( + "targets:\n local:\n uri: '{}'\nquery:\n roots:\n - .\npolicy: {{}}\naliases:\n owner:\n command: read\n query: aliases.gq\n name: get_person\n args: [name]\n target: local\n format: kv\n", + repo.to_string_lossy() + ), + ); + + let output = output_success( + cli() + .arg("read") + .arg("--config") + .arg(&config) + .arg("--alias") + .arg("owner") + .arg("https://example.com"), + ); + let stdout = stdout_string(&output); + + assert!(stdout.contains("row 1")); + assert!(stdout.contains("p.name: https://example.com")); +} + #[test] fn change_alias_from_yaml_config_persists_changes() { let temp = tempdir().unwrap(); @@ -1202,6 +1244,35 @@ fn snapshot_human_output_includes_branch_and_table_summaries() { assert!(stdout.contains("edge:Knows v")); } +#[test] +fn commit_show_accepts_long_uri_flag() { + let temp = tempdir().unwrap(); + let repo = repo_path(temp.path()); + init_repo(&repo); + load_fixture(&repo); + + let list = output_success(cli().arg("commit").arg("list").arg(&repo).arg("--json")); + let list_payload: Value = serde_json::from_slice(&list.stdout).unwrap(); + let commit_id = list_payload["commits"][0]["graph_commit_id"] + .as_str() + .unwrap() + .to_string(); + + let output = output_success( + cli() + .arg("commit") + .arg("show") + .arg("--uri") + .arg(&repo) + .arg(&commit_id) + .arg("--json"), + ); + let payload: Value = serde_json::from_slice(&output.stdout).unwrap(); + + assert_eq!(payload["graph_commit_id"], commit_id); + assert!(payload["manifest_version"].as_u64().unwrap() >= 1); +} + #[test] fn cli_fails_for_missing_repo() { let temp = tempdir().unwrap(); diff --git a/crates/omnigraph-server/Cargo.toml b/crates/omnigraph-server/Cargo.toml index 362996d..be17c79 100644 --- a/crates/omnigraph-server/Cargo.toml +++ b/crates/omnigraph-server/Cargo.toml @@ -23,6 +23,7 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } tower-http = { workspace = true } cedar-policy = { workspace = true } +futures = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 17dca60..4bcfe4f 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -4,6 +4,8 @@ pub mod policy; use std::collections::{HashMap, HashSet}; use std::fs; +use std::io; +use std::io::Write; use std::path::PathBuf; use std::sync::Arc; @@ -14,6 +16,7 @@ use api::{ IngestRequest, ReadOutput, ReadRequest, RunListOutput, SnapshotQuery, ingest_output, snapshot_payload, }; +use axum::body::{Body, Bytes}; use axum::extract::DefaultBodyLimit; use axum::extract::{Extension, Path, Query, Request, State}; use axum::http::StatusCode; @@ -28,6 +31,7 @@ pub use config::{ ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig, load_config, }; +use futures::stream; use omnigraph::db::{Omnigraph, ReadTarget, RunId}; use omnigraph::error::{ManifestErrorKind, OmniError}; use omnigraph_compiler::json_params_to_param_map; @@ -39,7 +43,7 @@ pub use policy::{ }; use serde_json::Value; use tokio::net::TcpListener; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, mpsc}; use tower_http::trace::TraceLayer; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -67,6 +71,23 @@ pub struct AppState { #[derive(Debug, Clone)] struct AuthenticatedActor(Arc); +struct ExportStreamWriter { + sender: mpsc::UnboundedSender>, +} + +impl Write for ExportStreamWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.sender + .send(Ok(Bytes::copy_from_slice(buf))) + .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + impl AuthenticatedActor { fn as_str(&self) -> &str { &self.0 @@ -567,16 +588,28 @@ async fn server_export( target_branch: None, }, )?; - let payload = { - let db = Arc::clone(&state.db).read_owned().await; - db.export_jsonl(&branch, &request.type_names, &request.table_keys) - .await - .map_err(ApiError::from_omni)? - }; + let db = Arc::clone(&state.db); + let type_names = request.type_names.clone(); + let table_keys = request.table_keys.clone(); + let (tx, rx) = mpsc::unbounded_channel::>(); + tokio::spawn(async move { + let result = { + let db = db.read().await; + let mut writer = ExportStreamWriter { sender: tx.clone() }; + db.export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer) + .await + }; + if let Err(err) = result { + let _ = tx.send(Err(io::Error::other(err.to_string()))); + } + }); + let body = Body::from_stream(stream::unfold(rx, |mut rx| async move { + rx.recv().await.map(|item| (item, rx)) + })); Ok(( StatusCode::OK, [(CONTENT_TYPE, "application/x-ndjson; charset=utf-8")], - payload, + body, ) .into_response()) } diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 2dc93fa..c26118e 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeSet, HashMap, HashSet}; +use std::io::Write; use std::sync::Arc; use arrow_array::{ @@ -368,9 +369,23 @@ impl Omnigraph { type_names: &[String], table_keys: &[String], ) -> Result { + let mut out = Vec::new(); + self.export_jsonl_to_writer(branch, type_names, table_keys, &mut out) + .await?; + String::from_utf8(out) + .map_err(|err| OmniError::manifest(format!("export produced invalid UTF-8: {}", err))) + } + + pub async fn export_jsonl_to_writer( + &self, + branch: &str, + type_names: &[String], + table_keys: &[String], + writer: &mut W, + ) -> Result<()> { self.ensure_schema_state_valid().await?; let snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?; - self.export_snapshot_jsonl(&snapshot, type_names, table_keys) + self.export_snapshot_jsonl_to_writer(&snapshot, type_names, table_keys, writer) .await } @@ -399,26 +414,19 @@ impl Omnigraph { Ok(Some(record_batch_row_to_json(batch, 0)?)) } - async fn export_snapshot_jsonl( + async fn export_snapshot_jsonl_to_writer( &self, snapshot: &Snapshot, type_names: &[String], table_keys: &[String], - ) -> Result { + writer: &mut W, + ) -> Result<()> { let selected_tables = self.export_table_keys(snapshot, type_names, table_keys)?; - let mut out = String::new(); for table_key in selected_tables { - for row in self.export_table_rows(snapshot, &table_key).await? { - out.push_str(&serde_json::to_string(&row).map_err(|err| { - OmniError::manifest(format!( - "failed to serialize export row for '{}': {}", - table_key, err - )) - })?); - out.push('\n'); - } + self.export_table_to_writer(snapshot, &table_key, writer) + .await?; } - Ok(out) + Ok(()) } fn export_table_keys( @@ -470,11 +478,12 @@ impl Omnigraph { Ok(selected.into_iter().collect()) } - async fn export_table_rows( + async fn export_table_to_writer( &self, snapshot: &Snapshot, table_key: &str, - ) -> Result> { + writer: &mut W, + ) -> Result<()> { let ds = self .table_store .open_snapshot_table(snapshot, table_key) @@ -483,46 +492,36 @@ impl Omnigraph { let blob_properties = blob_properties_for_table_key(self.catalog(), table_key)?; if blob_properties.is_empty() { - let batch = concat_or_empty_batches( - schema_for_table_key(self.catalog(), table_key)?, - self.table_store.scan(&ds, None, None, ordering).await?, - )?; - return self.export_rows_from_batch(table_key, &batch, None).await; + for batch in self.table_store.scan(&ds, None, None, ordering).await? { + self.write_export_rows_from_batch(table_key, &batch, None, writer)?; + } + return Ok(()); } let batches = self .table_store .scan_with(&ds, None, None, ordering, true, |_| Ok(())) .await?; - if batches.is_empty() { - return Ok(Vec::new()); + for batch in batches { + let row_ids = batch + .column_by_name("_rowid") + .and_then(|col| col.as_any().downcast_ref::()) + .ok_or_else(|| { + OmniError::Lance(format!( + "expected _rowid column when exporting '{}'", + table_key + )) + })? + .values() + .iter() + .copied() + .collect::>(); + let blob_values = self + .export_blob_values(&ds, &batch, &row_ids, blob_properties) + .await?; + self.write_export_rows_from_batch(table_key, &batch, Some(&blob_values), writer)?; } - - let scan_schema = batches[0].schema(); - let batch = if batches.len() == 1 { - batches.into_iter().next().unwrap() - } else { - arrow_select::concat::concat_batches(&scan_schema, &batches) - .map_err(|e| OmniError::Lance(e.to_string()))? - }; - let row_ids = batch - .column_by_name("_rowid") - .and_then(|col| col.as_any().downcast_ref::()) - .ok_or_else(|| { - OmniError::Lance(format!( - "expected _rowid column when exporting '{}'", - table_key - )) - })? - .values() - .iter() - .copied() - .collect::>(); - let blob_values = self - .export_blob_values(&ds, &batch, &row_ids, blob_properties) - .await?; - self.export_rows_from_batch(table_key, &batch, Some(&blob_values)) - .await + Ok(()) } // ─── Graph index ────────────────────────────────────────────────────── @@ -1755,18 +1754,18 @@ impl Omnigraph { Ok(values) } - async fn export_rows_from_batch( + fn write_export_rows_from_batch( &self, table_key: &str, batch: &RecordBatch, blob_values: Option<&HashMap>>>, - ) -> Result> { + writer: &mut W, + ) -> Result<()> { if let Some(type_name) = table_key.strip_prefix("node:") { let node_type = self.catalog.node_types.get(type_name).ok_or_else(|| { OmniError::manifest(format!("unknown node type '{}'", type_name)) })?; - let mut rows = Vec::with_capacity(batch.num_rows()); for row in 0..batch.num_rows() { let mut data = serde_json::Map::new(); data.insert( @@ -1784,12 +1783,16 @@ impl Omnigraph { )?, ); } - rows.push(serde_json::json!({ + write_export_jsonl_row( + writer, + table_key, + &serde_json::json!({ "type": type_name, "data": serde_json::Value::Object(data), - })); + }), + )?; } - return Ok(rows); + return Ok(()); } if let Some(edge_name) = table_key.strip_prefix("edge:") { @@ -1797,7 +1800,6 @@ impl Omnigraph { self.catalog.edge_types.get(edge_name).ok_or_else(|| { OmniError::manifest(format!("unknown edge type '{}'", edge_name)) })?; - let mut rows = Vec::with_capacity(batch.num_rows()); for row in 0..batch.num_rows() { let from = named_string_value(batch, "src", row)?; let to = named_string_value(batch, "dst", row)?; @@ -1817,14 +1819,18 @@ impl Omnigraph { )?, ); } - rows.push(serde_json::json!({ + write_export_jsonl_row( + writer, + table_key, + &serde_json::json!({ "edge": edge_name, "from": from, "to": to, "data": serde_json::Value::Object(data), - })); + }), + )?; } - return Ok(rows); + return Ok(()); } Err(OmniError::manifest(format!( @@ -1834,6 +1840,21 @@ impl Omnigraph { } } +fn write_export_jsonl_row( + writer: &mut W, + table_key: &str, + row: &serde_json::Value, +) -> Result<()> { + serde_json::to_writer(&mut *writer, row).map_err(|err| { + OmniError::manifest(format!( + "failed to serialize export row for '{}': {}", + table_key, err + )) + })?; + writer.write_all(b"\n")?; + Ok(()) +} + async fn export_blob_column_values( source_ds: &Dataset, column_name: &str, diff --git a/docs/cli.md b/docs/cli.md index e15d6af..835ff3d 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -6,8 +6,8 @@ omnigraph init --schema ./schema.pg ./repo.omni omnigraph load --data ./data.jsonl --mode overwrite ./repo.omni omnigraph snapshot ./repo.omni --branch main --json -omnigraph read ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}' -omnigraph change ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}' +omnigraph read --uri ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}' +omnigraph change --uri ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}' ``` ## Branching And Reviewable Data Flows