mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Merge pull request #5 from ModernRelay/codex-mr-428-schema-apply-v1
Add schema apply command and policy support
This commit is contained in:
commit
e7658836a8
22 changed files with 1903 additions and 146 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -4547,6 +4547,7 @@ dependencies = [
|
|||
"assert_cmd",
|
||||
"clap",
|
||||
"color-eyre",
|
||||
"lance-index",
|
||||
"omnigraph",
|
||||
"omnigraph-compiler",
|
||||
"omnigraph-server",
|
||||
|
|
@ -4589,6 +4590,7 @@ dependencies = [
|
|||
"clap",
|
||||
"color-eyre",
|
||||
"futures",
|
||||
"lance-index",
|
||||
"omnigraph",
|
||||
"omnigraph-compiler",
|
||||
"serde",
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ cargo run -p omnigraph-cli -- read \
|
|||
```
|
||||
|
||||
Server routes include `/healthz`, `/snapshot`, `/export`, `/read`, `/change`,
|
||||
`/ingest`, `/branches`, `/runs`, and `/commits`.
|
||||
`/schema/apply`, `/ingest`, `/branches`, `/runs`, and `/commits`.
|
||||
|
||||
To require auth, set `OMNIGRAPH_SERVER_BEARER_TOKEN` on the server and set the
|
||||
matching bearer token env var in your CLI target config.
|
||||
|
|
@ -110,6 +110,7 @@ Core repo flow:
|
|||
```bash
|
||||
omnigraph init --schema ./schema.pg ./repo.omni
|
||||
omnigraph load --data ./data.jsonl --mode overwrite ./repo.omni
|
||||
omnigraph schema apply --schema ./next.pg ./repo.omni --json
|
||||
omnigraph snapshot ./repo.omni --branch main --json
|
||||
omnigraph read --uri ./repo.omni --query ./queries.gq --name get_person --params '{"name":"Alice"}'
|
||||
omnigraph change --uri ./repo.omni --query ./queries.gq --name insert_person --params '{"name":"Mina","age":28}'
|
||||
|
|
|
|||
|
|
@ -26,3 +26,4 @@ assert_cmd = "2"
|
|||
predicates = "3"
|
||||
serde_json = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
lance-index = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -14,8 +14,9 @@ use omnigraph_server::api::{
|
|||
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
|
||||
BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
|
||||
CommitOutput, ErrorOutput, ExportRequest, IngestOutput, IngestRequest, ReadOutput, ReadRequest,
|
||||
RunListOutput, RunOutput, SnapshotOutput, SnapshotTableOutput, commit_output, ingest_output,
|
||||
read_output, run_output, snapshot_payload,
|
||||
RunListOutput, RunOutput, SchemaApplyOutput, SchemaApplyRequest, SnapshotOutput,
|
||||
SnapshotTableOutput, commit_output, ingest_output, read_output, run_output,
|
||||
schema_apply_output, snapshot_payload,
|
||||
};
|
||||
use omnigraph_server::{
|
||||
AliasCommand, OmnigraphConfig, PolicyAction, PolicyDecision, PolicyEngine, PolicyRequest,
|
||||
|
|
@ -280,6 +281,19 @@ enum SchemaCommand {
|
|||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Apply a supported schema migration
|
||||
Apply {
|
||||
/// Repo URI
|
||||
uri: Option<String>,
|
||||
#[arg(long)]
|
||||
target: Option<String>,
|
||||
#[arg(long)]
|
||||
config: Option<PathBuf>,
|
||||
#[arg(long)]
|
||||
schema: PathBuf,
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
|
|
@ -448,6 +462,20 @@ struct SchemaPlanOutput<'a> {
|
|||
steps: &'a [SchemaMigrationStep],
|
||||
}
|
||||
|
||||
fn print_schema_apply_human(output: &SchemaApplyOutput) {
|
||||
println!("schema apply for {}", output.uri);
|
||||
println!("supported: {}", if output.supported { "yes" } else { "no" });
|
||||
println!("applied: {}", if output.applied { "yes" } else { "no" });
|
||||
println!("manifest_version: {}", output.manifest_version);
|
||||
if output.steps.is_empty() {
|
||||
println!("no schema changes");
|
||||
return;
|
||||
}
|
||||
for step in &output.steps {
|
||||
println!("- {}", render_schema_plan_step(step));
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_local_repo_parent(uri: &str) -> Result<()> {
|
||||
if !uri.contains("://") {
|
||||
fs::create_dir_all(uri)?;
|
||||
|
|
@ -1796,6 +1824,39 @@ async fn main() -> Result<()> {
|
|||
print_schema_plan_human(&uri, &plan);
|
||||
}
|
||||
}
|
||||
SchemaCommand::Apply {
|
||||
uri,
|
||||
target,
|
||||
config,
|
||||
schema,
|
||||
json,
|
||||
} => {
|
||||
let config = load_cli_config(config.as_ref())?;
|
||||
let bearer_token =
|
||||
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
|
||||
let uri = resolve_uri(&config, uri, target.as_deref())?;
|
||||
let schema_source = fs::read_to_string(&schema)?;
|
||||
let output = if is_remote_uri(&uri) {
|
||||
remote_json::<SchemaApplyOutput>(
|
||||
&http_client,
|
||||
Method::POST,
|
||||
remote_url(&uri, "/schema/apply"),
|
||||
Some(serde_json::to_value(SchemaApplyRequest {
|
||||
schema_source: schema_source.clone(),
|
||||
})?),
|
||||
bearer_token.as_deref(),
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
let mut db = Omnigraph::open(&uri).await?;
|
||||
schema_apply_output(&uri, db.apply_schema(&schema_source).await?)
|
||||
};
|
||||
if json {
|
||||
print_json(&output)?;
|
||||
} else {
|
||||
print_schema_apply_human(&output);
|
||||
}
|
||||
}
|
||||
},
|
||||
Command::Snapshot {
|
||||
uri,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use std::fs;
|
||||
|
||||
use lance_index::traits::DatasetIndexExt;
|
||||
use omnigraph::db::{Omnigraph, ReadTarget};
|
||||
use serde_json::Value;
|
||||
use tempfile::tempdir;
|
||||
|
|
@ -299,6 +300,243 @@ fn schema_plan_json_reports_unsupported_type_change() {
|
|||
}));
|
||||
}
|
||||
|
||||
/// `schema apply --json` with an extra `nickname` property reports a
/// supported, applied migration of exactly one step, and the reopened repo's
/// catalog lists the new property on `Person`.
#[test]
fn schema_apply_json_applies_supported_migration() {
    let temp = tempdir().unwrap();
    let repo = repo_path(temp.path());
    init_repo(&repo);

    // Derive the target schema from the fixture by inserting one property
    // before the closing brace that follows `age`.
    let base_schema = fs::read_to_string(fixture("test.pg")).unwrap();
    let next_schema = base_schema.replace(" age: I32?\n}", " age: I32?\n nickname: String?\n}");
    let schema_path = temp.path().join("next.pg");
    fs::write(&schema_path, next_schema).unwrap();

    let output = output_success(
        cli()
            .args(["schema", "apply", "--schema"])
            .arg(&schema_path)
            .arg("--json")
            .arg(&repo),
    );
    let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
    assert_eq!(payload["supported"], true);
    assert_eq!(payload["applied"], true);
    assert_eq!(payload["step_count"], 1);

    // Reopen the repo directly to confirm the change was persisted.
    let db = tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(Omnigraph::open(repo.to_string_lossy().as_ref()))
        .unwrap();
    let person = &db.catalog().node_types["Person"];
    assert!(person.properties.contains_key("nickname"));
}
|
||||
|
||||
/// Applying the unmodified `test.pg` fixture in human-readable mode reports
/// `applied: no` and `no schema changes`.
#[test]
fn schema_apply_human_reports_noop() {
    let temp = tempdir().unwrap();
    let repo = repo_path(temp.path());
    init_repo(&repo);

    // No `--json`: the assertions below inspect the plain-text rendering.
    let unchanged_schema = fixture("test.pg");
    let output = output_success(
        cli()
            .args(["schema", "apply", "--schema"])
            .arg(&unchanged_schema)
            .arg(&repo),
    );

    let rendered = stdout_string(&output);
    for expected in ["applied: no", "no schema changes"] {
        assert!(rendered.contains(expected));
    }
}
|
||||
|
||||
/// Renaming `Person` to `Human` via `@rename_from` is applied by
/// `schema apply --json`, after which the `main` snapshot has a `node:Human`
/// entry and no `node:Person` entry.
#[test]
fn schema_apply_json_renames_type_and_updates_snapshot() {
    let temp = tempdir().unwrap();
    let repo = repo_path(temp.path());
    let schema_path = temp.path().join("rename.pg");
    init_repo(&repo);

    // Rename the node type and update both edges that reference it.
    let renamed_schema = fs::read_to_string(fixture("test.pg"))
        .unwrap()
        .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
        .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
        .replace(
            "edge WorksAt: Person -> Company",
            "edge WorksAt: Human -> Company",
        );
    fs::write(&schema_path, renamed_schema).unwrap();

    let output = output_success(
        cli()
            .arg("schema")
            .arg("apply")
            .arg("--schema")
            .arg(&schema_path)
            .arg("--json")
            .arg(&repo),
    );
    let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
    assert_eq!(payload["applied"], true);

    // Drive both async calls on one runtime, so the handle is never polled on
    // a runtime other than the one that opened it.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let db = runtime
        .block_on(Omnigraph::open(repo.to_string_lossy().as_ref()))
        .unwrap();
    let snapshot = runtime
        .block_on(db.snapshot_of(ReadTarget::branch("main")))
        .unwrap();
    assert!(snapshot.entry("node:Human").is_some());
    assert!(snapshot.entry("node:Person").is_none());
}
|
||||
|
||||
/// Renaming `age` to `years` via `@rename_from` is applied by
/// `schema apply --json`; the reopened catalog then lists `years` and no
/// longer lists `age` on `Person`.
#[test]
fn schema_apply_json_renames_property_and_updates_catalog() {
    let temp = tempdir().unwrap();
    let repo = repo_path(temp.path());
    init_repo(&repo);

    let base_schema = fs::read_to_string(fixture("test.pg")).unwrap();
    let renamed_schema = base_schema.replace("age: I32?", "years: I32? @rename_from(\"age\")");
    let schema_path = temp.path().join("rename-property.pg");
    fs::write(&schema_path, renamed_schema).unwrap();

    let output = output_success(
        cli()
            .args(["schema", "apply", "--schema"])
            .arg(&schema_path)
            .arg("--json")
            .arg(&repo),
    );
    let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
    assert_eq!(payload["applied"], true);

    let db = tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(Omnigraph::open(repo.to_string_lossy().as_ref()))
        .unwrap();
    let properties = &db.catalog().node_types["Person"].properties;
    assert!(properties.contains_key("years"));
    assert!(!properties.contains_key("age"));
}
|
||||
|
||||
/// Adding `@index` to the existing `name` property is applied by
/// `schema apply --json` and increases the number of indices loaded from the
/// `node:Person` dataset on `main`.
#[test]
fn schema_apply_json_adds_index_for_existing_property() {
    let temp = tempdir().unwrap();
    let repo = repo_path(temp.path());
    let schema_path = temp.path().join("index.pg");
    init_repo(&repo);

    // One runtime and one helper serve both measurements, so the before and
    // after counts are guaranteed to be taken the same way.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let person_index_count = || {
        runtime.block_on(async {
            let db = Omnigraph::open(repo.to_string_lossy().as_ref())
                .await
                .unwrap();
            let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
            let dataset = snapshot.open("node:Person").await.unwrap();
            dataset.load_indices().await.unwrap().len()
        })
    };

    let before_index_count = person_index_count();

    let indexed_schema = fs::read_to_string(fixture("test.pg"))
        .unwrap()
        .replace("name: String @key", "name: String @key @index");
    fs::write(&schema_path, indexed_schema).unwrap();

    let output = output_success(
        cli()
            .arg("schema")
            .arg("apply")
            .arg("--schema")
            .arg(&schema_path)
            .arg("--json")
            .arg(&repo),
    );
    let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
    assert_eq!(payload["applied"], true);

    let after_index_count = person_index_count();
    assert!(after_index_count > before_index_count);
}
|
||||
|
||||
/// Changing the type of `age` from `I32?` to `I64?` makes `schema apply` fail
/// with a stderr message mentioning `changing property type`.
#[test]
fn schema_apply_rejects_unsupported_plan() {
    let temp = tempdir().unwrap();
    let repo = repo_path(temp.path());
    init_repo(&repo);

    let base_schema = fs::read_to_string(fixture("test.pg")).unwrap();
    let breaking_schema = base_schema.replace("age: I32?", "age: I64?");
    let schema_path = temp.path().join("breaking.pg");
    fs::write(&schema_path, breaking_schema).unwrap();

    let output = output_failure(
        cli()
            .args(["schema", "apply", "--schema"])
            .arg(&schema_path)
            .arg(&repo),
    );

    let stderr = String::from_utf8_lossy(&output.stderr);
    assert!(stderr.contains("changing property type"));
}
|
||||
|
||||
/// Once a `feature` branch exists, `schema apply` fails even for an additive
/// change, and stderr explains that the repo must contain only `main`.
#[test]
fn schema_apply_rejects_when_non_main_branch_exists() {
    let temp = tempdir().unwrap();
    let repo = repo_path(temp.path());
    init_repo(&repo);

    // Create a second branch so the repo no longer has `main` alone.
    output_success(
        cli()
            .args(["branch", "create", "--from", "main", "--uri"])
            .arg(&repo)
            .arg("feature"),
    );

    let base_schema = fs::read_to_string(fixture("test.pg")).unwrap();
    let next_schema = base_schema.replace(" age: I32?\n}", " age: I32?\n nickname: String?\n}");
    let schema_path = temp.path().join("next.pg");
    fs::write(&schema_path, next_schema).unwrap();

    let output = output_failure(
        cli()
            .args(["schema", "apply", "--schema"])
            .arg(&schema_path)
            .arg(&repo),
    );

    let stderr = String::from_utf8_lossy(&output.stderr);
    assert!(stderr.contains("schema apply requires a repo with only main"));
}
|
||||
|
||||
#[test]
|
||||
fn load_json_outputs_summary_for_main_branch() {
|
||||
let temp = tempdir().unwrap();
|
||||
|
|
|
|||
|
|
@ -110,6 +110,10 @@ pub fn write_config(path: &Path, source: &str) {
|
|||
fs::write(path, source).unwrap();
|
||||
}
|
||||
|
||||
/// Writes `source` to `path`, replacing any existing content.
///
/// # Panics
/// Panics if the write fails; this is a test-support helper.
pub fn write_file(path: &Path, source: &str) {
    let written = fs::write(path, source);
    written.unwrap();
}
|
||||
|
||||
/// Wraps `value` in single quotes, doubling every embedded single quote.
fn yaml_string(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            quoted.push_str("''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
|
||||
|
|
@ -278,6 +282,12 @@ impl SystemRepo {
|
|||
path
|
||||
}
|
||||
|
||||
pub fn write_file(&self, name: &str, source: &str) -> PathBuf {
|
||||
let path = self.repo.parent().unwrap().join(name);
|
||||
write_file(&path, source);
|
||||
path
|
||||
}
|
||||
|
||||
/// Starts a test server for this repo by delegating to the free
/// `spawn_server` function with the repo path.
pub fn spawn_server(&self) -> TestServer {
    let repo = &self.repo;
    spawn_server(repo)
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ mod support;
|
|||
|
||||
use std::fs;
|
||||
|
||||
use omnigraph::db::Omnigraph;
|
||||
use reqwest::blocking::Client;
|
||||
use serde_json::json;
|
||||
|
||||
|
|
@ -217,6 +218,106 @@ query insert_person($name: String, $age: I32) {
|
|||
assert!(runs_payload["runs"].as_array().unwrap().len() >= 2);
|
||||
}
|
||||
|
||||
/// Running `schema apply` through a config generated by `remote_yaml_config`
/// for a spawned server applies an additive schema; the repo opened directly
/// from disk afterwards exposes `nickname` on `Person`.
#[test]
#[ignore = "requires loopback socket permissions in sandboxed runners"]
fn remote_schema_apply_via_cli_updates_repo() {
    let repo = SystemRepo::initialized();
    let server = repo.spawn_server();
    let config = repo.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url));

    let base_schema = fs::read_to_string(fixture("test.pg")).unwrap();
    let next_schema = repo.write_file(
        "next.pg",
        &base_schema.replace(" age: I32?\n}", " age: I32?\n nickname: String?\n}"),
    );

    // No repo URI is passed; only `--config` identifies the target.
    let output = output_success(
        cli()
            .args(["schema", "apply", "--config"])
            .arg(&config)
            .arg("--schema")
            .arg(&next_schema)
            .arg("--json"),
    );
    let payload = parse_stdout_json(&output);
    assert_eq!(payload["applied"], true);

    let db = tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(Omnigraph::open(repo.path().to_string_lossy().as_ref()))
        .unwrap();
    let person = &db.catalog().node_types["Person"];
    assert!(person.properties.contains_key("nickname"));
}
|
||||
|
||||
/// A property type change submitted via the server-backed config makes the
/// CLI fail, with `changing property type` in stderr.
#[test]
#[ignore = "requires loopback socket permissions in sandboxed runners"]
fn remote_schema_apply_rejects_unsupported_plan() {
    let repo = SystemRepo::initialized();
    let server = repo.spawn_server();
    let config = repo.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url));

    let base_schema = fs::read_to_string(fixture("test.pg")).unwrap();
    let breaking_schema =
        repo.write_file("breaking.pg", &base_schema.replace("age: I32?", "age: I64?"));

    let output = output_failure(
        cli()
            .args(["schema", "apply", "--config"])
            .arg(&config)
            .arg("--schema")
            .arg(&breaking_schema),
    );

    let stderr = String::from_utf8_lossy(&output.stderr);
    assert!(stderr.contains("changing property type"));
}
|
||||
|
||||
/// With a `feature` branch created before the server starts, `schema apply`
/// via the server-backed config fails and stderr states that only `main` may
/// exist.
#[test]
#[ignore = "requires loopback socket permissions in sandboxed runners"]
fn remote_schema_apply_rejects_when_non_main_branch_exists() {
    let repo = SystemRepo::initialized();

    // The branch is created directly against the repo path, before the server
    // is spawned.
    output_success(
        cli()
            .args(["branch", "create", "--from", "main", "--uri"])
            .arg(repo.path())
            .arg("feature"),
    );

    let server = repo.spawn_server();
    let config = repo.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url));

    let base_schema = fs::read_to_string(fixture("test.pg")).unwrap();
    let next_schema = repo.write_file(
        "next.pg",
        &base_schema.replace(" age: I32?\n}", " age: I32?\n nickname: String?\n}"),
    );

    let output = output_failure(
        cli()
            .args(["schema", "apply", "--config"])
            .arg(&config)
            .arg("--schema")
            .arg(&next_schema),
    );

    let stderr = String::from_utf8_lossy(&output.stderr);
    assert!(stderr.contains("schema apply requires a repo with only main"));
}
|
||||
|
||||
#[test]
|
||||
#[ignore = "requires loopback socket permissions in sandboxed runners"]
|
||||
fn remote_read_preserves_projection_order_in_json_and_csv() {
|
||||
|
|
|
|||
|
|
@ -29,3 +29,4 @@ futures = { workspace = true }
|
|||
tempfile = { workspace = true }
|
||||
tower = { workspace = true }
|
||||
serial_test = "3"
|
||||
lance-index = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
use omnigraph::db::{GraphCommit, MergeOutcome, ReadTarget, RunRecord, Snapshot};
|
||||
use omnigraph::db::{
|
||||
GraphCommit, MergeOutcome, ReadTarget, RunRecord, SchemaApplyResult, Snapshot,
|
||||
};
|
||||
use omnigraph::error::{MergeConflict, MergeConflictKind};
|
||||
use omnigraph::loader::{IngestResult, LoadMode};
|
||||
use omnigraph_compiler::SchemaMigrationStep;
|
||||
use omnigraph_compiler::result::QueryResult;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
|
@ -243,6 +246,21 @@ pub struct ChangeRequest {
|
|||
pub branch: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SchemaApplyRequest {
|
||||
pub schema_source: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SchemaApplyOutput {
|
||||
pub uri: String,
|
||||
pub supported: bool,
|
||||
pub applied: bool,
|
||||
pub step_count: usize,
|
||||
pub manifest_version: u64,
|
||||
pub steps: Vec<SchemaMigrationStep>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct IngestRequest {
|
||||
pub branch: Option<String>,
|
||||
|
|
@ -318,6 +336,17 @@ pub fn snapshot_payload(branch: &str, snapshot: &Snapshot) -> SnapshotOutput {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn schema_apply_output(uri: &str, result: SchemaApplyResult) -> SchemaApplyOutput {
|
||||
SchemaApplyOutput {
|
||||
uri: uri.to_string(),
|
||||
supported: result.supported,
|
||||
applied: result.applied,
|
||||
step_count: result.steps.len(),
|
||||
manifest_version: result.manifest_version,
|
||||
steps: result.steps,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_output(run: &RunRecord) -> RunOutput {
|
||||
RunOutput {
|
||||
run_id: run.run_id.as_str().to_string(),
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ use api::{
|
|||
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
|
||||
BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
|
||||
CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, HealthOutput, IngestOutput,
|
||||
IngestRequest, ReadOutput, ReadRequest, RunListOutput, SnapshotQuery, ingest_output,
|
||||
snapshot_payload,
|
||||
IngestRequest, ReadOutput, ReadRequest, RunListOutput, SchemaApplyOutput, SchemaApplyRequest,
|
||||
SnapshotQuery, ingest_output, schema_apply_output, snapshot_payload,
|
||||
};
|
||||
use axum::body::{Body, Bytes};
|
||||
use axum::extract::DefaultBodyLimit;
|
||||
|
|
@ -362,6 +362,7 @@ pub fn build_app(state: AppState) -> Router {
|
|||
.route("/export", post(server_export))
|
||||
.route("/read", post(server_read))
|
||||
.route("/change", post(server_change))
|
||||
.route("/schema/apply", post(server_schema_apply))
|
||||
.route(
|
||||
"/ingest",
|
||||
post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
|
||||
|
|
@ -658,6 +659,31 @@ async fn server_change(
|
|||
}))
|
||||
}
|
||||
|
||||
async fn server_schema_apply(
|
||||
State(state): State<AppState>,
|
||||
actor: Option<Extension<AuthenticatedActor>>,
|
||||
Json(request): Json<SchemaApplyRequest>,
|
||||
) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
|
||||
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
|
||||
authorize_request(
|
||||
&state,
|
||||
actor.as_ref().map(|Extension(actor)| actor),
|
||||
PolicyRequest {
|
||||
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
|
||||
action: PolicyAction::SchemaApply,
|
||||
branch: None,
|
||||
target_branch: Some("main".to_string()),
|
||||
},
|
||||
)?;
|
||||
let result = {
|
||||
let mut db = Arc::clone(&state.db).write_owned().await;
|
||||
db.apply_schema(&request.schema_source)
|
||||
.await
|
||||
.map_err(ApiError::from_omni)?
|
||||
};
|
||||
Ok(Json(schema_apply_output(state.uri(), result)))
|
||||
}
|
||||
|
||||
async fn server_ingest(
|
||||
State(state): State<AppState>,
|
||||
actor: Option<Extension<AuthenticatedActor>>,
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ pub enum PolicyAction {
|
|||
Read,
|
||||
Export,
|
||||
Change,
|
||||
SchemaApply,
|
||||
BranchCreate,
|
||||
BranchDelete,
|
||||
BranchMerge,
|
||||
|
|
@ -33,6 +34,7 @@ impl PolicyAction {
|
|||
Self::Read => "read",
|
||||
Self::Export => "export",
|
||||
Self::Change => "change",
|
||||
Self::SchemaApply => "schema_apply",
|
||||
Self::BranchCreate => "branch_create",
|
||||
Self::BranchDelete => "branch_delete",
|
||||
Self::BranchMerge => "branch_merge",
|
||||
|
|
@ -50,6 +52,7 @@ impl PolicyAction {
|
|||
matches!(
|
||||
self,
|
||||
Self::BranchCreate
|
||||
| Self::SchemaApply
|
||||
| Self::BranchDelete
|
||||
| Self::BranchMerge
|
||||
| Self::RunPublish
|
||||
|
|
@ -72,6 +75,7 @@ impl FromStr for PolicyAction {
|
|||
"read" => Ok(Self::Read),
|
||||
"export" => Ok(Self::Export),
|
||||
"change" => Ok(Self::Change),
|
||||
"schema_apply" => Ok(Self::SchemaApply),
|
||||
"branch_create" => Ok(Self::BranchCreate),
|
||||
"branch_delete" => Ok(Self::BranchDelete),
|
||||
"branch_merge" => Ok(Self::BranchMerge),
|
||||
|
|
@ -591,6 +595,7 @@ namespace Omnigraph {
|
|||
action "read" appliesTo { principal: Actor, resource: Repo, context: RequestContext };
|
||||
action "export" appliesTo { principal: Actor, resource: Repo, context: RequestContext };
|
||||
action "change" appliesTo { principal: Actor, resource: Repo, context: RequestContext };
|
||||
action "schema_apply" appliesTo { principal: Actor, resource: Repo, context: RequestContext };
|
||||
action "branch_create" appliesTo { principal: Actor, resource: Repo, context: RequestContext };
|
||||
action "branch_delete" appliesTo { principal: Actor, resource: Repo, context: RequestContext };
|
||||
action "branch_merge" appliesTo { principal: Actor, resource: Repo, context: RequestContext };
|
||||
|
|
@ -809,4 +814,44 @@ rules:
|
|||
|
||||
engine.run_tests(&tests).unwrap();
|
||||
}
|
||||
|
||||
// Compiles a policy that grants `schema_apply` to the `admins` group with
// `target_branch_scope: protected` (where `main` is listed as protected), then
// checks that the same actor is allowed when targeting `main` and denied when
// targeting `feature`.
// NOTE(review): the YAML nesting is not visible in this view — confirm the
// indentation against the repository before editing the literal.
#[test]
|
||||
fn schema_apply_uses_target_branch_scope() {
|
||||
let policy: PolicyConfig = serde_yaml::from_str(
|
||||
r#"
|
||||
version: 1
|
||||
groups:
|
||||
admins: [act-ragnor]
|
||||
protected_branches: [main]
|
||||
rules:
|
||||
- id: admins-schema-apply
|
||||
allow:
|
||||
actors: { group: admins }
|
||||
actions: [schema_apply]
|
||||
target_branch_scope: protected
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let engine = PolicyCompiler::compile(&policy, "repo").unwrap();
|
||||
// Target branch `main` is in `protected_branches`, so the rule applies.
let allow = engine
|
||||
.authorize(&PolicyRequest {
|
||||
actor_id: "act-ragnor".to_string(),
|
||||
action: PolicyAction::SchemaApply,
|
||||
branch: None,
|
||||
target_branch: Some("main".to_string()),
|
||||
})
|
||||
.unwrap();
|
||||
assert!(allow.allowed);
|
||||
|
||||
// Same actor and action, but `feature` is not listed as protected.
let deny = engine
|
||||
.authorize(&PolicyRequest {
|
||||
actor_id: "act-ragnor".to_string(),
|
||||
action: PolicyAction::SchemaApply,
|
||||
branch: None,
|
||||
target_branch: Some("feature".to_string()),
|
||||
})
|
||||
.unwrap();
|
||||
assert!(!deny.allowed);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,11 +5,12 @@ use std::path::{Path, PathBuf};
|
|||
use axum::Router;
|
||||
use axum::body::{Body, to_bytes};
|
||||
use axum::http::{Method, Request, StatusCode};
|
||||
use lance_index::traits::DatasetIndexExt;
|
||||
use omnigraph::db::{Omnigraph, ReadTarget};
|
||||
use omnigraph::loader::{LoadMode, load_jsonl};
|
||||
use omnigraph_server::api::{
|
||||
BranchCreateRequest, BranchMergeRequest, ChangeRequest, ErrorOutput, ExportRequest,
|
||||
IngestRequest, ReadRequest,
|
||||
IngestRequest, ReadRequest, SchemaApplyRequest,
|
||||
};
|
||||
use omnigraph_server::{AppState, build_app};
|
||||
use serde_json::{Value, json};
|
||||
|
|
@ -86,6 +87,19 @@ rules:
|
|||
target_branch_scope: unprotected
|
||||
"#;
|
||||
|
||||
// Policy fixture for the `/schema/apply` route tests: lists `act-ragnor` in
// the `admins` group, marks `main` as a protected branch, and defines a single
// rule `admins-schema-apply` allowing the `schema_apply` action with
// `target_branch_scope: protected`.
// NOTE(review): the YAML nesting is not visible in this view — confirm the
// indentation against the repository before editing the literal.
const SCHEMA_APPLY_POLICY_YAML: &str = r#"
|
||||
version: 1
|
||||
groups:
|
||||
admins: [act-ragnor]
|
||||
protected_branches: [main]
|
||||
rules:
|
||||
- id: admins-schema-apply
|
||||
allow:
|
||||
actors: { group: admins }
|
||||
actions: [schema_apply]
|
||||
target_branch_scope: protected
|
||||
"#;
|
||||
|
||||
fn fixture(name: &str) -> PathBuf {
|
||||
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("../omnigraph/tests/fixtures")
|
||||
|
|
@ -114,6 +128,16 @@ async fn init_repo_with_schema_and_data(schema: &str, data: &str) -> tempfile::T
|
|||
temp
|
||||
}
|
||||
|
||||
async fn init_repo_with_schema(schema: &str) -> tempfile::TempDir {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let repo = repo_path(temp.path());
|
||||
fs::create_dir_all(&repo).unwrap();
|
||||
Omnigraph::init(repo.to_str().unwrap(), schema)
|
||||
.await
|
||||
.unwrap();
|
||||
temp
|
||||
}
|
||||
|
||||
/// Returns the location of the test repo, `server.omni`, under `root`.
fn repo_path(root: &Path) -> PathBuf {
    let mut location = root.to_path_buf();
    location.push("server.omni");
    location
}
|
||||
|
|
@ -206,6 +230,64 @@ async fn app_for_loaded_repo_with_auth_tokens_and_policy(
|
|||
(temp, build_app(state))
|
||||
}
|
||||
|
||||
async fn app_for_repo_with_auth_tokens_and_policy(
|
||||
schema: &str,
|
||||
tokens: &[(&str, &str)],
|
||||
policy: &str,
|
||||
) -> (tempfile::TempDir, Router) {
|
||||
let temp = init_repo_with_schema(schema).await;
|
||||
let repo = repo_path(temp.path());
|
||||
let policy_path = temp.path().join("policy.yaml");
|
||||
fs::write(&policy_path, policy).unwrap();
|
||||
let state = AppState::open_with_bearer_tokens_and_policy(
|
||||
repo.to_string_lossy().to_string(),
|
||||
tokens
|
||||
.iter()
|
||||
.map(|(actor, token)| ((*actor).to_string(), (*token).to_string()))
|
||||
.collect(),
|
||||
Some(&policy_path),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
(temp, build_app(state))
|
||||
}
|
||||
|
||||
fn additive_schema_with_nickname() -> String {
|
||||
fs::read_to_string(fixture("test.pg")).unwrap().replace(
|
||||
" age: I32?\n}",
|
||||
" age: I32?\n nickname: String?\n}",
|
||||
)
|
||||
}
|
||||
|
||||
fn renamed_person_schema() -> String {
|
||||
fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
|
||||
.replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
|
||||
.replace(
|
||||
"edge WorksAt: Person -> Company",
|
||||
"edge WorksAt: Human -> Company",
|
||||
)
|
||||
}
|
||||
|
||||
fn renamed_age_schema() -> String {
|
||||
fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("age: I32?", "years: I32? @rename_from(\"age\")")
|
||||
}
|
||||
|
||||
fn indexed_name_schema() -> String {
|
||||
fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("name: String @key", "name: String @key @index")
|
||||
}
|
||||
|
||||
fn unsupported_schema_change() -> String {
|
||||
fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("age: I32?", "age: I64?")
|
||||
}
|
||||
|
||||
async fn json_response(app: &Router, request: Request<Body>) -> (StatusCode, Value) {
|
||||
let response = app.clone().oneshot(request).await.unwrap();
|
||||
let status = response.status();
|
||||
|
|
@ -214,6 +296,279 @@ async fn json_response(app: &Router, request: Request<Body>) -> (StatusCode, Val
|
|||
(status, value)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_apply_route_updates_repo_for_authorized_admin() {
|
||||
let (temp, app) = app_for_repo_with_auth_tokens_and_policy(
|
||||
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
||||
&[("act-ragnor", "admin-token")],
|
||||
SCHEMA_APPLY_POLICY_YAML,
|
||||
)
|
||||
.await;
|
||||
let schema = additive_schema_with_nickname();
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/schema/apply")
|
||||
.header("content-type", "application/json")
|
||||
.header("authorization", "Bearer admin-token")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&SchemaApplyRequest {
|
||||
schema_source: schema,
|
||||
})
|
||||
.unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let (status, payload) = json_response(&app, request).await;
|
||||
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
assert_eq!(payload["applied"], true);
|
||||
let repo = repo_path(temp.path());
|
||||
let reopened = Omnigraph::open(repo.to_str().unwrap()).await.unwrap();
|
||||
assert!(
|
||||
reopened.catalog().node_types["Person"]
|
||||
.properties
|
||||
.contains_key("nickname")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_apply_route_requires_schema_apply_policy_permission() {
|
||||
let (_temp, app) = app_for_repo_with_auth_tokens_and_policy(
|
||||
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
||||
&[("act-ragnor", "admin-token")],
|
||||
POLICY_YAML,
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/schema/apply")
|
||||
.header("content-type", "application/json")
|
||||
.header("authorization", "Bearer admin-token")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&SchemaApplyRequest {
|
||||
schema_source: additive_schema_with_nickname(),
|
||||
})
|
||||
.unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let (status, payload) = json_response(&app, request).await;
|
||||
|
||||
assert_eq!(status, StatusCode::FORBIDDEN);
|
||||
assert_eq!(
|
||||
payload["code"],
|
||||
serde_json::to_value(omnigraph_server::api::ErrorCode::Forbidden).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_apply_route_requires_bearer_token_when_policy_enabled() {
|
||||
let (_temp, app) = app_for_repo_with_auth_tokens_and_policy(
|
||||
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
||||
&[("act-ragnor", "admin-token")],
|
||||
SCHEMA_APPLY_POLICY_YAML,
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/schema/apply")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&SchemaApplyRequest {
|
||||
schema_source: additive_schema_with_nickname(),
|
||||
})
|
||||
.unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let (status, payload) = json_response(&app, request).await;
|
||||
|
||||
assert_eq!(status, StatusCode::UNAUTHORIZED);
|
||||
assert_eq!(
|
||||
payload["code"],
|
||||
serde_json::to_value(omnigraph_server::api::ErrorCode::Unauthorized).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_apply_route_can_rename_type() {
|
||||
let (temp, app) = app_for_repo_with_auth_tokens_and_policy(
|
||||
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
||||
&[("act-ragnor", "admin-token")],
|
||||
SCHEMA_APPLY_POLICY_YAML,
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/schema/apply")
|
||||
.header("content-type", "application/json")
|
||||
.header("authorization", "Bearer admin-token")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&SchemaApplyRequest {
|
||||
schema_source: renamed_person_schema(),
|
||||
})
|
||||
.unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let (status, payload) = json_response(&app, request).await;
|
||||
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
assert_eq!(payload["applied"], true);
|
||||
let repo = repo_path(temp.path());
|
||||
let reopened = Omnigraph::open(repo.to_str().unwrap()).await.unwrap();
|
||||
let snapshot = reopened
|
||||
.snapshot_of(ReadTarget::branch("main"))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(snapshot.entry("node:Human").is_some());
|
||||
assert!(snapshot.entry("node:Person").is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_apply_route_can_rename_property() {
|
||||
let (temp, app) = app_for_repo_with_auth_tokens_and_policy(
|
||||
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
||||
&[("act-ragnor", "admin-token")],
|
||||
SCHEMA_APPLY_POLICY_YAML,
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/schema/apply")
|
||||
.header("content-type", "application/json")
|
||||
.header("authorization", "Bearer admin-token")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&SchemaApplyRequest {
|
||||
schema_source: renamed_age_schema(),
|
||||
})
|
||||
.unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let (status, payload) = json_response(&app, request).await;
|
||||
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
assert_eq!(payload["applied"], true);
|
||||
let repo = repo_path(temp.path());
|
||||
let reopened = Omnigraph::open(repo.to_str().unwrap()).await.unwrap();
|
||||
let person = &reopened.catalog().node_types["Person"];
|
||||
assert!(person.properties.contains_key("years"));
|
||||
assert!(!person.properties.contains_key("age"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_apply_route_can_add_index() {
|
||||
let (temp, app) = app_for_repo_with_auth_tokens_and_policy(
|
||||
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
||||
&[("act-ragnor", "admin-token")],
|
||||
SCHEMA_APPLY_POLICY_YAML,
|
||||
)
|
||||
.await;
|
||||
let repo = repo_path(temp.path());
|
||||
let before_index_count = {
|
||||
let db = Omnigraph::open(repo.to_str().unwrap()).await.unwrap();
|
||||
let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
|
||||
let dataset = snapshot.open("node:Person").await.unwrap();
|
||||
dataset.load_indices().await.unwrap().len()
|
||||
};
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/schema/apply")
|
||||
.header("content-type", "application/json")
|
||||
.header("authorization", "Bearer admin-token")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&SchemaApplyRequest {
|
||||
schema_source: indexed_name_schema(),
|
||||
})
|
||||
.unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let (status, payload) = json_response(&app, request).await;
|
||||
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
assert_eq!(payload["applied"], true);
|
||||
let reopened = Omnigraph::open(repo.to_str().unwrap()).await.unwrap();
|
||||
let snapshot = reopened
|
||||
.snapshot_of(ReadTarget::branch("main"))
|
||||
.await
|
||||
.unwrap();
|
||||
let dataset = snapshot.open("node:Person").await.unwrap();
|
||||
let after_index_count = dataset.load_indices().await.unwrap().len();
|
||||
assert!(after_index_count > before_index_count);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_apply_route_rejects_unsupported_plan() {
|
||||
let (_temp, app) = app_for_repo_with_auth_tokens_and_policy(
|
||||
&fs::read_to_string(fixture("test.pg")).unwrap(),
|
||||
&[("act-ragnor", "admin-token")],
|
||||
SCHEMA_APPLY_POLICY_YAML,
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/schema/apply")
|
||||
.header("content-type", "application/json")
|
||||
.header("authorization", "Bearer admin-token")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&SchemaApplyRequest {
|
||||
schema_source: unsupported_schema_change(),
|
||||
})
|
||||
.unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let (status, payload) = json_response(&app, request).await;
|
||||
|
||||
assert_eq!(status, StatusCode::BAD_REQUEST);
|
||||
assert_eq!(
|
||||
payload["code"],
|
||||
serde_json::to_value(omnigraph_server::api::ErrorCode::BadRequest).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_apply_route_rejects_when_non_main_branch_exists() {
|
||||
let temp = init_repo_with_schema(&fs::read_to_string(fixture("test.pg")).unwrap()).await;
|
||||
let repo = repo_path(temp.path());
|
||||
let mut db = Omnigraph::open(repo.to_str().unwrap()).await.unwrap();
|
||||
db.branch_create("feature").await.unwrap();
|
||||
drop(db);
|
||||
|
||||
let policy_path = temp.path().join("policy.yaml");
|
||||
fs::write(&policy_path, SCHEMA_APPLY_POLICY_YAML).unwrap();
|
||||
let state = AppState::open_with_bearer_tokens_and_policy(
|
||||
repo.to_string_lossy().to_string(),
|
||||
vec![("act-ragnor".to_string(), "admin-token".to_string())],
|
||||
Some(&policy_path),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let app = build_app(state);
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/schema/apply")
|
||||
.header("content-type", "application/json")
|
||||
.header("authorization", "Bearer admin-token")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&SchemaApplyRequest {
|
||||
schema_source: additive_schema_with_nickname(),
|
||||
})
|
||||
.unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let (status, payload) = json_response(&app, request).await;
|
||||
|
||||
assert_eq!(status, StatusCode::CONFLICT);
|
||||
assert_eq!(
|
||||
payload["code"],
|
||||
serde_json::to_value(omnigraph_server::api::ErrorCode::Conflict).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
/// NOTE(review): presumably a test helper that remembers environment variables
/// so they can be restored later; no `impl` is visible in this chunk — confirm.
struct EnvGuard {
|
||||
/// Pairs of a static name and an optional owned string (presumably the
/// variable's previous value, `None` when it was unset — TODO confirm).
saved: Vec<(&'static str, Option<String>)>,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ use crate::failpoints;
|
|||
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
|
||||
|
||||
use super::commit_graph::{CommitGraph, GraphCommit};
|
||||
use super::manifest::{ManifestCoordinator, Snapshot, SubTableUpdate};
|
||||
use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
|
||||
use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri, is_internal_run_branch};
|
||||
|
||||
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
|
||||
|
|
@ -208,6 +208,10 @@ impl GraphCoordinator {
|
|||
})
|
||||
}
|
||||
|
||||
/// Returns the branch names reported by the manifest's `list_branches`.
///
/// # Errors
/// Propagates any error returned by the manifest.
pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
    let branch_names = self.manifest.list_branches().await?;
    Ok(branch_names)
}
|
||||
|
||||
pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
|
||||
self.manifest
|
||||
.descendant_branches(name)
|
||||
|
|
@ -414,6 +418,28 @@ impl GraphCoordinator {
|
|||
Ok(manifest_version)
|
||||
}
|
||||
|
||||
pub(crate) async fn commit_manifest_changes(
|
||||
&mut self,
|
||||
changes: &[ManifestChange],
|
||||
) -> Result<u64> {
|
||||
let manifest_version = self.manifest.commit_changes(changes).await?;
|
||||
failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
|
||||
Ok(manifest_version)
|
||||
}
|
||||
|
||||
pub(crate) async fn commit_changes_with_actor(
|
||||
&mut self,
|
||||
changes: &[ManifestChange],
|
||||
actor_id: Option<&str>,
|
||||
) -> Result<PublishedSnapshot> {
|
||||
let manifest_version = self.commit_manifest_changes(changes).await?;
|
||||
let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
|
||||
Ok(PublishedSnapshot {
|
||||
manifest_version,
|
||||
_snapshot_id: snapshot_id,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn record_graph_commit(
|
||||
&mut self,
|
||||
manifest_version: u64,
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ mod repo;
|
|||
#[path = "manifest/state.rs"]
|
||||
mod state;
|
||||
|
||||
use layout::{manifest_uri, open_manifest_dataset};
|
||||
use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
|
||||
pub(crate) use metadata::TableVersionMetadata;
|
||||
#[cfg(test)]
|
||||
use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
|
||||
|
|
@ -36,6 +36,7 @@ use state::{ManifestState, read_manifest_state};
|
|||
|
||||
const OBJECT_TYPE_TABLE: &str = "table";
|
||||
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
|
||||
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
|
||||
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
|
||||
|
||||
/// Immutable point-in-time view of the database.
|
||||
|
|
@ -85,6 +86,25 @@ impl SubTableUpdate {
|
|||
}
|
||||
}
|
||||
|
||||
/// Request to register a table in the manifest under a given key and path.
#[derive(Debug, Clone)]
|
||||
pub(crate) struct TableRegistration {
|
||||
/// Manifest key of the table, e.g. `node:Human`.
pub(crate) table_key: String,
|
||||
/// Path of the table's dataset; `table_path_for_table_key` yields values of
/// the form `nodes/<hash>` or `edges/<hash>`.
pub(crate) table_path: String,
|
||||
}
|
||||
|
||||
/// Request to record a tombstone for a table at a given version.
#[derive(Debug, Clone)]
|
||||
pub(crate) struct TableTombstone {
|
||||
/// Manifest key of the table being tombstoned, e.g. `node:Person`.
pub(crate) table_key: String,
|
||||
/// Version the tombstone is recorded at; it is stored in the manifest row's
/// `table_version` column.
pub(crate) tombstone_version: u64,
|
||||
}
|
||||
|
||||
/// One change submitted to the manifest through `commit_changes`.
#[derive(Debug, Clone)]
|
||||
pub(crate) enum ManifestChange {
|
||||
/// Publish a new version of an already-registered table.
Update(SubTableUpdate),
|
||||
/// Register a table key together with its dataset path.
RegisterTable(TableRegistration),
|
||||
/// Record a tombstone for a table at a given version.
Tombstone(TableTombstone),
|
||||
}
|
||||
|
||||
impl SubTableEntry {
|
||||
pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
|
||||
open_table_at_version_from_manifest(
|
||||
|
|
@ -97,6 +117,19 @@ impl SubTableEntry {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
|
||||
if let Some(type_name) = table_key.strip_prefix("node:") {
|
||||
return Ok(format!("nodes/{}", type_name_hash(type_name)));
|
||||
}
|
||||
if let Some(type_name) = table_key.strip_prefix("edge:") {
|
||||
return Ok(format!("edges/{}", type_name_hash(type_name)));
|
||||
}
|
||||
Err(OmniError::manifest(format!(
|
||||
"invalid table key '{}'",
|
||||
table_key
|
||||
)))
|
||||
}
|
||||
|
||||
/// An update to apply to the manifest via `commit`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SubTableUpdate {
|
||||
|
|
@ -245,11 +278,20 @@ impl ManifestCoordinator {
|
|||
/// Atomically inserts one immutable `table_version` row per updated table.
|
||||
/// The merge-insert commit on `__manifest` is the graph-level publish point.
|
||||
pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
|
||||
if updates.is_empty() {
|
||||
let changes = updates
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(ManifestChange::Update)
|
||||
.collect::<Vec<_>>();
|
||||
self.commit_changes(&changes).await
|
||||
}
|
||||
|
||||
pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
|
||||
if changes.is_empty() {
|
||||
return Ok(self.version());
|
||||
}
|
||||
|
||||
self.dataset = self.publisher.publish(updates).await?;
|
||||
self.dataset = self.publisher.publish(changes).await?;
|
||||
|
||||
self.known_state = read_manifest_state(&self.dataset).await?;
|
||||
Ok(self.version())
|
||||
|
|
|
|||
|
|
@ -40,6 +40,10 @@ pub(super) fn version_object_id(table_key: &str, version: u64) -> String {
|
|||
format!("{}${}", table_key, format_table_version(version))
|
||||
}
|
||||
|
||||
pub(super) fn tombstone_object_id(table_key: &str, version: u64) -> String {
|
||||
format!("{}$tombstone${}", table_key, format_table_version(version))
|
||||
}
|
||||
|
||||
pub(super) fn table_id_to_key(request_id: Option<&Vec<String>>) -> lance_namespace::Result<String> {
|
||||
match request_id {
|
||||
Some(request_id) if request_id.len() == 1 && !request_id[0].is_empty() => {
|
||||
|
|
|
|||
|
|
@ -15,11 +15,11 @@
|
|||
//! This module should disappear once Lance Rust can do branch-aware batch table
|
||||
//! version publication against a managed namespace manifest.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::RecordBatchIterator;
|
||||
use async_trait::async_trait;
|
||||
use lance::Dataset;
|
||||
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
|
||||
use lance_namespace::NamespaceError;
|
||||
|
|
@ -27,16 +27,20 @@ use lance_namespace::models::CreateTableVersionRequest;
|
|||
|
||||
use crate::error::{OmniError, Result};
|
||||
|
||||
use super::layout::{open_manifest_dataset, version_object_id};
|
||||
use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id};
|
||||
use super::metadata::parse_namespace_version_request;
|
||||
use super::state::{
|
||||
manifest_rows_batch, manifest_schema, read_manifest_entries, read_manifest_state,
|
||||
manifest_rows_batch, manifest_schema, read_manifest_entries, read_registered_table_locations,
|
||||
read_tombstone_versions,
|
||||
};
|
||||
use super::{
|
||||
ManifestChange, OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION,
|
||||
SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone,
|
||||
};
|
||||
use super::{OBJECT_TYPE_TABLE_VERSION, SubTableEntry, SubTableUpdate};
|
||||
|
||||
#[async_trait]
|
||||
pub(super) trait ManifestBatchPublisher: Send + Sync {
|
||||
async fn publish(&self, updates: &[SubTableUpdate]) -> Result<Dataset>;
|
||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset>;
|
||||
}
|
||||
|
||||
pub(super) struct GraphNamespacePublisher {
|
||||
|
|
@ -47,6 +51,8 @@ pub(super) struct GraphNamespacePublisher {
|
|||
#[derive(Debug)]
|
||||
struct PendingVersionRow {
|
||||
object_id: String,
|
||||
object_type: String,
|
||||
location: Option<String>,
|
||||
metadata: Option<String>,
|
||||
table_key: String,
|
||||
table_version: Option<u64>,
|
||||
|
|
@ -72,17 +78,13 @@ impl GraphNamespacePublisher {
|
|||
&self,
|
||||
) -> Result<(
|
||||
Dataset,
|
||||
HashMap<String, ()>,
|
||||
HashMap<String, String>,
|
||||
HashMap<(String, u64), SubTableEntry>,
|
||||
HashMap<(String, u64), ()>,
|
||||
)> {
|
||||
let dataset = self.dataset().await?;
|
||||
let current = read_manifest_state(&dataset).await?;
|
||||
let registered_tables = read_registered_table_locations(&dataset).await?;
|
||||
let existing_entries = read_manifest_entries(&dataset).await?;
|
||||
let known_tables = current
|
||||
.entries
|
||||
.iter()
|
||||
.map(|entry| (entry.table_key.clone(), ()))
|
||||
.collect();
|
||||
let existing_versions = existing_entries
|
||||
.iter()
|
||||
.map(|entry| {
|
||||
|
|
@ -92,67 +94,153 @@ impl GraphNamespacePublisher {
|
|||
)
|
||||
})
|
||||
.collect();
|
||||
Ok((dataset, known_tables, existing_versions))
|
||||
let existing_tombstones = read_tombstone_versions(&dataset).await?;
|
||||
Ok((
|
||||
dataset,
|
||||
registered_tables,
|
||||
existing_versions,
|
||||
existing_tombstones,
|
||||
))
|
||||
}
|
||||
|
||||
fn build_pending_rows(
|
||||
requests: &[CreateTableVersionRequest],
|
||||
known_tables: &HashMap<String, ()>,
|
||||
changes: &[ManifestChange],
|
||||
known_tables: &HashMap<String, String>,
|
||||
existing_versions: &HashMap<(String, u64), SubTableEntry>,
|
||||
existing_tombstones: &HashMap<(String, u64), ()>,
|
||||
) -> Result<Vec<PendingVersionRow>> {
|
||||
let mut request_versions = HashMap::<(String, u64), ()>::new();
|
||||
let mut rows = Vec::with_capacity(requests.len());
|
||||
let mut known_tables = known_tables.clone();
|
||||
let mut rows = Vec::with_capacity(changes.len());
|
||||
|
||||
for request in requests {
|
||||
let (table_key, table_version, row_count, table_branch, version_metadata) =
|
||||
parse_namespace_version_request(request)
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
if !known_tables.contains_key(table_key.as_str()) {
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::TableNotFound {
|
||||
message: format!("table {} not found", table_key),
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
if request_versions
|
||||
.insert((table_key.clone(), table_version), ())
|
||||
.is_some()
|
||||
for change in changes {
|
||||
if let ManifestChange::RegisterTable(TableRegistration {
|
||||
table_key,
|
||||
table_path,
|
||||
}) = change
|
||||
{
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::ConcurrentModification {
|
||||
message: format!(
|
||||
"table version {} already exists for {}",
|
||||
table_version, table_key
|
||||
),
|
||||
if let Some(existing_path) = known_tables.get(table_key) {
|
||||
if existing_path != table_path {
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::ConcurrentModification {
|
||||
message: format!(
|
||||
"table {} already exists with different path {}",
|
||||
table_key, existing_path
|
||||
),
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
} else {
|
||||
known_tables.insert(table_key.clone(), table_path.clone());
|
||||
}
|
||||
rows.push(PendingVersionRow {
|
||||
object_id: table_key.clone(),
|
||||
object_type: OBJECT_TYPE_TABLE.to_string(),
|
||||
location: Some(table_path.clone()),
|
||||
metadata: None,
|
||||
table_key: table_key.clone(),
|
||||
table_version: None,
|
||||
table_branch: None,
|
||||
row_count: None,
|
||||
});
|
||||
}
|
||||
if let Some(existing) = existing_versions.get(&(table_key.clone(), table_version)) {
|
||||
let is_owner_branch_handoff =
|
||||
existing.row_count == row_count && existing.table_branch != table_branch;
|
||||
if !is_owner_branch_handoff {
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::ConcurrentModification {
|
||||
message: format!(
|
||||
"table version {} already exists for {}",
|
||||
table_version, table_key
|
||||
),
|
||||
}
|
||||
|
||||
for change in changes {
|
||||
match change {
|
||||
ManifestChange::RegisterTable(_) => {}
|
||||
ManifestChange::Update(update) => {
|
||||
let request = update.to_create_table_version_request();
|
||||
let (table_key, table_version, row_count, table_branch, version_metadata) =
|
||||
parse_namespace_version_request(&request)
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
if !known_tables.contains_key(table_key.as_str()) {
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::TableNotFound {
|
||||
message: format!("table {} not found", table_key),
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
if request_versions
|
||||
.insert((table_key.clone(), table_version), ())
|
||||
.is_some()
|
||||
{
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::ConcurrentModification {
|
||||
message: format!(
|
||||
"table version {} already exists for {}",
|
||||
table_version, table_key
|
||||
),
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
if let Some(existing) =
|
||||
existing_versions.get(&(table_key.clone(), table_version))
|
||||
{
|
||||
let is_owner_branch_handoff = existing.row_count == row_count
|
||||
&& existing.table_branch != table_branch;
|
||||
if !is_owner_branch_handoff {
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::ConcurrentModification {
|
||||
message: format!(
|
||||
"table version {} already exists for {}",
|
||||
table_version, table_key
|
||||
),
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
rows.push(PendingVersionRow {
|
||||
object_id: version_object_id(&table_key, table_version),
|
||||
object_type: OBJECT_TYPE_TABLE_VERSION.to_string(),
|
||||
location: None,
|
||||
metadata: Some(version_metadata.to_json_string()?),
|
||||
table_key,
|
||||
table_version: Some(table_version),
|
||||
table_branch,
|
||||
row_count: Some(row_count),
|
||||
});
|
||||
}
|
||||
ManifestChange::Tombstone(TableTombstone {
|
||||
table_key,
|
||||
tombstone_version,
|
||||
}) => {
|
||||
if !known_tables.contains_key(table_key.as_str()) {
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::TableNotFound {
|
||||
message: format!("table {} not found", table_key),
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
if existing_tombstones.contains_key(&(table_key.clone(), *tombstone_version)) {
|
||||
return Err(OmniError::Lance(
|
||||
NamespaceError::ConcurrentModification {
|
||||
message: format!(
|
||||
"table tombstone {} already exists for {}",
|
||||
tombstone_version, table_key
|
||||
),
|
||||
}
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
rows.push(PendingVersionRow {
|
||||
object_id: tombstone_object_id(table_key, *tombstone_version),
|
||||
object_type: OBJECT_TYPE_TABLE_TOMBSTONE.to_string(),
|
||||
location: None,
|
||||
metadata: None,
|
||||
table_key: table_key.clone(),
|
||||
table_version: Some(*tombstone_version),
|
||||
table_branch: None,
|
||||
row_count: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
rows.push(PendingVersionRow {
|
||||
object_id: version_object_id(&table_key, table_version),
|
||||
metadata: Some(version_metadata.to_json_string()?),
|
||||
table_key,
|
||||
table_version: Some(table_version),
|
||||
table_branch,
|
||||
row_count: Some(row_count),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(rows)
|
||||
|
|
@ -170,8 +258,8 @@ impl GraphNamespacePublisher {
|
|||
|
||||
for row in rows {
|
||||
object_ids.push(row.object_id);
|
||||
object_types.push(OBJECT_TYPE_TABLE_VERSION.to_string());
|
||||
locations.push(None);
|
||||
object_types.push(row.object_type);
|
||||
locations.push(row.location);
|
||||
metadata.push(row.metadata);
|
||||
table_keys.push(row.table_key);
|
||||
table_versions.push(row.table_version);
|
||||
|
|
@ -214,23 +302,41 @@ impl GraphNamespacePublisher {
|
|||
&self,
|
||||
requests: &[CreateTableVersionRequest],
|
||||
) -> Result<Dataset> {
|
||||
if requests.is_empty() {
|
||||
return self.dataset().await;
|
||||
}
|
||||
|
||||
let (dataset, known_tables, existing_versions) = self.load_publish_state().await?;
|
||||
let rows = Self::build_pending_rows(requests, &known_tables, &existing_versions)?;
|
||||
self.merge_rows(dataset, rows).await
|
||||
let changes = requests
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|request| {
|
||||
let (table_key, table_version, row_count, table_branch, version_metadata) =
|
||||
parse_namespace_version_request(&request)
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
Ok(ManifestChange::Update(SubTableUpdate {
|
||||
table_key,
|
||||
table_version,
|
||||
table_branch,
|
||||
row_count,
|
||||
version_metadata,
|
||||
}))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
self.publish(&changes).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ManifestBatchPublisher for GraphNamespacePublisher {
|
||||
async fn publish(&self, updates: &[SubTableUpdate]) -> Result<Dataset> {
|
||||
let requests: Vec<CreateTableVersionRequest> = updates
|
||||
.iter()
|
||||
.map(SubTableUpdate::to_create_table_version_request)
|
||||
.collect();
|
||||
self.publish_requests(&requests).await
|
||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset> {
|
||||
if changes.is_empty() {
|
||||
return self.dataset().await;
|
||||
}
|
||||
|
||||
let (dataset, known_tables, existing_versions, existing_tombstones) =
|
||||
self.load_publish_state().await?;
|
||||
let rows = Self::build_pending_rows(
|
||||
changes,
|
||||
&known_tables,
|
||||
&existing_versions,
|
||||
&existing_tombstones,
|
||||
)?;
|
||||
self.merge_rows(dataset, rows).await
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ use crate::error::{OmniError, Result};
|
|||
|
||||
use super::layout::version_object_id;
|
||||
use super::metadata::TableVersionMetadata;
|
||||
use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_VERSION};
|
||||
use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SubTableEntry {
|
||||
|
|
@ -28,6 +28,19 @@ pub(super) struct ManifestState {
|
|||
pub(super) entries: Vec<SubTableEntry>,
|
||||
}
|
||||
|
||||
/// A tombstone row read back from the manifest dataset.
#[derive(Debug, Clone)]
|
||||
struct TableTombstoneEntry {
|
||||
/// Key of the tombstoned table.
table_key: String,
|
||||
/// Value of the row's `table_version` column.
tombstone_version: u64,
|
||||
}
|
||||
|
||||
/// Everything `read_manifest_scan` collects from the manifest dataset.
#[derive(Debug, Clone)]
|
||||
struct ManifestScan {
|
||||
/// Registered table locations, presumably keyed by table key — TODO confirm
/// against the part of `read_manifest_scan` that fills it.
table_locations: HashMap<String, String>,
|
||||
/// All table-version entries found by the scan.
version_entries: Vec<SubTableEntry>,
|
||||
/// All tombstone rows found by the scan.
tombstones: Vec<TableTombstoneEntry>,
|
||||
}
|
||||
|
||||
pub(super) fn manifest_schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(vec![
|
||||
Field::new("object_id", DataType::Utf8, false),
|
||||
|
|
@ -48,10 +61,10 @@ pub(super) fn manifest_schema() -> SchemaRef {
|
|||
|
||||
pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestState> {
|
||||
let version = dataset.version().version;
|
||||
let entries = read_manifest_entries(dataset).await?;
|
||||
let scan = read_manifest_scan(dataset).await?;
|
||||
let mut latest_versions = HashMap::<String, SubTableEntry>::new();
|
||||
|
||||
for entry in entries {
|
||||
for entry in scan.version_entries {
|
||||
match latest_versions.get(&entry.table_key) {
|
||||
Some(existing) if existing.table_version >= entry.table_version => {}
|
||||
_ => {
|
||||
|
|
@ -60,13 +73,52 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestSta
|
|||
}
|
||||
}
|
||||
|
||||
let mut entries: Vec<SubTableEntry> = latest_versions.into_values().collect();
|
||||
let mut tombstones = HashMap::<String, u64>::new();
|
||||
for tombstone in scan.tombstones {
|
||||
match tombstones.get(&tombstone.table_key) {
|
||||
Some(existing) if *existing >= tombstone.tombstone_version => {}
|
||||
_ => {
|
||||
tombstones.insert(tombstone.table_key, tombstone.tombstone_version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut entries: Vec<SubTableEntry> = latest_versions
|
||||
.into_values()
|
||||
.filter(|entry| {
|
||||
tombstones
|
||||
.get(&entry.table_key)
|
||||
.map(|tombstone_version| *tombstone_version < entry.table_version)
|
||||
.unwrap_or(true)
|
||||
})
|
||||
.collect();
|
||||
entries.sort_by(|a, b| a.table_key.cmp(&b.table_key));
|
||||
|
||||
Ok(ManifestState { version, entries })
|
||||
}
|
||||
|
||||
pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTableEntry>> {
|
||||
Ok(read_manifest_scan(dataset).await?.version_entries)
|
||||
}
|
||||
|
||||
pub(super) async fn read_registered_table_locations(
|
||||
dataset: &Dataset,
|
||||
) -> Result<HashMap<String, String>> {
|
||||
Ok(read_manifest_scan(dataset).await?.table_locations)
|
||||
}
|
||||
|
||||
pub(super) async fn read_tombstone_versions(
|
||||
dataset: &Dataset,
|
||||
) -> Result<HashMap<(String, u64), ()>> {
|
||||
Ok(read_manifest_scan(dataset)
|
||||
.await?
|
||||
.tombstones
|
||||
.into_iter()
|
||||
.map(|tombstone| ((tombstone.table_key, tombstone.tombstone_version), ()))
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn read_manifest_scan(dataset: &Dataset) -> Result<ManifestScan> {
|
||||
let batches: Vec<RecordBatch> = dataset
|
||||
.scan()
|
||||
.try_into_stream()
|
||||
|
|
@ -78,6 +130,7 @@ pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTa
|
|||
|
||||
let mut table_locations = HashMap::new();
|
||||
let mut version_entries = Vec::new();
|
||||
let mut tombstones = Vec::new();
|
||||
|
||||
for batch in &batches {
|
||||
let object_types = string_column(batch, "object_type")?;
|
||||
|
|
@ -123,6 +176,13 @@ pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTa
|
|||
version_metadata: TableVersionMetadata::from_json_str(metadata.value(row))?,
|
||||
});
|
||||
}
|
||||
OBJECT_TYPE_TABLE_TOMBSTONE => {
|
||||
let tombstone_version = required_u64(versions, row, "table_version")?;
|
||||
tombstones.push(TableTombstoneEntry {
|
||||
table_key,
|
||||
tombstone_version,
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
|
@ -149,7 +209,11 @@ pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTa
|
|||
.then(a.table_version.cmp(&b.table_version))
|
||||
});
|
||||
|
||||
Ok(entries)
|
||||
Ok(ManifestScan {
|
||||
table_locations,
|
||||
version_entries: entries,
|
||||
tombstones,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn entries_to_batch(
|
||||
|
|
|
|||
|
|
@ -132,6 +132,63 @@ async fn test_commit_advances_version() {
|
|||
assert_eq!(company.row_count, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_changes_can_register_new_table_and_tombstone_old_one() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
|
||||
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
let before_version = mc.version();
|
||||
let person_entry = mc.snapshot().entry("node:Person").unwrap().clone();
|
||||
|
||||
let table_key = "node:Human".to_string();
|
||||
let table_path = table_path_for_table_key(&table_key).unwrap();
|
||||
let dataset_uri = format!("{}/{}", uri, table_path);
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Utf8, false),
|
||||
Field::new("name", DataType::Utf8, false),
|
||||
Field::new("age", DataType::Int32, true),
|
||||
]));
|
||||
let ds = crate::table_store::TableStore::create_empty_dataset(&dataset_uri, &schema)
|
||||
.await
|
||||
.unwrap();
|
||||
let state = crate::table_store::TableStore::new(uri)
|
||||
.table_state(&dataset_uri, &ds)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
mc.commit_changes(&[
|
||||
ManifestChange::RegisterTable(TableRegistration {
|
||||
table_key: table_key.clone(),
|
||||
table_path: table_path.clone(),
|
||||
}),
|
||||
ManifestChange::Update(SubTableUpdate {
|
||||
table_key: table_key.clone(),
|
||||
table_version: state.version,
|
||||
table_branch: None,
|
||||
row_count: state.row_count,
|
||||
version_metadata: state.version_metadata,
|
||||
}),
|
||||
ManifestChange::Tombstone(TableTombstone {
|
||||
table_key: "node:Person".to_string(),
|
||||
tombstone_version: person_entry.table_version + 1,
|
||||
}),
|
||||
])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let head = mc.snapshot();
|
||||
assert!(head.entry("node:Human").is_some());
|
||||
assert!(head.entry("node:Person").is_none());
|
||||
|
||||
let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(historical.entry("node:Person").is_some());
|
||||
assert!(historical.entry("node:Human").is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_snapshot_open_sub_table() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
|
@ -889,13 +946,16 @@ impl RecordingPublisher {
|
|||
|
||||
#[async_trait]
|
||||
impl ManifestBatchPublisher for RecordingPublisher {
|
||||
async fn publish(&self, updates: &[SubTableUpdate]) -> Result<Dataset> {
|
||||
let requests: Vec<CreateTableVersionRequest> = updates
|
||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset> {
|
||||
let requests: Vec<CreateTableVersionRequest> = changes
|
||||
.iter()
|
||||
.map(SubTableUpdate::to_create_table_version_request)
|
||||
.filter_map(|change| match change {
|
||||
ManifestChange::Update(update) => Some(update.to_create_table_version_request()),
|
||||
ManifestChange::RegisterTable(_) | ManifestChange::Tombstone(_) => None,
|
||||
})
|
||||
.collect();
|
||||
self.requests.lock().await.extend_from_slice(&requests);
|
||||
self.inner.publish_requests(&requests).await
|
||||
self.inner.publish(changes).await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -903,7 +963,7 @@ struct FailingPublisher;
|
|||
|
||||
#[async_trait]
|
||||
impl ManifestBatchPublisher for FailingPublisher {
|
||||
async fn publish(&self, _updates: &[SubTableUpdate]) -> Result<Dataset> {
|
||||
async fn publish(&self, _changes: &[ManifestChange]) -> Result<Dataset> {
|
||||
Err(OmniError::manifest(
|
||||
"injected batch publisher failure".to_string(),
|
||||
))
|
||||
|
|
|
|||
|
|
@ -8,6 +8,6 @@ mod schema_state;
|
|||
pub use commit_graph::GraphCommit;
|
||||
pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};
|
||||
pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
|
||||
pub use omnigraph::{MergeOutcome, Omnigraph};
|
||||
pub use omnigraph::{MergeOutcome, Omnigraph, SchemaApplyResult};
|
||||
pub(crate) use run_registry::is_internal_run_branch;
|
||||
pub use run_registry::{RunId, RunRecord, RunStatus};
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use std::sync::Arc;
|
|||
use arrow_array::{
|
||||
Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
|
||||
Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
|
||||
RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array,
|
||||
RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
|
||||
};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use lance::Dataset;
|
||||
|
|
@ -17,7 +17,8 @@ use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
|
|||
use omnigraph_compiler::schema::parser::parse_schema;
|
||||
use omnigraph_compiler::types::ScalarType;
|
||||
use omnigraph_compiler::{
|
||||
SchemaIR, SchemaMigrationPlan, build_catalog_from_ir, build_schema_ir, plan_schema_migration,
|
||||
SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
|
||||
build_schema_ir, plan_schema_migration,
|
||||
};
|
||||
|
||||
use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
|
||||
|
|
@ -28,7 +29,9 @@ use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_u
|
|||
use crate::table_store::TableStore;
|
||||
|
||||
use super::commit_graph::GraphCommit;
|
||||
use super::manifest::Snapshot;
|
||||
use super::manifest::{
|
||||
ManifestChange, Snapshot, TableRegistration, TableTombstone, table_path_for_table_key,
|
||||
};
|
||||
use super::schema_state::{
|
||||
SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
|
||||
validate_schema_contract, write_schema_contract,
|
||||
|
|
@ -42,6 +45,14 @@ pub enum MergeOutcome {
|
|||
Merged,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SchemaApplyResult {
|
||||
pub supported: bool,
|
||||
pub applied: bool,
|
||||
pub manifest_version: u64,
|
||||
pub steps: Vec<SchemaMigrationStep>,
|
||||
}
|
||||
|
||||
/// Top-level handle to an Omnigraph database.
|
||||
///
|
||||
/// An Omnigraph is a Lance-native graph database with git-style branching.
|
||||
|
|
@ -160,6 +171,324 @@ impl Omnigraph {
|
|||
.map_err(|err| OmniError::manifest(err.to_string()))
|
||||
}
|
||||
|
||||
pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
let branches = self.coordinator.all_branches().await?;
|
||||
let public_non_main = branches
|
||||
.iter()
|
||||
.filter(|branch| branch.as_str() != "main")
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
if !public_non_main.is_empty() {
|
||||
return Err(OmniError::manifest_conflict(format!(
|
||||
"schema apply requires a repo with only main; found non-main branches: {}",
|
||||
public_non_main.join(", ")
|
||||
)));
|
||||
}
|
||||
|
||||
let accepted_ir = read_accepted_schema_ir(self.uri(), Arc::clone(&self.storage)).await?;
|
||||
let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
|
||||
let plan = plan_schema_migration(&accepted_ir, &desired_ir)
|
||||
.map_err(|err| OmniError::manifest(err.to_string()))?;
|
||||
if !plan.supported {
|
||||
let reason = plan
|
||||
.steps
|
||||
.iter()
|
||||
.find_map(|step| match step {
|
||||
SchemaMigrationStep::UnsupportedChange { reason, .. } => Some(reason.as_str()),
|
||||
_ => None,
|
||||
})
|
||||
.unwrap_or("unsupported schema migration plan");
|
||||
return Err(OmniError::manifest(reason.to_string()));
|
||||
}
|
||||
if plan.steps.is_empty() {
|
||||
return Ok(SchemaApplyResult {
|
||||
supported: true,
|
||||
applied: false,
|
||||
manifest_version: self.version(),
|
||||
steps: plan.steps,
|
||||
});
|
||||
}
|
||||
|
||||
let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
|
||||
fixup_blob_schemas(&mut desired_catalog);
|
||||
|
||||
let snapshot = self.snapshot();
|
||||
let mut added_tables = BTreeSet::new();
|
||||
let mut renamed_tables = HashMap::new();
|
||||
let mut rewritten_tables = BTreeSet::new();
|
||||
let mut indexed_tables = BTreeSet::new();
|
||||
let mut property_renames = HashMap::<String, HashMap<String, String>>::new();
|
||||
let mut changed_edge_tables = false;
|
||||
|
||||
for step in &plan.steps {
|
||||
match step {
|
||||
SchemaMigrationStep::AddType { type_kind, name } => {
|
||||
let table_key = schema_table_key(*type_kind, name);
|
||||
if table_key.starts_with("edge:") {
|
||||
changed_edge_tables = true;
|
||||
}
|
||||
added_tables.insert(table_key);
|
||||
}
|
||||
SchemaMigrationStep::RenameType {
|
||||
type_kind,
|
||||
from,
|
||||
to,
|
||||
} => {
|
||||
let source_key = schema_table_key(*type_kind, from);
|
||||
let target_key = schema_table_key(*type_kind, to);
|
||||
if source_key.starts_with("edge:") {
|
||||
changed_edge_tables = true;
|
||||
}
|
||||
renamed_tables.insert(target_key, source_key);
|
||||
}
|
||||
SchemaMigrationStep::AddProperty {
|
||||
type_kind,
|
||||
type_name,
|
||||
..
|
||||
} => {
|
||||
let table_key = schema_table_key(*type_kind, type_name);
|
||||
if table_key.starts_with("edge:") {
|
||||
changed_edge_tables = true;
|
||||
}
|
||||
rewritten_tables.insert(table_key);
|
||||
}
|
||||
SchemaMigrationStep::RenameProperty {
|
||||
type_kind,
|
||||
type_name,
|
||||
from,
|
||||
to,
|
||||
} => {
|
||||
let table_key = schema_table_key(*type_kind, type_name);
|
||||
if table_key.starts_with("edge:") {
|
||||
changed_edge_tables = true;
|
||||
}
|
||||
rewritten_tables.insert(table_key.clone());
|
||||
property_renames
|
||||
.entry(table_key)
|
||||
.or_default()
|
||||
.insert(to.clone(), from.clone());
|
||||
}
|
||||
SchemaMigrationStep::AddConstraint {
|
||||
type_kind,
|
||||
type_name,
|
||||
..
|
||||
} => {
|
||||
indexed_tables.insert(schema_table_key(*type_kind, type_name));
|
||||
}
|
||||
SchemaMigrationStep::UpdateTypeMetadata { .. }
|
||||
| SchemaMigrationStep::UpdatePropertyMetadata { .. } => {}
|
||||
SchemaMigrationStep::UnsupportedChange { reason, .. } => {
|
||||
return Err(OmniError::manifest(reason.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut table_registrations = HashMap::<String, String>::new();
|
||||
let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
|
||||
let mut table_tombstones = HashMap::<String, u64>::new();
|
||||
|
||||
for table_key in &added_tables {
|
||||
let table_path = table_path_for_table_key(table_key)?;
|
||||
let dataset_uri = self.table_store.dataset_uri(&table_path);
|
||||
let schema = schema_for_table_key(&desired_catalog, table_key)?;
|
||||
let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?;
|
||||
self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
|
||||
.await?;
|
||||
let state = self.table_store.table_state(&dataset_uri, &ds).await?;
|
||||
table_registrations.insert(table_key.clone(), table_path);
|
||||
table_updates.insert(
|
||||
table_key.clone(),
|
||||
crate::db::SubTableUpdate {
|
||||
table_key: table_key.clone(),
|
||||
table_version: state.version,
|
||||
table_branch: None,
|
||||
row_count: state.row_count,
|
||||
version_metadata: state.version_metadata,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
for (target_table_key, source_table_key) in &renamed_tables {
|
||||
let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
|
||||
OmniError::manifest(format!(
|
||||
"missing source table '{}' for schema rename",
|
||||
source_table_key
|
||||
))
|
||||
})?;
|
||||
let source_ds = snapshot.open(source_table_key).await?;
|
||||
let batch = self
|
||||
.batch_for_schema_apply_rewrite(
|
||||
&source_ds,
|
||||
source_table_key,
|
||||
&self.catalog,
|
||||
target_table_key,
|
||||
&desired_catalog,
|
||||
property_renames.get(target_table_key),
|
||||
)
|
||||
.await?;
|
||||
let table_path = table_path_for_table_key(target_table_key)?;
|
||||
let dataset_uri = self.table_store.dataset_uri(&table_path);
|
||||
let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?;
|
||||
self.build_indices_on_dataset_for_catalog(
|
||||
&desired_catalog,
|
||||
target_table_key,
|
||||
&mut target_ds,
|
||||
)
|
||||
.await?;
|
||||
let state = self
|
||||
.table_store
|
||||
.table_state(&dataset_uri, &target_ds)
|
||||
.await?;
|
||||
table_registrations.insert(target_table_key.clone(), table_path);
|
||||
table_updates.insert(
|
||||
target_table_key.clone(),
|
||||
crate::db::SubTableUpdate {
|
||||
table_key: target_table_key.clone(),
|
||||
table_version: state.version,
|
||||
table_branch: None,
|
||||
row_count: state.row_count,
|
||||
version_metadata: state.version_metadata,
|
||||
},
|
||||
);
|
||||
table_tombstones.insert(
|
||||
source_table_key.clone(),
|
||||
source_entry.table_version.saturating_add(1),
|
||||
);
|
||||
}
|
||||
|
||||
for table_key in &rewritten_tables {
|
||||
if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) {
|
||||
continue;
|
||||
}
|
||||
let entry = snapshot.entry(table_key).ok_or_else(|| {
|
||||
OmniError::manifest(format!(
|
||||
"missing source table '{}' for schema apply",
|
||||
table_key
|
||||
))
|
||||
})?;
|
||||
let source_ds = snapshot.open(table_key).await?;
|
||||
let batch = self
|
||||
.batch_for_schema_apply_rewrite(
|
||||
&source_ds,
|
||||
table_key,
|
||||
&self.catalog,
|
||||
table_key,
|
||||
&desired_catalog,
|
||||
property_renames.get(table_key),
|
||||
)
|
||||
.await?;
|
||||
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
|
||||
let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?;
|
||||
let mut state = self
|
||||
.table_store
|
||||
.table_state(&dataset_uri, &target_ds)
|
||||
.await?;
|
||||
if indexed_tables.contains(table_key) {
|
||||
self.build_indices_on_dataset_for_catalog(
|
||||
&desired_catalog,
|
||||
table_key,
|
||||
&mut target_ds,
|
||||
)
|
||||
.await?;
|
||||
state = self
|
||||
.table_store
|
||||
.table_state(&dataset_uri, &target_ds)
|
||||
.await?;
|
||||
}
|
||||
table_updates.insert(
|
||||
table_key.clone(),
|
||||
crate::db::SubTableUpdate {
|
||||
table_key: table_key.clone(),
|
||||
table_version: state.version,
|
||||
table_branch: None,
|
||||
row_count: state.row_count,
|
||||
version_metadata: state.version_metadata,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
for table_key in &indexed_tables {
|
||||
if added_tables.contains(table_key)
|
||||
|| renamed_tables.contains_key(table_key)
|
||||
|| rewritten_tables.contains(table_key)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let entry = snapshot.entry(table_key).ok_or_else(|| {
|
||||
OmniError::manifest(format!(
|
||||
"missing table '{}' for schema index apply",
|
||||
table_key
|
||||
))
|
||||
})?;
|
||||
let dataset_uri = self.table_store.dataset_uri(&entry.table_path);
|
||||
let mut ds = self
|
||||
.table_store
|
||||
.open_dataset_head_for_write(table_key, &dataset_uri, None)
|
||||
.await?;
|
||||
self.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
|
||||
.await?;
|
||||
let state = self.table_store.table_state(&dataset_uri, &ds).await?;
|
||||
table_updates.insert(
|
||||
table_key.clone(),
|
||||
crate::db::SubTableUpdate {
|
||||
table_key: table_key.clone(),
|
||||
table_version: state.version,
|
||||
table_branch: None,
|
||||
row_count: state.row_count,
|
||||
version_metadata: state.version_metadata,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let mut manifest_changes = Vec::new();
|
||||
for (table_key, table_path) in table_registrations {
|
||||
manifest_changes.push(ManifestChange::RegisterTable(TableRegistration {
|
||||
table_key,
|
||||
table_path,
|
||||
}));
|
||||
}
|
||||
for update in table_updates.into_values() {
|
||||
manifest_changes.push(ManifestChange::Update(update));
|
||||
}
|
||||
for (table_key, tombstone_version) in table_tombstones {
|
||||
manifest_changes.push(ManifestChange::Tombstone(TableTombstone {
|
||||
table_key,
|
||||
tombstone_version,
|
||||
}));
|
||||
}
|
||||
|
||||
let actor_id = self.current_audit_actor().map(str::to_string);
|
||||
let PublishedSnapshot {
|
||||
manifest_version,
|
||||
_snapshot_id: _,
|
||||
} = self
|
||||
.coordinator
|
||||
.commit_changes_with_actor(&manifest_changes, actor_id.as_deref())
|
||||
.await?;
|
||||
|
||||
let schema_path = join_uri(&self.root_uri, SCHEMA_SOURCE_FILENAME);
|
||||
self.storage
|
||||
.write_text(&schema_path, desired_schema_source)
|
||||
.await?;
|
||||
write_schema_contract(&self.root_uri, self.storage.as_ref(), &desired_ir).await?;
|
||||
|
||||
self.catalog = desired_catalog;
|
||||
self.schema_source = desired_schema_source.to_string();
|
||||
self.coordinator.refresh().await?;
|
||||
self.runtime_cache.invalidate_all().await;
|
||||
if changed_edge_tables {
|
||||
self.invalidate_graph_index().await;
|
||||
}
|
||||
|
||||
Ok(SchemaApplyResult {
|
||||
supported: true,
|
||||
applied: true,
|
||||
manifest_version,
|
||||
steps: plan.steps,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn table_store(&self) -> &TableStore {
|
||||
&self.table_store
|
||||
}
|
||||
|
|
@ -1368,6 +1697,16 @@ impl Omnigraph {
|
|||
&self,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
) -> Result<()> {
|
||||
self.build_indices_on_dataset_for_catalog(&self.catalog, table_key, ds)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn build_indices_on_dataset_for_catalog(
|
||||
&self,
|
||||
catalog: &Catalog,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
) -> Result<()> {
|
||||
if let Some(type_name) = table_key.strip_prefix("node:") {
|
||||
if !self.table_store.has_btree_index(ds, "id").await? {
|
||||
|
|
@ -1379,7 +1718,7 @@ impl Omnigraph {
|
|||
})?;
|
||||
}
|
||||
|
||||
if let Some(node_type) = self.catalog.node_types.get(type_name) {
|
||||
if let Some(node_type) = catalog.node_types.get(type_name) {
|
||||
for index_cols in &node_type.indices {
|
||||
if index_cols.len() != 1 {
|
||||
continue;
|
||||
|
|
@ -1600,58 +1939,100 @@ impl Omnigraph {
|
|||
source_ds: &Dataset,
|
||||
table_key: &str,
|
||||
) -> Result<RecordBatch> {
|
||||
let target_schema = schema_for_table_key(self.catalog(), table_key)?;
|
||||
let blob_properties = blob_properties_for_table_key(self.catalog(), table_key)?;
|
||||
if blob_properties.is_empty() {
|
||||
let batches = self.table_store().scan_batches(source_ds).await?;
|
||||
return concat_or_empty_batches(target_schema, batches);
|
||||
}
|
||||
self.batch_for_schema_apply_rewrite(
|
||||
source_ds,
|
||||
table_key,
|
||||
&self.catalog,
|
||||
table_key,
|
||||
&self.catalog,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
let batches = self
|
||||
.table_store()
|
||||
.scan_with(source_ds, None, None, None, true, |_| Ok(()))
|
||||
.await?;
|
||||
let batch = concat_or_empty_batches(target_schema.clone(), batches)?;
|
||||
if batch.num_rows() == 0 {
|
||||
return Ok(batch);
|
||||
async fn batch_for_schema_apply_rewrite(
|
||||
&self,
|
||||
source_ds: &Dataset,
|
||||
source_table_key: &str,
|
||||
source_catalog: &Catalog,
|
||||
target_table_key: &str,
|
||||
target_catalog: &Catalog,
|
||||
property_renames: Option<&HashMap<String, String>>,
|
||||
) -> Result<RecordBatch> {
|
||||
let target_schema = schema_for_table_key(target_catalog, target_table_key)?;
|
||||
let source_blob_properties =
|
||||
blob_properties_for_table_key(source_catalog, source_table_key)?;
|
||||
let target_blob_properties =
|
||||
blob_properties_for_table_key(target_catalog, target_table_key)?;
|
||||
let needs_row_ids =
|
||||
!source_blob_properties.is_empty() || !target_blob_properties.is_empty();
|
||||
let batches = if needs_row_ids {
|
||||
self.table_store()
|
||||
.scan_with(source_ds, None, None, None, true, |_| Ok(()))
|
||||
.await?
|
||||
} else {
|
||||
self.table_store().scan_batches(source_ds).await?
|
||||
};
|
||||
if batches.is_empty() {
|
||||
return Ok(RecordBatch::new_empty(target_schema));
|
||||
}
|
||||
let source_schema = batches[0].schema();
|
||||
let batch = concat_or_empty_batches(source_schema, batches)?;
|
||||
|
||||
let row_ids = batch
|
||||
.column_by_name("_rowid")
|
||||
.and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
|
||||
.ok_or_else(|| {
|
||||
OmniError::Lance(format!(
|
||||
"expected _rowid column when rewriting '{}'",
|
||||
table_key
|
||||
))
|
||||
})?;
|
||||
let row_ids: Vec<u64> = row_ids.values().iter().copied().collect();
|
||||
let row_ids = if needs_row_ids {
|
||||
Some(
|
||||
batch
|
||||
.column_by_name("_rowid")
|
||||
.and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
|
||||
.ok_or_else(|| {
|
||||
OmniError::Lance(format!(
|
||||
"expected _rowid column when rewriting '{}'",
|
||||
source_table_key
|
||||
))
|
||||
})?
|
||||
.values()
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut columns = Vec::with_capacity(target_schema.fields().len());
|
||||
for field in target_schema.fields() {
|
||||
if blob_properties.contains(field.name()) {
|
||||
let descriptions = batch
|
||||
.column_by_name(field.name())
|
||||
.and_then(|col| col.as_any().downcast_ref::<StructArray>())
|
||||
.ok_or_else(|| {
|
||||
OmniError::Lance(format!(
|
||||
"expected blob descriptions for '{}.{}'",
|
||||
table_key,
|
||||
field.name()
|
||||
))
|
||||
})?;
|
||||
columns.push(
|
||||
self.rebuild_blob_column(source_ds, field.name(), descriptions, &row_ids)
|
||||
.await?,
|
||||
);
|
||||
let source_name = property_renames
|
||||
.and_then(|renames| renames.get(field.name()))
|
||||
.map(String::as_str)
|
||||
.unwrap_or_else(|| field.name().as_str());
|
||||
if let Some(column) = batch.column_by_name(source_name) {
|
||||
if target_blob_properties.contains(field.name())
|
||||
&& source_blob_properties.contains(source_name)
|
||||
{
|
||||
let descriptions =
|
||||
column
|
||||
.as_any()
|
||||
.downcast_ref::<StructArray>()
|
||||
.ok_or_else(|| {
|
||||
OmniError::Lance(format!(
|
||||
"expected blob descriptions for '{}.{}'",
|
||||
source_table_key, source_name
|
||||
))
|
||||
})?;
|
||||
let rebuilt = self
|
||||
.rebuild_blob_column(
|
||||
source_ds,
|
||||
source_name,
|
||||
descriptions,
|
||||
row_ids.as_deref().unwrap_or(&[]),
|
||||
)
|
||||
.await?;
|
||||
columns.push(rebuilt);
|
||||
} else {
|
||||
columns.push(column.clone());
|
||||
}
|
||||
} else {
|
||||
columns.push(batch.column_by_name(field.name()).cloned().ok_or_else(|| {
|
||||
OmniError::Lance(format!(
|
||||
"missing column '{}.{}' in rewrite batch",
|
||||
table_key,
|
||||
field.name()
|
||||
))
|
||||
})?);
|
||||
columns.push(new_null_array(field.data_type(), batch.num_rows()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2130,6 +2511,14 @@ fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
|
|||
build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
|
||||
}
|
||||
|
||||
fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
|
||||
match type_kind {
|
||||
SchemaTypeKind::Node => format!("node:{}", name),
|
||||
SchemaTypeKind::Edge => format!("edge:{}", name),
|
||||
SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
|
||||
}
|
||||
}
|
||||
|
||||
fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
|
||||
if let Some(type_name) = table_key.strip_prefix("node:") {
|
||||
let node_type: &NodeType = catalog
|
||||
|
|
@ -2327,8 +2716,10 @@ fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Va
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::db::manifest::ManifestCoordinator;
|
||||
use async_trait::async_trait;
|
||||
use omnigraph_compiler::{SchemaMigrationStep, SchemaTypeKind};
|
||||
use serde_json::Value;
|
||||
use std::fs;
|
||||
use std::sync::Mutex;
|
||||
|
||||
|
|
@ -2621,6 +3012,182 @@ edge WorksAt: Person -> Company
|
|||
);
|
||||
}
|
||||
|
||||
async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
|
||||
let snapshot = db.snapshot();
|
||||
let ds = snapshot.open(table_key).await.unwrap();
|
||||
let batches = db.table_store().scan_batches(&ds).await.unwrap();
|
||||
batches
|
||||
.into_iter()
|
||||
.flat_map(|batch| {
|
||||
(0..batch.num_rows())
|
||||
.map(|row| record_batch_row_to_json(&batch, row).unwrap())
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
|
||||
let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
|
||||
let schema: Arc<Schema> = Arc::new(ds.schema().into());
|
||||
let columns: Vec<Arc<dyn Array>> = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.map(|field| match field.name().as_str() {
|
||||
"id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
|
||||
"name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
|
||||
"age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
|
||||
_ => new_null_array(field.data_type(), 1),
|
||||
})
|
||||
.collect();
|
||||
let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
|
||||
let state = db
|
||||
.table_store()
|
||||
.append_batch(&full_path, &mut ds, batch)
|
||||
.await
|
||||
.unwrap();
|
||||
db.commit_updates(&[crate::db::SubTableUpdate {
|
||||
table_key: "node:Person".to_string(),
|
||||
table_version: state.version,
|
||||
table_branch,
|
||||
row_count: state.row_count,
|
||||
version_metadata: state.version_metadata,
|
||||
}])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_noop_returns_not_applied() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
|
||||
let result = db.apply_schema(TEST_SCHEMA).await.unwrap();
|
||||
assert!(result.supported);
|
||||
assert!(!result.applied);
|
||||
assert!(result.steps.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
seed_person_row(&mut db, "Alice", Some(30)).await;
|
||||
|
||||
let desired = TEST_SCHEMA.replace(
|
||||
" age: I32?\n}",
|
||||
" age: I32?\n nickname: String?\n}",
|
||||
);
|
||||
let result = db.apply_schema(&desired).await.unwrap();
|
||||
assert!(result.applied);
|
||||
|
||||
let reopened = Omnigraph::open(uri).await.unwrap();
|
||||
let rows = table_rows_json(&reopened, "node:Person").await;
|
||||
assert_eq!(rows.len(), 1);
|
||||
assert_eq!(rows[0]["name"], "Alice");
|
||||
assert_eq!(rows[0]["age"], 30);
|
||||
assert!(rows[0]["nickname"].is_null());
|
||||
assert!(
|
||||
reopened.catalog().node_types["Person"]
|
||||
.properties
|
||||
.contains_key("nickname")
|
||||
);
|
||||
assert!(dir.path().join("_schema.pg").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_renames_property_and_preserves_values() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
seed_person_row(&mut db, "Alice", Some(30)).await;
|
||||
|
||||
let desired = TEST_SCHEMA.replace(
|
||||
" age: I32?\n}",
|
||||
" years: I32? @rename_from(\"age\")\n}",
|
||||
);
|
||||
db.apply_schema(&desired).await.unwrap();
|
||||
|
||||
let reopened = Omnigraph::open(uri).await.unwrap();
|
||||
let rows = table_rows_json(&reopened, "node:Person").await;
|
||||
assert_eq!(rows[0]["name"], "Alice");
|
||||
assert_eq!(rows[0]["years"], 30);
|
||||
assert!(rows[0].get("age").is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
seed_person_row(&mut db, "Alice", Some(30)).await;
|
||||
let before_version = db.snapshot().version();
|
||||
|
||||
let desired = TEST_SCHEMA
|
||||
.replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
|
||||
.replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
|
||||
.replace(
|
||||
"edge WorksAt: Person -> Company",
|
||||
"edge WorksAt: Human -> Company",
|
||||
);
|
||||
db.apply_schema(&desired).await.unwrap();
|
||||
|
||||
let head = db.snapshot();
|
||||
assert!(head.entry("node:Person").is_none());
|
||||
assert!(head.entry("node:Human").is_some());
|
||||
let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(historical.entry("node:Person").is_some());
|
||||
assert!(historical.entry("node:Human").is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_rejects_when_non_main_branch_exists() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
db.branch_create("feature").await.unwrap();
|
||||
|
||||
let desired = TEST_SCHEMA.replace(
|
||||
" age: I32?\n}",
|
||||
" age: I32?\n nickname: String?\n}",
|
||||
);
|
||||
let err = db.apply_schema(&desired).await.unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("schema apply requires a repo with only main")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_adds_index_for_existing_property() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
|
||||
let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
|
||||
db.apply_schema(&desired).await.unwrap();
|
||||
|
||||
let snapshot = db.snapshot();
|
||||
let ds = snapshot.open("node:Person").await.unwrap();
|
||||
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_schema_unsupported_plan_does_not_advance_manifest() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
let before_version = db.snapshot().version();
|
||||
|
||||
let desired = TEST_SCHEMA.replace("age: I32?", "age: I64?");
|
||||
let err = db.apply_schema(&desired).await.unwrap_err();
|
||||
assert!(err.to_string().contains("changing property type"));
|
||||
assert_eq!(db.snapshot().version(), before_version);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_nonexistent_fails() {
|
||||
let result = Omnigraph::open("/tmp/nonexistent_omnigraph_test_xyz").await;
|
||||
|
|
|
|||
|
|
@ -399,6 +399,20 @@ impl TableStore {
|
|||
self.append_batch(dataset_uri, ds, batch).await
|
||||
}
|
||||
|
||||
pub async fn overwrite_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
|
||||
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
|
||||
let params = WriteParams {
|
||||
mode: WriteMode::Overwrite,
|
||||
enable_stable_row_ids: true,
|
||||
data_storage_version: Some(LanceFileVersion::V2_2),
|
||||
allow_external_blob_outside_bases: true,
|
||||
..Default::default()
|
||||
};
|
||||
Dataset::write(reader, dataset_uri, Some(params))
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))
|
||||
}
|
||||
|
||||
pub async fn merge_insert_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ and configure the matching `bearer_token_env` in `omnigraph.yaml`.
|
|||
|
||||
```bash
|
||||
omnigraph schema plan --schema ./next.pg ./repo.omni --json
|
||||
omnigraph schema apply --schema ./next.pg ./repo.omni --json
|
||||
omnigraph policy validate --config ./omnigraph.yaml
|
||||
omnigraph policy test --config ./omnigraph.yaml
|
||||
omnigraph policy explain --config ./omnigraph.yaml --actor act-alice --action read --branch main
|
||||
|
|
@ -87,3 +88,6 @@ The config file can also define:
|
|||
- auth env files
|
||||
- query aliases for common read and change commands
|
||||
- `policy.file` for Cedar authorization rules
|
||||
|
||||
When policy is enabled, `schema apply` is authorized through the
|
||||
`schema_apply` action and is typically limited to admins on protected `main`.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue