mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Fix CLI ergonomics and stream export output
This commit is contained in:
parent
af7a74bf2c
commit
4b058b9813
8 changed files with 291 additions and 149 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -4588,6 +4588,7 @@ dependencies = [
|
|||
"cedar-policy",
|
||||
"clap",
|
||||
"color-eyre",
|
||||
"futures",
|
||||
"omnigraph",
|
||||
"omnigraph-compiler",
|
||||
"serde",
|
||||
|
|
|
|||
|
|
@ -111,8 +111,8 @@ Core repo flow:
|
|||
omnigraph init --schema ./schema.pg ./repo.omni
|
||||
omnigraph load --data ./data.jsonl --mode overwrite ./repo.omni
|
||||
omnigraph snapshot ./repo.omni --branch main --json
|
||||
omnigraph read ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}'
|
||||
omnigraph change ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}'
|
||||
omnigraph read --uri ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}'
|
||||
omnigraph change --uri ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}'
|
||||
omnigraph branch create --uri ./repo.omni --from main feature-x
|
||||
omnigraph branch merge --uri ./repo.omni feature-x --into main
|
||||
```
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use std::fs;
|
||||
use std::io::{self, Write};
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
|
|
@ -125,7 +126,7 @@ enum Command {
|
|||
config: Option<PathBuf>,
|
||||
#[arg(long)]
|
||||
branch: Option<String>,
|
||||
#[arg(long)]
|
||||
#[arg(long, hide = true)]
|
||||
jsonl: bool,
|
||||
#[arg(long = "type")]
|
||||
type_names: Vec<String>,
|
||||
|
|
@ -145,7 +146,10 @@ enum Command {
|
|||
/// Execute a read query against a branch or snapshot
|
||||
Read {
|
||||
/// Repo URI
|
||||
#[arg(long)]
|
||||
uri: Option<String>,
|
||||
#[arg(hide = true)]
|
||||
legacy_uri: Option<String>,
|
||||
#[arg(long)]
|
||||
target: Option<String>,
|
||||
#[arg(long)]
|
||||
|
|
@ -172,7 +176,10 @@ enum Command {
|
|||
/// Execute a graph change query against a branch
|
||||
Change {
|
||||
/// Repo URI
|
||||
#[arg(long)]
|
||||
uri: Option<String>,
|
||||
#[arg(hide = true)]
|
||||
legacy_uri: Option<String>,
|
||||
#[arg(long)]
|
||||
target: Option<String>,
|
||||
#[arg(long)]
|
||||
|
|
@ -347,6 +354,7 @@ enum CommitCommand {
|
|||
/// Show a graph commit
|
||||
Show {
|
||||
/// Repo URI
|
||||
#[arg(long)]
|
||||
uri: Option<String>,
|
||||
#[arg(long)]
|
||||
target: Option<String>,
|
||||
|
|
@ -428,6 +436,8 @@ struct LoadOutput<'a> {
|
|||
mode: &'a str,
|
||||
nodes_loaded: usize,
|
||||
edges_loaded: usize,
|
||||
node_types_loaded: usize,
|
||||
edge_types_loaded: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
|
|
@ -644,31 +654,6 @@ async fn remote_json<T: DeserializeOwned>(
|
|||
Ok(serde_json::from_str(&text)?)
|
||||
}
|
||||
|
||||
async fn remote_text(
|
||||
client: &reqwest::Client,
|
||||
method: Method,
|
||||
url: String,
|
||||
body: Option<Value>,
|
||||
bearer_token: Option<&str>,
|
||||
) -> Result<String> {
|
||||
let request = apply_bearer_token(client.request(method, url), bearer_token);
|
||||
let request = if let Some(body) = body {
|
||||
request.json(&body)
|
||||
} else {
|
||||
request
|
||||
};
|
||||
let response = request.send().await?;
|
||||
let status = response.status();
|
||||
let text = response.text().await?;
|
||||
if !status.is_success() {
|
||||
if let Ok(error) = serde_json::from_str::<ErrorOutput>(&text) {
|
||||
bail!(error.error);
|
||||
}
|
||||
bail!("server returned {}: {}", status, text);
|
||||
}
|
||||
Ok(text)
|
||||
}
|
||||
|
||||
fn resolve_uri(
|
||||
config: &OmnigraphConfig,
|
||||
cli_uri: Option<String>,
|
||||
|
|
@ -784,14 +769,18 @@ fn print_load_human(
|
|||
mode: CliLoadMode,
|
||||
nodes_loaded: usize,
|
||||
edges_loaded: usize,
|
||||
node_types_loaded: usize,
|
||||
edge_types_loaded: usize,
|
||||
) {
|
||||
println!(
|
||||
"loaded {} on branch {} with {}: {} node types, {} edge types",
|
||||
"loaded {} on branch {} with {}: {} nodes across {} node types, {} edges across {} edge types",
|
||||
uri,
|
||||
branch,
|
||||
mode.as_str(),
|
||||
nodes_loaded,
|
||||
edges_loaded
|
||||
node_types_loaded,
|
||||
edges_loaded,
|
||||
edge_types_loaded
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -1148,10 +1137,9 @@ fn resolve_alias<'a>(
|
|||
Ok(Some((alias_name, alias)))
|
||||
}
|
||||
|
||||
fn normalize_alias_args(
|
||||
fn normalize_legacy_alias_uri(
|
||||
uri: Option<String>,
|
||||
target: Option<&str>,
|
||||
default_target_present: bool,
|
||||
target_available: bool,
|
||||
alias_name: Option<&str>,
|
||||
mut alias_args: Vec<String>,
|
||||
) -> (Option<String>, Vec<String>) {
|
||||
|
|
@ -1159,12 +1147,7 @@ fn normalize_alias_args(
|
|||
return (None, alias_args);
|
||||
};
|
||||
|
||||
if alias_name.is_some()
|
||||
&& (target.is_some() || default_target_present)
|
||||
&& !is_remote_uri(&candidate)
|
||||
&& !candidate.contains(std::path::MAIN_SEPARATOR)
|
||||
&& !Path::new(&candidate).exists()
|
||||
{
|
||||
if alias_name.is_some() && target_available {
|
||||
alias_args.insert(0, candidate);
|
||||
return (None, alias_args);
|
||||
}
|
||||
|
|
@ -1392,36 +1375,53 @@ async fn execute_change_remote(
|
|||
.await
|
||||
}
|
||||
|
||||
async fn execute_export(
|
||||
async fn execute_export_to_writer<W: Write>(
|
||||
uri: &str,
|
||||
branch: &str,
|
||||
type_names: &[String],
|
||||
table_keys: &[String],
|
||||
) -> Result<String> {
|
||||
writer: &mut W,
|
||||
) -> Result<()> {
|
||||
let db = Omnigraph::open(uri).await?;
|
||||
Ok(db.export_jsonl(branch, type_names, table_keys).await?)
|
||||
db.export_jsonl_to_writer(branch, type_names, table_keys, writer)
|
||||
.await?;
|
||||
writer.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn execute_export_remote(
|
||||
async fn execute_export_remote_to_writer<W: Write>(
|
||||
client: &reqwest::Client,
|
||||
uri: &str,
|
||||
branch: &str,
|
||||
type_names: &[String],
|
||||
table_keys: &[String],
|
||||
bearer_token: Option<&str>,
|
||||
) -> Result<String> {
|
||||
remote_text(
|
||||
client,
|
||||
Method::POST,
|
||||
remote_url(uri, "/export"),
|
||||
Some(serde_json::to_value(ExportRequest {
|
||||
branch: Some(branch.to_string()),
|
||||
type_names: type_names.to_vec(),
|
||||
table_keys: table_keys.to_vec(),
|
||||
})?),
|
||||
writer: &mut W,
|
||||
) -> Result<()> {
|
||||
let request = apply_bearer_token(
|
||||
client.request(Method::POST, remote_url(uri, "/export")),
|
||||
bearer_token,
|
||||
)
|
||||
.await
|
||||
.json(&ExportRequest {
|
||||
branch: Some(branch.to_string()),
|
||||
type_names: type_names.to_vec(),
|
||||
table_keys: table_keys.to_vec(),
|
||||
});
|
||||
let mut response = request.send().await?;
|
||||
let status = response.status();
|
||||
if !status.is_success() {
|
||||
let text = response.text().await?;
|
||||
if let Ok(error) = serde_json::from_str::<ErrorOutput>(&text) {
|
||||
bail!(error.error);
|
||||
}
|
||||
bail!("server returned {}: {}", status, text);
|
||||
}
|
||||
|
||||
while let Some(chunk) = response.chunk().await? {
|
||||
writer.write_all(&chunk)?;
|
||||
}
|
||||
writer.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
|
@ -1479,8 +1479,10 @@ async fn main() -> Result<()> {
|
|||
uri: &uri,
|
||||
branch: &branch,
|
||||
mode: mode.as_str(),
|
||||
nodes_loaded: result.nodes_loaded.len(),
|
||||
edges_loaded: result.edges_loaded.len(),
|
||||
nodes_loaded: result.nodes_loaded.values().sum(),
|
||||
edges_loaded: result.edges_loaded.values().sum(),
|
||||
node_types_loaded: result.nodes_loaded.len(),
|
||||
edge_types_loaded: result.edges_loaded.len(),
|
||||
};
|
||||
if json {
|
||||
print_json(&payload)?;
|
||||
|
|
@ -1491,6 +1493,8 @@ async fn main() -> Result<()> {
|
|||
mode,
|
||||
payload.nodes_loaded,
|
||||
payload.edges_loaded,
|
||||
payload.node_types_loaded,
|
||||
payload.edge_types_loaded,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -1831,7 +1835,7 @@ async fn main() -> Result<()> {
|
|||
target,
|
||||
config,
|
||||
branch,
|
||||
jsonl: _,
|
||||
jsonl,
|
||||
type_names,
|
||||
table_keys,
|
||||
} => {
|
||||
|
|
@ -1840,20 +1844,27 @@ async fn main() -> Result<()> {
|
|||
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
|
||||
let uri = resolve_uri(&config, uri, target.as_deref())?;
|
||||
let branch = resolve_branch(&config, branch, None, "main");
|
||||
let output = if is_remote_uri(&uri) {
|
||||
execute_export_remote(
|
||||
if jsonl {
|
||||
eprintln!("warning: --jsonl is deprecated; `omnigraph export` always emits JSONL");
|
||||
}
|
||||
|
||||
let stdout = io::stdout();
|
||||
let mut stdout = stdout.lock();
|
||||
if is_remote_uri(&uri) {
|
||||
execute_export_remote_to_writer(
|
||||
&http_client,
|
||||
&uri,
|
||||
&branch,
|
||||
&type_names,
|
||||
&table_keys,
|
||||
bearer_token.as_deref(),
|
||||
&mut stdout,
|
||||
)
|
||||
.await?
|
||||
.await?;
|
||||
} else {
|
||||
execute_export(&uri, &branch, &type_names, &table_keys).await?
|
||||
};
|
||||
print!("{output}");
|
||||
execute_export_to_writer(&uri, &branch, &type_names, &table_keys, &mut stdout)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Command::Run { command } => match command {
|
||||
RunCommand::List {
|
||||
|
|
@ -1984,6 +1995,7 @@ async fn main() -> Result<()> {
|
|||
},
|
||||
Command::Read {
|
||||
uri,
|
||||
legacy_uri,
|
||||
target,
|
||||
config,
|
||||
alias,
|
||||
|
|
@ -2004,13 +2016,14 @@ async fn main() -> Result<()> {
|
|||
let alias = resolve_alias(&config, alias.as_deref(), AliasCommand::Read)?;
|
||||
let alias_name = alias.as_ref().map(|(name, _)| *name);
|
||||
let alias_config = alias.as_ref().map(|(_, alias)| *alias);
|
||||
let (uri, alias_args) = normalize_alias_args(
|
||||
uri,
|
||||
target.as_deref(),
|
||||
config.cli_target_name().is_some(),
|
||||
alias_name,
|
||||
alias_args,
|
||||
);
|
||||
let target_available = target.is_some()
|
||||
|| alias_config
|
||||
.and_then(|alias| alias.target.as_deref())
|
||||
.is_some()
|
||||
|| config.cli_target_name().is_some();
|
||||
let (legacy_uri, alias_args) =
|
||||
normalize_legacy_alias_uri(legacy_uri, target_available, alias_name, alias_args);
|
||||
let uri = uri.or(legacy_uri);
|
||||
let target_name = target
|
||||
.as_deref()
|
||||
.or_else(|| alias_config.and_then(|alias| alias.target.as_deref()));
|
||||
|
|
@ -2067,6 +2080,7 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
Command::Change {
|
||||
uri,
|
||||
legacy_uri,
|
||||
target,
|
||||
config,
|
||||
alias,
|
||||
|
|
@ -2085,13 +2099,14 @@ async fn main() -> Result<()> {
|
|||
let alias = resolve_alias(&config, alias.as_deref(), AliasCommand::Change)?;
|
||||
let alias_name = alias.as_ref().map(|(name, _)| *name);
|
||||
let alias_config = alias.as_ref().map(|(_, alias)| *alias);
|
||||
let (uri, alias_args) = normalize_alias_args(
|
||||
uri,
|
||||
target.as_deref(),
|
||||
config.cli_target_name().is_some(),
|
||||
alias_name,
|
||||
alias_args,
|
||||
);
|
||||
let target_available = target.is_some()
|
||||
|| alias_config
|
||||
.and_then(|alias| alias.target.as_deref())
|
||||
.is_some()
|
||||
|| config.cli_target_name().is_some();
|
||||
let (legacy_uri, alias_args) =
|
||||
normalize_legacy_alias_uri(legacy_uri, target_available, alias_name, alias_args);
|
||||
let uri = uri.or(legacy_uri);
|
||||
let target_name = target
|
||||
.as_deref()
|
||||
.or_else(|| alias_config.and_then(|alias| alias.target.as_deref()));
|
||||
|
|
|
|||
|
|
@ -318,8 +318,10 @@ fn load_json_outputs_summary_for_main_branch() {
|
|||
|
||||
assert_eq!(payload["branch"], "main");
|
||||
assert_eq!(payload["mode"], "overwrite");
|
||||
assert_eq!(payload["nodes_loaded"], 2);
|
||||
assert_eq!(payload["edges_loaded"], 2);
|
||||
assert_eq!(payload["nodes_loaded"], 6);
|
||||
assert_eq!(payload["edges_loaded"], 5);
|
||||
assert_eq!(payload["node_types_loaded"], 2);
|
||||
assert_eq!(payload["edge_types_loaded"], 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -361,7 +363,7 @@ fn load_into_feature_branch_with_merge_mode_succeeds() {
|
|||
|
||||
assert!(stdout.contains("branch feature"));
|
||||
assert!(stdout.contains("with merge"));
|
||||
assert!(stdout.contains("1 node types"));
|
||||
assert!(stdout.contains("1 nodes across 1 node types"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -629,6 +631,46 @@ fn read_alias_from_yaml_config_runs_with_kv_output() {
|
|||
assert!(stdout.contains("p.name: Alice"));
|
||||
}
|
||||
|
||||
/// A read alias that names its own `target` must resolve the repo without a
/// `--uri` flag, and a URL-looking positional argument must be treated as the
/// alias argument (`name`) rather than as a repo URI.
#[test]
fn read_alias_uses_alias_target_without_cli_default_and_accepts_url_like_arg() {
    let temp = tempdir().unwrap();
    let root = temp.path();
    let repo = repo_path(root);
    let config = root.join("omnigraph.yaml");
    let query = root.join("aliases.gq");
    let data = root.join("url-like.jsonl");

    // Seed the repo with a single person whose name looks like a URL.
    init_repo(&repo);
    let person_row = r#"{"type":"Person","data":{"name":"https://example.com","age":30}}"#;
    write_jsonl(&data, person_row);
    output_success(cli().arg("load").arg("--data").arg(&data).arg(&repo));

    // Reuse the shared fixture queries under the file name the alias refers to.
    let query_source = std::fs::read_to_string(fixture("test.gq")).unwrap();
    write_query_file(&query, &query_source);

    // The alias carries its own target; the config defines no CLI default.
    let config_yaml = format!(
        "targets:\n local:\n uri: '{}'\nquery:\n roots:\n - .\npolicy: {{}}\naliases:\n owner:\n command: read\n query: aliases.gq\n name: get_person\n args: [name]\n target: local\n format: kv\n",
        repo.to_string_lossy()
    );
    write_config(&config, &config_yaml);

    let output = output_success(
        cli()
            .arg("read")
            .arg("--config")
            .arg(&config)
            .arg("--alias")
            .arg("owner")
            .arg("https://example.com"),
    );
    let rendered = stdout_string(&output);

    assert!(rendered.contains("row 1"));
    assert!(rendered.contains("p.name: https://example.com"));
}
|
||||
|
||||
#[test]
|
||||
fn change_alias_from_yaml_config_persists_changes() {
|
||||
let temp = tempdir().unwrap();
|
||||
|
|
@ -1202,6 +1244,35 @@ fn snapshot_human_output_includes_branch_and_table_summaries() {
|
|||
assert!(stdout.contains("edge:Knows v"));
|
||||
}
|
||||
|
||||
/// `commit show` must accept the repo via `--uri` followed by the commit id
/// as a positional argument, and return that commit as JSON.
#[test]
fn commit_show_accepts_long_uri_flag() {
    let temp = tempdir().unwrap();
    let repo = repo_path(temp.path());
    init_repo(&repo);
    load_fixture(&repo);

    // Take the first commit id reported by `commit list --json`.
    let listing = output_success(cli().arg("commit").arg("list").arg(&repo).arg("--json"));
    let listing_json: Value = serde_json::from_slice(&listing.stdout).unwrap();
    let first_commit = &listing_json["commits"][0]["graph_commit_id"];
    let commit_id = first_commit.as_str().unwrap().to_string();

    let shown = output_success(
        cli()
            .arg("commit")
            .arg("show")
            .arg("--uri")
            .arg(&repo)
            .arg(&commit_id)
            .arg("--json"),
    );
    let payload: Value = serde_json::from_slice(&shown.stdout).unwrap();

    assert_eq!(payload["graph_commit_id"], commit_id);
    assert!(payload["manifest_version"].as_u64().unwrap() >= 1);
}
|
||||
|
||||
#[test]
|
||||
fn cli_fails_for_missing_repo() {
|
||||
let temp = tempdir().unwrap();
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ tracing = { workspace = true }
|
|||
tracing-subscriber = { workspace = true }
|
||||
tower-http = { workspace = true }
|
||||
cedar-policy = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ pub mod policy;
|
|||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
|
@ -14,6 +16,7 @@ use api::{
|
|||
IngestRequest, ReadOutput, ReadRequest, RunListOutput, SnapshotQuery, ingest_output,
|
||||
snapshot_payload,
|
||||
};
|
||||
use axum::body::{Body, Bytes};
|
||||
use axum::extract::DefaultBodyLimit;
|
||||
use axum::extract::{Extension, Path, Query, Request, State};
|
||||
use axum::http::StatusCode;
|
||||
|
|
@ -28,6 +31,7 @@ pub use config::{
|
|||
ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
|
||||
load_config,
|
||||
};
|
||||
use futures::stream;
|
||||
use omnigraph::db::{Omnigraph, ReadTarget, RunId};
|
||||
use omnigraph::error::{ManifestErrorKind, OmniError};
|
||||
use omnigraph_compiler::json_params_to_param_map;
|
||||
|
|
@ -39,7 +43,7 @@ pub use policy::{
|
|||
};
|
||||
use serde_json::Value;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing::{error, info};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
|
@ -67,6 +71,23 @@ pub struct AppState {
|
|||
#[derive(Debug, Clone)]
|
||||
struct AuthenticatedActor(Arc<str>);
|
||||
|
||||
struct ExportStreamWriter {
|
||||
sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
|
||||
}
|
||||
|
||||
impl Write for ExportStreamWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.sender
|
||||
.send(Ok(Bytes::copy_from_slice(buf)))
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl AuthenticatedActor {
|
||||
fn as_str(&self) -> &str {
|
||||
&self.0
|
||||
|
|
@ -567,16 +588,28 @@ async fn server_export(
|
|||
target_branch: None,
|
||||
},
|
||||
)?;
|
||||
let payload = {
|
||||
let db = Arc::clone(&state.db).read_owned().await;
|
||||
db.export_jsonl(&branch, &request.type_names, &request.table_keys)
|
||||
.await
|
||||
.map_err(ApiError::from_omni)?
|
||||
};
|
||||
let db = Arc::clone(&state.db);
|
||||
let type_names = request.type_names.clone();
|
||||
let table_keys = request.table_keys.clone();
|
||||
let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
|
||||
tokio::spawn(async move {
|
||||
let result = {
|
||||
let db = db.read().await;
|
||||
let mut writer = ExportStreamWriter { sender: tx.clone() };
|
||||
db.export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
|
||||
.await
|
||||
};
|
||||
if let Err(err) = result {
|
||||
let _ = tx.send(Err(io::Error::other(err.to_string())));
|
||||
}
|
||||
});
|
||||
let body = Body::from_stream(stream::unfold(rx, |mut rx| async move {
|
||||
rx.recv().await.map(|item| (item, rx))
|
||||
}));
|
||||
Ok((
|
||||
StatusCode::OK,
|
||||
[(CONTENT_TYPE, "application/x-ndjson; charset=utf-8")],
|
||||
payload,
|
||||
body,
|
||||
)
|
||||
.into_response())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::{
|
||||
|
|
@ -368,9 +369,23 @@ impl Omnigraph {
|
|||
type_names: &[String],
|
||||
table_keys: &[String],
|
||||
) -> Result<String> {
|
||||
let mut out = Vec::new();
|
||||
self.export_jsonl_to_writer(branch, type_names, table_keys, &mut out)
|
||||
.await?;
|
||||
String::from_utf8(out)
|
||||
.map_err(|err| OmniError::manifest(format!("export produced invalid UTF-8: {}", err)))
|
||||
}
|
||||
|
||||
pub async fn export_jsonl_to_writer<W: Write>(
|
||||
&self,
|
||||
branch: &str,
|
||||
type_names: &[String],
|
||||
table_keys: &[String],
|
||||
writer: &mut W,
|
||||
) -> Result<()> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
let snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
|
||||
self.export_snapshot_jsonl(&snapshot, type_names, table_keys)
|
||||
self.export_snapshot_jsonl_to_writer(&snapshot, type_names, table_keys, writer)
|
||||
.await
|
||||
}
|
||||
|
||||
|
|
@ -399,26 +414,19 @@ impl Omnigraph {
|
|||
Ok(Some(record_batch_row_to_json(batch, 0)?))
|
||||
}
|
||||
|
||||
async fn export_snapshot_jsonl(
|
||||
async fn export_snapshot_jsonl_to_writer<W: Write>(
|
||||
&self,
|
||||
snapshot: &Snapshot,
|
||||
type_names: &[String],
|
||||
table_keys: &[String],
|
||||
) -> Result<String> {
|
||||
writer: &mut W,
|
||||
) -> Result<()> {
|
||||
let selected_tables = self.export_table_keys(snapshot, type_names, table_keys)?;
|
||||
let mut out = String::new();
|
||||
for table_key in selected_tables {
|
||||
for row in self.export_table_rows(snapshot, &table_key).await? {
|
||||
out.push_str(&serde_json::to_string(&row).map_err(|err| {
|
||||
OmniError::manifest(format!(
|
||||
"failed to serialize export row for '{}': {}",
|
||||
table_key, err
|
||||
))
|
||||
})?);
|
||||
out.push('\n');
|
||||
}
|
||||
self.export_table_to_writer(snapshot, &table_key, writer)
|
||||
.await?;
|
||||
}
|
||||
Ok(out)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn export_table_keys(
|
||||
|
|
@ -470,11 +478,12 @@ impl Omnigraph {
|
|||
Ok(selected.into_iter().collect())
|
||||
}
|
||||
|
||||
async fn export_table_rows(
|
||||
async fn export_table_to_writer<W: Write>(
|
||||
&self,
|
||||
snapshot: &Snapshot,
|
||||
table_key: &str,
|
||||
) -> Result<Vec<serde_json::Value>> {
|
||||
writer: &mut W,
|
||||
) -> Result<()> {
|
||||
let ds = self
|
||||
.table_store
|
||||
.open_snapshot_table(snapshot, table_key)
|
||||
|
|
@ -483,46 +492,36 @@ impl Omnigraph {
|
|||
let blob_properties = blob_properties_for_table_key(self.catalog(), table_key)?;
|
||||
|
||||
if blob_properties.is_empty() {
|
||||
let batch = concat_or_empty_batches(
|
||||
schema_for_table_key(self.catalog(), table_key)?,
|
||||
self.table_store.scan(&ds, None, None, ordering).await?,
|
||||
)?;
|
||||
return self.export_rows_from_batch(table_key, &batch, None).await;
|
||||
for batch in self.table_store.scan(&ds, None, None, ordering).await? {
|
||||
self.write_export_rows_from_batch(table_key, &batch, None, writer)?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let batches = self
|
||||
.table_store
|
||||
.scan_with(&ds, None, None, ordering, true, |_| Ok(()))
|
||||
.await?;
|
||||
if batches.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
for batch in batches {
|
||||
let row_ids = batch
|
||||
.column_by_name("_rowid")
|
||||
.and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
|
||||
.ok_or_else(|| {
|
||||
OmniError::Lance(format!(
|
||||
"expected _rowid column when exporting '{}'",
|
||||
table_key
|
||||
))
|
||||
})?
|
||||
.values()
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<_>>();
|
||||
let blob_values = self
|
||||
.export_blob_values(&ds, &batch, &row_ids, blob_properties)
|
||||
.await?;
|
||||
self.write_export_rows_from_batch(table_key, &batch, Some(&blob_values), writer)?;
|
||||
}
|
||||
|
||||
let scan_schema = batches[0].schema();
|
||||
let batch = if batches.len() == 1 {
|
||||
batches.into_iter().next().unwrap()
|
||||
} else {
|
||||
arrow_select::concat::concat_batches(&scan_schema, &batches)
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
};
|
||||
let row_ids = batch
|
||||
.column_by_name("_rowid")
|
||||
.and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
|
||||
.ok_or_else(|| {
|
||||
OmniError::Lance(format!(
|
||||
"expected _rowid column when exporting '{}'",
|
||||
table_key
|
||||
))
|
||||
})?
|
||||
.values()
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<_>>();
|
||||
let blob_values = self
|
||||
.export_blob_values(&ds, &batch, &row_ids, blob_properties)
|
||||
.await?;
|
||||
self.export_rows_from_batch(table_key, &batch, Some(&blob_values))
|
||||
.await
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ─── Graph index ──────────────────────────────────────────────────────
|
||||
|
|
@ -1755,18 +1754,18 @@ impl Omnigraph {
|
|||
Ok(values)
|
||||
}
|
||||
|
||||
async fn export_rows_from_batch(
|
||||
fn write_export_rows_from_batch<W: Write>(
|
||||
&self,
|
||||
table_key: &str,
|
||||
batch: &RecordBatch,
|
||||
blob_values: Option<&HashMap<String, Vec<Option<String>>>>,
|
||||
) -> Result<Vec<serde_json::Value>> {
|
||||
writer: &mut W,
|
||||
) -> Result<()> {
|
||||
if let Some(type_name) = table_key.strip_prefix("node:") {
|
||||
let node_type =
|
||||
self.catalog.node_types.get(type_name).ok_or_else(|| {
|
||||
OmniError::manifest(format!("unknown node type '{}'", type_name))
|
||||
})?;
|
||||
let mut rows = Vec::with_capacity(batch.num_rows());
|
||||
for row in 0..batch.num_rows() {
|
||||
let mut data = serde_json::Map::new();
|
||||
data.insert(
|
||||
|
|
@ -1784,12 +1783,16 @@ impl Omnigraph {
|
|||
)?,
|
||||
);
|
||||
}
|
||||
rows.push(serde_json::json!({
|
||||
write_export_jsonl_row(
|
||||
writer,
|
||||
table_key,
|
||||
&serde_json::json!({
|
||||
"type": type_name,
|
||||
"data": serde_json::Value::Object(data),
|
||||
}));
|
||||
}),
|
||||
)?;
|
||||
}
|
||||
return Ok(rows);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(edge_name) = table_key.strip_prefix("edge:") {
|
||||
|
|
@ -1797,7 +1800,6 @@ impl Omnigraph {
|
|||
self.catalog.edge_types.get(edge_name).ok_or_else(|| {
|
||||
OmniError::manifest(format!("unknown edge type '{}'", edge_name))
|
||||
})?;
|
||||
let mut rows = Vec::with_capacity(batch.num_rows());
|
||||
for row in 0..batch.num_rows() {
|
||||
let from = named_string_value(batch, "src", row)?;
|
||||
let to = named_string_value(batch, "dst", row)?;
|
||||
|
|
@ -1817,14 +1819,18 @@ impl Omnigraph {
|
|||
)?,
|
||||
);
|
||||
}
|
||||
rows.push(serde_json::json!({
|
||||
write_export_jsonl_row(
|
||||
writer,
|
||||
table_key,
|
||||
&serde_json::json!({
|
||||
"edge": edge_name,
|
||||
"from": from,
|
||||
"to": to,
|
||||
"data": serde_json::Value::Object(data),
|
||||
}));
|
||||
}),
|
||||
)?;
|
||||
}
|
||||
return Ok(rows);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(OmniError::manifest(format!(
|
||||
|
|
@ -1834,6 +1840,21 @@ impl Omnigraph {
|
|||
}
|
||||
}
|
||||
|
||||
fn write_export_jsonl_row<W: Write>(
|
||||
writer: &mut W,
|
||||
table_key: &str,
|
||||
row: &serde_json::Value,
|
||||
) -> Result<()> {
|
||||
serde_json::to_writer(&mut *writer, row).map_err(|err| {
|
||||
OmniError::manifest(format!(
|
||||
"failed to serialize export row for '{}': {}",
|
||||
table_key, err
|
||||
))
|
||||
})?;
|
||||
writer.write_all(b"\n")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn export_blob_column_values(
|
||||
source_ds: &Dataset,
|
||||
column_name: &str,
|
||||
|
|
|
|||
|
|
@ -6,8 +6,8 @@
|
|||
omnigraph init --schema ./schema.pg ./repo.omni
|
||||
omnigraph load --data ./data.jsonl --mode overwrite ./repo.omni
|
||||
omnigraph snapshot ./repo.omni --branch main --json
|
||||
omnigraph read ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}'
|
||||
omnigraph change ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}'
|
||||
omnigraph read --uri ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}'
|
||||
omnigraph change --uri ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}'
|
||||
```
|
||||
|
||||
## Branching And Reviewable Data Flows
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue