From 327eb821b5d9fae77cc0482cac915deea8cf1a24 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 12 May 2026 22:57:20 +0000 Subject: [PATCH] Remove orphaned loader/{constraints,embeddings,jsonl}.rs files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These three files in crates/omnigraph/src/loader/ have no `mod` declaration anywhere in the workspace and no `#[path = "…"]` reference. They are not compiled — `touch`-ing them does not trigger `cargo check` to recompile anything. Their imports (`crate::catalog::schema_ir`, `crate::error::NanoError`, `crate::store::manifest::hash_string`, `crate::types::ScalarType`, `super::super::graph::DatasetAccumulator`) reference modules that no longer exist in the engine crate, so they could not even be wired in without further work. They are vestigial code from an earlier monolithic crate layout. The live functionality is independently implemented inside crates/omnigraph/src/loader/mod.rs. These files have been orphaned since the initial public commit. `cargo check --workspace --all-targets` and `cargo test --workspace --no-run` both pass with no new warnings. Co-Authored-By: Ragnor Comerford --- crates/omnigraph/src/loader/constraints.rs | 476 ------ crates/omnigraph/src/loader/embeddings.rs | 1732 -------------------- crates/omnigraph/src/loader/jsonl.rs | 1532 ----------------- 3 files changed, 3740 deletions(-) delete mode 100644 crates/omnigraph/src/loader/constraints.rs delete mode 100644 crates/omnigraph/src/loader/embeddings.rs delete mode 100644 crates/omnigraph/src/loader/jsonl.rs diff --git a/crates/omnigraph/src/loader/constraints.rs b/crates/omnigraph/src/loader/constraints.rs deleted file mode 100644 index d76decb..0000000 --- a/crates/omnigraph/src/loader/constraints.rs +++ /dev/null @@ -1,476 +0,0 @@ -use std::collections::HashMap; -#[cfg(test)] -use std::collections::HashSet; - -use arrow_array::{ - Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, - Int32Array, Int64Array, StringArray, UInt32Array, UInt64Array, -}; -use arrow_schema::{DataType, Field, Schema}; - -use crate::catalog::schema_ir::SchemaIR; -use crate::error::{NanoError, Result}; - -use super::super::graph::DatasetAccumulator; - -#[derive(Debug, Default)] -pub(crate) struct NodeConstraintAnnotations { - pub(crate) key_props: HashMap, - pub(crate) unique_props: HashMap>, -} - -pub(crate) fn load_node_constraint_annotations( - schema_ir: &SchemaIR, -) -> Result { - let mut constraints = NodeConstraintAnnotations::default(); - - for node in schema_ir.node_types() { - let mut node_key_prop: Option = None; - let mut node_unique_props: Vec = Vec::new(); - - for prop in &node.properties { - if prop.key && node_key_prop.replace(prop.name.clone()).is_some() { - return Err(NanoError::Storage(format!( - "node type {} has multiple @key properties; only one is currently supported", - node.name - ))); - } - if prop.unique { - node_unique_props.push(prop.name.clone()); - } - } - - if let Some(prop_name) = node_key_prop { - if !node_unique_props.contains(&prop_name) { - node_unique_props.push(prop_name.clone()); - } - constraints.key_props.insert(node.name.clone(), prop_name); - } - if !node_unique_props.is_empty() { - node_unique_props.sort(); - node_unique_props.dedup(); - constraints - .unique_props - .insert(node.name.clone(), node_unique_props); - } - } - - Ok(constraints) -} - -pub(crate) fn enforce_node_unique_constraints( - storage: &DatasetAccumulator, - unique_props: &HashMap>, -) -> Result<()> { - for (type_name, properties) in unique_props { - let Some(batch) = storage.get_all_nodes(type_name)? else { - continue; - }; - - for property in properties { - let prop_idx = - node_property_index(batch.schema().as_ref(), property).ok_or_else(|| { - NanoError::Storage(format!( - "node type {} missing @unique property {}", - type_name, property - )) - })?; - let arr = batch.column(prop_idx); - let mut seen: HashMap = HashMap::new(); - for row in 0..batch.num_rows() { - let Some(value) = unique_value_string(arr, row, type_name, property)? else { - continue; - }; - if let Some(prev_row) = seen.insert(value.clone(), row) { - return Err(NanoError::UniqueConstraint { - type_name: type_name.clone(), - property: property.clone(), - value, - first_row: prev_row, - second_row: row, - }); - } - } - } - } - Ok(()) -} - -#[cfg(test)] -pub(crate) fn collect_incoming_node_types(data_source: &str) -> Result> { - let mut node_types = HashSet::new(); - for line in data_source.lines() { - let line = line.trim(); - if line.is_empty() || line.starts_with("//") { - continue; - } - - let obj: serde_json::Value = serde_json::from_str(line) - .map_err(|e| NanoError::Storage(format!("JSON parse error: {}", e)))?; - if let Some(type_name) = obj.get("type").and_then(|v| v.as_str()) { - node_types.insert(type_name.to_string()); - } - } - Ok(node_types) -} - -pub(crate) fn build_name_seed_for_keyed_load( - storage: &DatasetAccumulator, - key_props: &HashMap, -) -> Result> { - let mut seed = HashMap::new(); - - for (type_name, key_prop) in key_props { - let Some(batch) = storage.get_all_nodes(type_name)? else { - continue; - }; - - let key_idx = node_property_index(batch.schema().as_ref(), key_prop).ok_or_else(|| { - NanoError::Storage(format!( - "node type {} missing @key property {}", - type_name, key_prop - )) - })?; - let key_arr = batch.column(key_idx).clone(); - let id_arr = batch - .column(0) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - NanoError::Storage(format!("node type {} has non-UInt64 id column", type_name)) - })?; - - for row in 0..batch.num_rows() { - let key = key_value_string(&key_arr, row, key_prop)?; - seed.insert((type_name.clone(), key), id_arr.value(row)); - } - } - - Ok(seed) -} - -pub(crate) fn build_name_seed_for_append( - storage: &DatasetAccumulator, - key_props: &HashMap, -) -> Result> { - build_name_seed_for_keyed_load(storage, key_props) -} - -pub(crate) fn node_property_index(schema: &Schema, prop_name: &str) -> Option { - schema - .fields() - .iter() - .enumerate() - .skip(1) - .find_map(|(idx, field)| (field.name() == prop_name).then_some(idx)) -} - -pub(crate) fn node_property_field<'a>(schema: &'a Schema, prop_name: &str) -> Option<&'a Field> { - node_property_index(schema, prop_name).map(|idx| schema.field(idx)) -} - -pub(crate) fn key_value_string(array: &ArrayRef, row: usize, prop_name: &str) -> Result { - let value = scalar_value_string(array, row, "key", None, prop_name)?; - if let Some(value) = value { - return Ok(value); - } - Err(NanoError::Storage(format!( - "@key property {} cannot be null", - prop_name - ))) -} - -fn unique_value_string( - array: &ArrayRef, - row: usize, - type_name: &str, - prop_name: &str, -) -> Result> { - scalar_value_string(array, row, "unique", Some(type_name), prop_name) -} - -fn scalar_value_string( - array: &ArrayRef, - row: usize, - annotation: &str, - type_name: Option<&str>, - prop_name: &str, -) -> Result> { - if array.is_null(row) { - return Ok(None); - } - - let value = match array.data_type() { - DataType::Utf8 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Boolean => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Int32 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Int64 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::UInt32 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::UInt64 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Float32 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Float64 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Date32 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Date64 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - _ => None, - }; - - let value = value.ok_or_else(|| { - let target = match type_name { - Some(name) => format!("{}.{}", name, prop_name), - None => prop_name.to_string(), - }; - NanoError::Storage(format!( - "unsupported @{} data type {:?} for {}", - annotation, - array.data_type(), - target - )) - })?; - - Ok(Some(value)) -} - -#[cfg(test)] -mod tests { - use std::collections::{HashMap, HashSet}; - - use arrow_array::StringArray; - - use crate::catalog::schema_ir::{build_catalog_from_ir, build_schema_ir}; - use crate::schema::parser::parse_schema; - - use super::super::jsonl::load_jsonl_data; - use super::*; - - fn build_schema_ir_and_storage(schema_src: &str) -> (SchemaIR, DatasetAccumulator) { - let schema = parse_schema(schema_src).unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let catalog = build_catalog_from_ir(&ir).unwrap(); - (ir, DatasetAccumulator::new(catalog)) - } - - #[test] - fn load_node_constraint_annotations_collects_key_and_unique() { - let schema = r#"node Person { - name: String @key - email: String @unique - alias: String? @unique -}"#; - let (ir, _) = build_schema_ir_and_storage(schema); - let annotations = load_node_constraint_annotations(&ir).unwrap(); - - assert_eq!(annotations.key_props.get("Person").unwrap(), "name"); - assert_eq!( - annotations.unique_props.get("Person").unwrap(), - &vec!["alias".to_string(), "email".to_string(), "name".to_string()] - ); - } - - #[test] - fn collect_incoming_node_types_ignores_comments_and_blanks() { - let data = r#" -// comment -{"type":"Person","data":{"name":"Alice"}} - -{"edge":"Knows","from":"Alice","to":"Bob"} -{"type":"Company","data":{"name":"Acme"}} -"#; - let types = collect_incoming_node_types(data).unwrap(); - assert_eq!( - types, - HashSet::from(["Person".to_string(), "Company".to_string()]) - ); - } - - #[test] - fn enforce_node_unique_constraints_detects_duplicate_non_null() { - let schema = r#"node Person { - name: String - email: String? @unique -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::new(); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"name":"Alice","email":"dupe@example.com"}} -{"type":"Person","data":{"name":"Bob","email":"dupe@example.com"}}"#, - &key_props, - ) - .unwrap(); - - let unique_props = HashMap::from([("Person".to_string(), vec!["email".to_string()])]); - let err = enforce_node_unique_constraints(&storage, &unique_props).unwrap_err(); - match err { - NanoError::UniqueConstraint { - type_name, - property, - value, - .. - } => { - assert_eq!(type_name, "Person"); - assert_eq!(property, "email"); - assert_eq!(value, "dupe@example.com"); - } - other => panic!("expected UniqueConstraint, got {other}"), - } - } - - #[test] - fn enforce_node_unique_constraints_allows_multiple_nulls() { - let schema = r#"node Person { - name: String - nick: String? @unique -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::new(); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"name":"Alice","nick":null}} -{"type":"Person","data":{"name":"Bob","nick":null}}"#, - &key_props, - ) - .unwrap(); - - let unique_props = HashMap::from([("Person".to_string(), vec!["nick".to_string()])]); - enforce_node_unique_constraints(&storage, &unique_props).unwrap(); - } - - #[test] - fn enforce_node_unique_constraints_uses_user_property_named_id() { - let schema = r#"node Person { - id: String @unique - name: String -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::new(); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"id":"user-1","name":"Alice"}} -{"type":"Person","data":{"id":"user-1","name":"Bob"}}"#, - &key_props, - ) - .unwrap(); - - let unique_props = HashMap::from([("Person".to_string(), vec!["id".to_string()])]); - let err = enforce_node_unique_constraints(&storage, &unique_props).unwrap_err(); - match err { - NanoError::UniqueConstraint { - type_name, - property, - value, - .. - } => { - assert_eq!(type_name, "Person"); - assert_eq!(property, "id"); - assert_eq!(value, "user-1"); - } - other => panic!("expected UniqueConstraint, got {other}"), - } - } - - #[test] - fn build_name_seed_for_keyed_load_uses_declared_key_property() { - let schema = r#"node Person { - uid: String @key - name: String -} -node Company { - name: String -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::from([("Person".to_string(), "uid".to_string())]); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"uid":"u1","name":"Alice"}} -{"type":"Company","data":{"name":"Acme"}}"#, - &key_props, - ) - .unwrap(); - - let seed = build_name_seed_for_keyed_load(&storage, &key_props).unwrap(); - - assert!(seed.contains_key(&("Person".to_string(), "u1".to_string()))); - assert!(!seed.contains_key(&("Company".to_string(), "Acme".to_string()))); - } - - #[test] - fn build_name_seed_for_keyed_load_uses_user_property_named_id() { - let schema = r#"node Person { - id: String @key - name: String -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::from([("Person".to_string(), "id".to_string())]); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"id":"user-1","name":"Alice"}}"#, - &key_props, - ) - .unwrap(); - - let seed = build_name_seed_for_keyed_load(&storage, &key_props).unwrap(); - assert!(seed.contains_key(&("Person".to_string(), "user-1".to_string()))); - } - - #[test] - fn build_name_seed_for_append_keeps_all_existing_keyed_nodes() { - let schema = r#"node Person { - uid: String @key - name: String -} -node Company { - name: String -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::from([("Person".to_string(), "uid".to_string())]); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"uid":"u1","name":"Alice"}} -{"type":"Company","data":{"name":"Acme"}}"#, - &key_props, - ) - .unwrap(); - - let seed = build_name_seed_for_append(&storage, &key_props).unwrap(); - assert!(seed.contains_key(&("Person".to_string(), "u1".to_string()))); - assert!(!seed.contains_key(&("Company".to_string(), "Acme".to_string()))); - } - - #[test] - fn key_value_string_rejects_null() { - let arr: ArrayRef = std::sync::Arc::new(StringArray::from(vec![Some("x"), None])); - assert_eq!(key_value_string(&arr, 0, "name").unwrap(), "x"); - let err = key_value_string(&arr, 1, "name").unwrap_err(); - assert!(err.to_string().contains("cannot be null")); - } -} diff --git a/crates/omnigraph/src/loader/embeddings.rs b/crates/omnigraph/src/loader/embeddings.rs deleted file mode 100644 index 58ecb93..0000000 --- a/crates/omnigraph/src/loader/embeddings.rs +++ /dev/null @@ -1,1732 +0,0 @@ -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; -use std::io::{BufRead, BufWriter, Write}; -use std::path::{Path, PathBuf}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use serde::{Deserialize, Serialize}; - -use crate::catalog::schema_ir::{PropDef, SchemaIR}; -use crate::embedding::EmbeddingClient; -use crate::error::{NanoError, Result}; -use crate::store::manifest::hash_string; -use crate::types::ScalarType; - -const EMBEDDING_CACHE_FILENAME: &str = "_embedding_cache.jsonl"; -const DEFAULT_EMBED_BATCH_SIZE: usize = 64; -const DEFAULT_EMBED_CHUNK_CHARS: usize = 0; -const DEFAULT_EMBED_CHUNK_OVERLAP_CHARS: usize = 128; -const DEFAULT_EMBED_CACHE_MAX_ENTRIES: usize = 50_000; -const DEFAULT_EMBED_CACHE_LOCK_STALE_SECS: usize = 60; -const EMBEDDING_CACHE_LOCK_RETRIES: usize = 200; -const EMBEDDING_CACHE_LOCK_RETRY_DELAY_MS: u64 = 10; - -#[derive(Debug, Clone)] -pub(crate) struct EmbedSpec { - pub target_prop: String, - pub source_prop: String, - pub dim: usize, -} - -#[derive(Debug, Clone)] -pub(crate) struct EmbedValueRequest { - pub source_text: String, - pub dim: usize, -} - -#[cfg_attr(not(test), allow(dead_code))] -#[derive(Debug, Clone)] -struct PendingAssignment { - line_index: usize, - target_prop: String, - source_text: String, - dim: usize, - content_hash: String, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct CacheKey { - model: String, - dim: usize, - content_hash: String, - chunk_chars: usize, - chunk_overlap_chars: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct CacheRecord { - model: String, - dim: usize, - content_hash: String, - vector: Vec, - #[serde(default)] - chunk_chars: usize, - #[serde(default)] - chunk_overlap_chars: usize, -} - -enum ParsedLine { - Raw(String), - Json(serde_json::Value), -} - -struct StreamPendingLine { - line_id: usize, - line: ParsedLine, - missing_assignments: usize, -} - -#[derive(Debug, Clone)] -struct StreamPendingAssignment { - line_id: usize, - target_prop: String, - source_text: String, - dim: usize, - content_hash: String, -} - -impl StreamPendingAssignment { - fn cache_key(&self, model: &str, chunking: EmbedChunkingConfig) -> CacheKey { - CacheKey { - model: model.to_string(), - dim: self.dim, - content_hash: self.content_hash.clone(), - chunk_chars: chunking.chunk_chars, - chunk_overlap_chars: chunking.chunk_overlap_chars, - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct EmbedChunkingConfig { - chunk_chars: usize, - chunk_overlap_chars: usize, -} - -impl EmbedChunkingConfig { - fn from_env() -> Self { - let chunk_chars = parse_env_usize("NANOGRAPH_EMBED_CHUNK_CHARS", DEFAULT_EMBED_CHUNK_CHARS); - let overlap = parse_env_usize( - "NANOGRAPH_EMBED_CHUNK_OVERLAP_CHARS", - DEFAULT_EMBED_CHUNK_OVERLAP_CHARS, - ); - Self::new(chunk_chars, overlap) - } - - fn new(chunk_chars: usize, chunk_overlap_chars: usize) -> Self { - let chunk_overlap_chars = if chunk_chars == 0 { - 0 - } else { - chunk_overlap_chars.min(chunk_chars.saturating_sub(1)) - }; - Self { - chunk_chars, - chunk_overlap_chars, - } - } - - fn is_enabled(self) -> bool { - self.chunk_chars > 0 - } -} - -#[allow(dead_code)] -pub(crate) async fn materialize_embeddings_for_load( - db_path: &Path, - schema_ir: &SchemaIR, - data_source: &str, -) -> Result { - materialize_embeddings_for_load_inner(db_path, schema_ir, data_source, None).await -} - -#[cfg_attr(not(test), allow(dead_code))] -async fn materialize_embeddings_for_load_inner( - db_path: &Path, - schema_ir: &SchemaIR, - data_source: &str, - client_override: Option<&EmbeddingClient>, -) -> Result { - materialize_embeddings_for_load_inner_with_chunking( - db_path, - schema_ir, - data_source, - client_override, - EmbedChunkingConfig::from_env(), - ) - .await -} - -pub(crate) fn has_embedding_specs(schema_ir: &SchemaIR) -> bool { - schema_ir.node_types().any(|node| { - node.properties - .iter() - .any(|prop| prop.embed_source.is_some()) - }) -} - -pub(crate) async fn materialize_embeddings_for_load_to_tempfile( - db_path: &Path, - schema_ir: &SchemaIR, - reader: R, -) -> Result { - materialize_embeddings_for_load_to_tempfile_inner(db_path, schema_ir, reader, None).await -} - -pub(crate) async fn resolve_embedding_requests( - db_path: &Path, - requests: &[EmbedValueRequest], -) -> Result>> { - resolve_embedding_requests_with_chunking(db_path, requests, EmbedChunkingConfig::from_env()) - .await -} - -async fn materialize_embeddings_for_load_to_tempfile_inner( - db_path: &Path, - schema_ir: &SchemaIR, - reader: R, - client_override: Option<&EmbeddingClient>, -) -> Result { - materialize_embeddings_for_load_to_tempfile_inner_with_chunking( - db_path, - schema_ir, - reader, - client_override, - EmbedChunkingConfig::from_env(), - ) - .await -} - -async fn resolve_embedding_requests_with_chunking( - db_path: &Path, - requests: &[EmbedValueRequest], - chunking: EmbedChunkingConfig, -) -> Result>> { - if requests.is_empty() { - return Ok(Vec::new()); - } - - let cache_path = db_path.join(EMBEDDING_CACHE_FILENAME); - let mut cache = load_embedding_cache(&cache_path)?; - let client = EmbeddingClient::from_env() - .map_err(|err| NanoError::Storage(format!("embedding initialization failed: {}", err)))?; - let model = client.model().to_string(); - let batch_size = parse_env_usize("NANOGRAPH_EMBED_BATCH_SIZE", DEFAULT_EMBED_BATCH_SIZE); - - let mut results: Vec>> = vec![None; requests.len()]; - let mut missing_by_dim: BTreeMap> = BTreeMap::new(); - let mut missing_indices: HashMap> = HashMap::new(); - - for (idx, request) in requests.iter().enumerate() { - if request.dim == 0 { - return Err(NanoError::Storage( - "embedding dimension must be greater than zero".to_string(), - )); - } - - let key = CacheKey { - model: model.clone(), - dim: request.dim, - content_hash: hash_string(&request.source_text), - chunk_chars: chunking.chunk_chars, - chunk_overlap_chars: chunking.chunk_overlap_chars, - }; - - if let Some(vector) = cache.get(&key) { - results[idx] = Some(vector.clone()); - continue; - } - - missing_indices.entry(key.clone()).or_default().push(idx); - let entries = missing_by_dim.entry(request.dim).or_default(); - if !entries.iter().any(|(existing, _)| existing == &key) { - entries.push((key, request.source_text.clone())); - } - } - - let mut new_cache_records = Vec::new(); - for (dim, entries) in missing_by_dim { - if chunking.is_enabled() { - for (key, text) in entries { - let vector = - embed_text_with_chunking(&client, &text, dim, batch_size, chunking).await?; - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - key.content_hash, - dim, - vector.len() - ))); - } - cache.insert(key.clone(), vector.clone()); - new_cache_records.push(CacheRecord { - model: key.model.clone(), - dim: key.dim, - content_hash: key.content_hash.clone(), - vector, - chunk_chars: key.chunk_chars, - chunk_overlap_chars: key.chunk_overlap_chars, - }); - } - continue; - } - - for chunk in entries.chunks(batch_size.max(1)) { - let texts: Vec = chunk.iter().map(|(_, text)| text.clone()).collect(); - let vectors = client - .embed_texts(&texts, dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err)))?; - if vectors.len() != chunk.len() { - return Err(NanoError::Storage(format!( - "embedding response size mismatch: expected {}, got {}", - chunk.len(), - vectors.len() - ))); - } - - for ((key, _), vector) in chunk.iter().zip(vectors.into_iter()) { - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - key.content_hash, - dim, - vector.len() - ))); - } - cache.insert(key.clone(), vector.clone()); - new_cache_records.push(CacheRecord { - model: key.model.clone(), - dim: key.dim, - content_hash: key.content_hash.clone(), - vector, - chunk_chars: key.chunk_chars, - chunk_overlap_chars: key.chunk_overlap_chars, - }); - } - } - } - - append_embedding_cache(&cache_path, &new_cache_records)?; - - for (key, indices) in missing_indices { - let vector = cache.get(&key).ok_or_else(|| { - NanoError::Storage(format!( - "embedding cache miss for content hash {}", - key.content_hash - )) - })?; - for idx in indices { - results[idx] = Some(vector.clone()); - } - } - - results - .into_iter() - .enumerate() - .map(|(idx, vector)| { - vector.ok_or_else(|| { - NanoError::Storage(format!( - "missing embedding result for request index {}", - idx - )) - }) - }) - .collect() -} - -async fn materialize_embeddings_for_load_to_tempfile_inner_with_chunking( - db_path: &Path, - schema_ir: &SchemaIR, - reader: R, - client_override: Option<&EmbeddingClient>, - chunking: EmbedChunkingConfig, -) -> Result { - let output_path = create_materialized_temp_file(db_path)?; - let embed_specs = collect_embed_specs(schema_ir)?; - let cache_path = db_path.join(EMBEDDING_CACHE_FILENAME); - - if embed_specs.is_empty() { - let mut writer = BufWriter::new(std::fs::File::create(&output_path)?); - copy_reader_to_writer(reader, &mut writer)?; - writer.flush()?; - return Ok(output_path); - } - - let mut cache = load_embedding_cache(&cache_path)?; - let owned_client; - let client = if let Some(client) = client_override { - client - } else { - owned_client = EmbeddingClient::from_env().map_err(|err| { - NanoError::Storage(format!("embedding initialization failed: {}", err)) - })?; - &owned_client - }; - let model = client.model().to_string(); - let batch_size = parse_env_usize("NANOGRAPH_EMBED_BATCH_SIZE", DEFAULT_EMBED_BATCH_SIZE); - let mut writer = BufWriter::new(std::fs::File::create(&output_path)?); - let mut pending_lines: VecDeque = VecDeque::new(); - let mut pending_by_dim: BTreeMap> = BTreeMap::new(); - let mut new_cache_records = Vec::new(); - let mut next_line_id = 0usize; - - for (line_no, line) in reader.lines().enumerate() { - let line = line?; - let trimmed = line.trim(); - if trimmed.is_empty() || trimmed.starts_with("//") { - pending_lines.push_back(StreamPendingLine { - line_id: next_line_id, - line: ParsedLine::Raw(line), - missing_assignments: 0, - }); - next_line_id += 1; - flush_ready_stream_lines(&mut writer, &mut pending_lines)?; - continue; - } - - let mut obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!("JSON parse error on line {}: {}", line_no + 1, e)) - })?; - let mut output_line = ParsedLine::Raw(line); - let mut missing_assignments = 0usize; - - if let Some(type_name) = obj - .get("type") - .and_then(|value| value.as_str()) - .map(|value| value.to_string()) - && let Some(specs) = embed_specs.get(type_name.as_str()) - { - let data_obj = obj - .get_mut("data") - .and_then(|value| value.as_object_mut()) - .ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} is missing object field `data`", - type_name, - line_no + 1 - )) - })?; - let mut mutated = false; - - for spec in specs { - let needs_embedding = match data_obj.get(&spec.target_prop) { - Some(value) => value.is_null(), - None => true, - }; - if !needs_embedding { - continue; - } - - let source_value = data_obj.get(&spec.source_prop).ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} missing @embed source property `{}` for `{}`", - type_name, - line_no + 1, - spec.source_prop, - spec.target_prop - )) - })?; - let source_text = source_value.as_str().ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} @embed source property `{}` must be String", - type_name, - line_no + 1, - spec.source_prop - )) - })?; - - let assignment = StreamPendingAssignment { - line_id: next_line_id, - target_prop: spec.target_prop.clone(), - source_text: source_text.to_string(), - dim: spec.dim, - content_hash: hash_string(source_text), - }; - let cache_key = assignment.cache_key(&model, chunking); - if let Some(vector) = cache.get(&cache_key) { - data_obj.insert( - spec.target_prop.clone(), - serde_json::to_value(vector).map_err(|e| { - NanoError::Storage(format!("serialize embedding vector failed: {}", e)) - })?, - ); - } else { - missing_assignments += 1; - pending_by_dim - .entry(spec.dim) - .or_default() - .push_back(assignment); - } - mutated = true; - } - - if mutated { - output_line = ParsedLine::Json(obj); - } - } - - pending_lines.push_back(StreamPendingLine { - line_id: next_line_id, - line: output_line, - missing_assignments, - }); - next_line_id += 1; - - let mut runtime = StreamEmbedRuntime { - cache: &mut cache, - model: &model, - client, - new_cache_records: &mut new_cache_records, - batch_size, - chunking, - }; - resolve_pending_stream_batches( - &mut pending_by_dim, - &mut pending_lines, - &mut runtime, - false, - ) - .await?; - flush_ready_stream_lines(&mut writer, &mut pending_lines)?; - } - - let mut runtime = StreamEmbedRuntime { - cache: &mut cache, - model: &model, - client, - new_cache_records: &mut new_cache_records, - batch_size, - chunking, - }; - resolve_pending_stream_batches(&mut pending_by_dim, &mut pending_lines, &mut runtime, true) - .await?; - flush_ready_stream_lines(&mut writer, &mut pending_lines)?; - writer.flush()?; - - if !pending_lines.is_empty() { - return Err(NanoError::Storage( - "embedding materialization left unresolved output rows".to_string(), - )); - } - - append_embedding_cache(&cache_path, &new_cache_records)?; - Ok(output_path) -} - -#[cfg_attr(not(test), allow(dead_code))] -async fn materialize_embeddings_for_load_inner_with_chunking( - db_path: &Path, - schema_ir: &SchemaIR, - data_source: &str, - client_override: Option<&EmbeddingClient>, - chunking: EmbedChunkingConfig, -) -> Result { - let embed_specs = collect_embed_specs(schema_ir)?; - if embed_specs.is_empty() { - return Ok(data_source.to_string()); - } - - let mut lines = Vec::new(); - let mut pending = Vec::new(); - parse_input_lines(data_source, &embed_specs, &mut lines, &mut pending)?; - if pending.is_empty() { - return Ok(data_source.to_string()); - } - - let cache_path = db_path.join(EMBEDDING_CACHE_FILENAME); - let mut cache = load_embedding_cache(&cache_path)?; - - let owned_client; - let client = if let Some(client) = client_override { - client - } else { - owned_client = EmbeddingClient::from_env().map_err(|err| { - NanoError::Storage(format!("embedding initialization failed: {}", err)) - })?; - &owned_client - }; - let model = client.model().to_string(); - - let mut missing_by_dim: BTreeMap> = BTreeMap::new(); - for assignment in &pending { - let key = CacheKey { - model: model.clone(), - dim: assignment.dim, - content_hash: assignment.content_hash.clone(), - chunk_chars: chunking.chunk_chars, - chunk_overlap_chars: chunking.chunk_overlap_chars, - }; - if cache.contains_key(&key) { - continue; - } - let entries = missing_by_dim.entry(assignment.dim).or_default(); - if !entries.iter().any(|(existing, _)| existing == &key) { - entries.push((key, assignment.source_text.clone())); - } - } - - let batch_size = parse_env_usize("NANOGRAPH_EMBED_BATCH_SIZE", DEFAULT_EMBED_BATCH_SIZE); - let mut new_cache_records = Vec::new(); - for (dim, entries) in missing_by_dim { - if chunking.is_enabled() { - for (key, text) in entries { - let vector = - embed_text_with_chunking(client, &text, dim, batch_size, chunking).await?; - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - key.content_hash, - dim, - vector.len() - ))); - } - cache.insert(key.clone(), vector.clone()); - new_cache_records.push(CacheRecord { - model: key.model.clone(), - dim: key.dim, - content_hash: key.content_hash.clone(), - vector, - chunk_chars: key.chunk_chars, - chunk_overlap_chars: key.chunk_overlap_chars, - }); - } - continue; - } - - for chunk in entries.chunks(batch_size) { - let texts: Vec = chunk.iter().map(|(_, text)| text.clone()).collect(); - let vectors = client - .embed_texts(&texts, dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err)))?; - if vectors.len() != chunk.len() { - return Err(NanoError::Storage(format!( - "embedding response size mismatch: expected {}, got {}", - chunk.len(), - vectors.len() - ))); - } - for ((key, _), vector) in chunk.iter().zip(vectors.into_iter()) { - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - key.content_hash, - dim, - vector.len() - ))); - } - cache.insert(key.clone(), vector.clone()); - new_cache_records.push(CacheRecord { - model: key.model.clone(), - dim: key.dim, - content_hash: key.content_hash.clone(), - vector, - chunk_chars: key.chunk_chars, - chunk_overlap_chars: key.chunk_overlap_chars, - }); - } - } - } - append_embedding_cache(&cache_path, &new_cache_records)?; - - apply_embeddings_to_lines(&mut lines, &pending, &cache, &model, chunking)?; - render_output_lines(data_source, lines) -} - -#[cfg_attr(not(test), allow(dead_code))] -fn parse_input_lines( - data_source: &str, - embed_specs: &HashMap>, - lines: &mut Vec, - pending: &mut Vec, -) -> Result<()> { - for (line_no, raw_line) in data_source.lines().enumerate() { - let trimmed = raw_line.trim(); - if trimmed.is_empty() || trimmed.starts_with("//") { - lines.push(ParsedLine::Raw(raw_line.to_string())); - continue; - } - - let mut obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!("JSON parse error on line {}: {}", line_no + 1, e)) - })?; - - if let Some(type_name) = obj - .get("type") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - && let Some(specs) = embed_specs.get(type_name.as_str()) - { - let data_obj = obj - .get_mut("data") - .and_then(|v| v.as_object_mut()) - .ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} is missing object field `data`", - type_name, - line_no + 1 - )) - })?; - let line_index = lines.len(); - - for spec in specs { - let needs_embedding = match data_obj.get(&spec.target_prop) { - Some(value) => value.is_null(), - None => true, - }; - if !needs_embedding { - continue; - } - - let source_value = data_obj.get(&spec.source_prop).ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} missing @embed source property `{}` for `{}`", - type_name, - line_no + 1, - spec.source_prop, - spec.target_prop - )) - })?; - let source_text = source_value.as_str().ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} @embed source property `{}` must be String", - type_name, - line_no + 1, - spec.source_prop - )) - })?; - - pending.push(PendingAssignment { - line_index, - target_prop: spec.target_prop.clone(), - source_text: source_text.to_string(), - dim: spec.dim, - content_hash: hash_string(source_text), - }); - } - } - - lines.push(ParsedLine::Json(obj)); - } - Ok(()) -} - -#[cfg_attr(not(test), allow(dead_code))] -fn apply_embeddings_to_lines( - lines: &mut [ParsedLine], - pending: &[PendingAssignment], - cache: &HashMap>, - model: &str, - chunking: EmbedChunkingConfig, -) -> Result<()> { - for assignment in pending { - let key = CacheKey { - model: model.to_string(), - dim: assignment.dim, - content_hash: assignment.content_hash.clone(), - chunk_chars: chunking.chunk_chars, - chunk_overlap_chars: chunking.chunk_overlap_chars, - }; - let vector = cache.get(&key).ok_or_else(|| { - NanoError::Storage(format!( - "embedding cache miss for content hash {}", - assignment.content_hash - )) - })?; - let line = lines.get_mut(assignment.line_index).ok_or_else(|| { - NanoError::Storage(format!( - "embedding assignment line out of range: {}", - assignment.line_index - )) - })?; - let ParsedLine::Json(obj) = line else { - return Err(NanoError::Storage(format!( - "embedding assignment line {} is not JSON", - assignment.line_index - ))); - }; - let data_obj = obj - .get_mut("data") - .and_then(|v| v.as_object_mut()) - .ok_or_else(|| { - NanoError::Storage("node row is missing object field `data`".to_string()) - })?; - data_obj.insert( - assignment.target_prop.clone(), - serde_json::to_value(vector).map_err(|e| { - NanoError::Storage(format!("serialize embedding vector failed: {}", e)) - })?, - ); - } - Ok(()) -} - -#[cfg_attr(not(test), allow(dead_code))] -fn render_output_lines(original: &str, lines: Vec) -> Result { - let mut out = String::new(); - for (idx, line) in lines.into_iter().enumerate() { - if idx > 0 { - out.push('\n'); - } - match line { - ParsedLine::Raw(raw) => out.push_str(&raw), - ParsedLine::Json(obj) => { - out.push_str(&serde_json::to_string(&obj).map_err(|e| { - NanoError::Storage(format!("serialize JSONL row failed: {}", e)) - })?) - } - } - } - if original.ends_with('\n') { - out.push('\n'); - } - Ok(out) -} - -async fn resolve_pending_stream_batches( - pending_by_dim: &mut BTreeMap>, - pending_lines: &mut VecDeque, - runtime: &mut StreamEmbedRuntime<'_>, - flush_all: bool, -) -> Result<()> { - loop { - let next_dim = pending_by_dim - .iter() - .find(|(_, queue)| { - if flush_all { - !queue.is_empty() - } else { - queue.len() >= runtime.batch_size.max(1) - } - }) - .map(|(dim, _)| *dim); - let Some(dim) = next_dim else { - break; - }; - - let queue = pending_by_dim.get_mut(&dim).ok_or_else(|| { - NanoError::Storage(format!("missing pending embedding queue for dim {}", dim)) - })?; - resolve_pending_stream_batch(queue, pending_lines, runtime).await?; - if queue.is_empty() { - pending_by_dim.remove(&dim); - } - } - - Ok(()) -} - -async fn resolve_pending_stream_batch( - queue: &mut VecDeque, - pending_lines: &mut VecDeque, - runtime: &mut StreamEmbedRuntime<'_>, -) -> Result<()> { - let batch_size = runtime.batch_size.max(1); - let mut assignments = Vec::new(); - let mut unique_entries = Vec::new(); - let mut seen_keys = HashSet::new(); - - while let Some(assignment) = queue.pop_front() { - let cache_key = assignment.cache_key(runtime.model, runtime.chunking); - if seen_keys.insert(cache_key.clone()) { - unique_entries.push((cache_key, assignment.source_text.clone())); - } - assignments.push(assignment); - if unique_entries.len() >= batch_size { - break; - } - } - - if unique_entries.is_empty() { - return Ok(()); - } - - if runtime.chunking.is_enabled() { - for (cache_key, text) in &unique_entries { - let vector = embed_text_with_chunking( - runtime.client, - text, - cache_key.dim, - batch_size, - runtime.chunking, - ) - .await?; - if vector.len() != cache_key.dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - cache_key.content_hash, - cache_key.dim, - vector.len() - ))); - } - runtime.cache.insert(cache_key.clone(), vector.clone()); - runtime.new_cache_records.push(CacheRecord { - model: cache_key.model.clone(), - dim: cache_key.dim, - content_hash: cache_key.content_hash.clone(), - vector, - chunk_chars: cache_key.chunk_chars, - chunk_overlap_chars: cache_key.chunk_overlap_chars, - }); - } - } else { - let texts: Vec = unique_entries - .iter() - .map(|(_, text)| text.clone()) - .collect(); - let dim = unique_entries[0].0.dim; - let vectors = runtime - .client - .embed_texts(&texts, dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err)))?; - if vectors.len() != unique_entries.len() { - return Err(NanoError::Storage(format!( - "embedding response size mismatch: expected {}, got {}", - unique_entries.len(), - vectors.len() - ))); - } - - for ((cache_key, _), vector) in unique_entries.iter().zip(vectors.into_iter()) { - if vector.len() != cache_key.dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - cache_key.content_hash, - cache_key.dim, - vector.len() - ))); - } - runtime.cache.insert(cache_key.clone(), vector.clone()); - runtime.new_cache_records.push(CacheRecord { - model: cache_key.model.clone(), - dim: cache_key.dim, - content_hash: cache_key.content_hash.clone(), - vector, - chunk_chars: cache_key.chunk_chars, - chunk_overlap_chars: cache_key.chunk_overlap_chars, - }); - } - } - - for assignment in &assignments { - apply_stream_assignment( - pending_lines, - assignment, - runtime.cache, - runtime.model, - runtime.chunking, - )?; - } - - Ok(()) -} - -struct StreamEmbedRuntime<'a> { - cache: &'a mut HashMap>, - model: &'a str, - client: &'a EmbeddingClient, - new_cache_records: &'a mut Vec, - batch_size: usize, - chunking: EmbedChunkingConfig, -} - -fn apply_stream_assignment( - pending_lines: &mut VecDeque, - assignment: &StreamPendingAssignment, - cache: &HashMap>, - model: &str, - chunking: EmbedChunkingConfig, -) -> Result<()> { - let cache_key = assignment.cache_key(model, chunking); - let vector = cache.get(&cache_key).ok_or_else(|| { - NanoError::Storage(format!( - "embedding cache miss for content hash {}", - assignment.content_hash - )) - })?; - let line = pending_lines - .iter_mut() - .find(|line| line.line_id == assignment.line_id) - .ok_or_else(|| { - NanoError::Storage(format!( - "embedding assignment line out of range: {}", - assignment.line_id - )) - })?; - let ParsedLine::Json(obj) = &mut line.line else { - return Err(NanoError::Storage(format!( - "embedding assignment line {} is not JSON", - assignment.line_id - ))); - }; - let data_obj = obj - .get_mut("data") - .and_then(|value| value.as_object_mut()) - .ok_or_else(|| NanoError::Storage("node row is missing object field `data`".to_string()))?; - data_obj.insert( - assignment.target_prop.clone(), - serde_json::to_value(vector) - .map_err(|e| NanoError::Storage(format!("serialize embedding vector failed: {}", e)))?, - ); - if line.missing_assignments == 0 { - return Err(NanoError::Storage(format!( - "embedding assignment line {} underflow", - assignment.line_id - ))); - } - line.missing_assignments -= 1; - Ok(()) -} - -fn flush_ready_stream_lines( - writer: &mut BufWriter, - pending_lines: &mut VecDeque, -) -> Result<()> { - while pending_lines - .front() - .map(|line| line.missing_assignments == 0) - .unwrap_or(false) - { - let line = pending_lines.pop_front().ok_or_else(|| { - NanoError::Storage("pending embedding output queue unexpectedly empty".to_string()) - })?; - match line.line { - ParsedLine::Raw(raw) => writer.write_all(raw.as_bytes())?, - ParsedLine::Json(obj) => serde_json::to_writer(&mut *writer, &obj) - .map_err(|e| NanoError::Storage(format!("serialize JSONL row failed: {}", e)))?, - } - writer.write_all(b"\n")?; - } - Ok(()) -} - -fn copy_reader_to_writer( - reader: R, - writer: &mut BufWriter, -) -> Result<()> { - for line in reader.lines() { - let line = line?; - writer.write_all(line.as_bytes())?; - writer.write_all(b"\n")?; - } - Ok(()) -} - -fn create_materialized_temp_file(db_path: &Path) -> Result { - std::fs::create_dir_all(db_path)?; - let pid = std::process::id(); - for attempt in 0..256u32 { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos(); - let path = db_path.join(format!( - ".nanograph_embed_materialized_{}_{}_{}.jsonl", - pid, now, attempt - )); - match std::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&path) - { - Ok(_) => return Ok(path), - Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue, - Err(err) => return Err(err.into()), - } - } - - Err(NanoError::Storage( - "failed to create temp embedding materialization file".to_string(), - )) -} - -async fn embed_text_with_chunking( - client: &EmbeddingClient, - source_text: &str, - dim: usize, - batch_size: usize, - chunking: EmbedChunkingConfig, -) -> Result> { - let chunks = split_text_into_chunks( - source_text, - chunking.chunk_chars, - chunking.chunk_overlap_chars, - ); - if chunks.len() == 1 { - return client - .embed_text(&chunks[0], dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err))); - } - - let batch_size = batch_size.max(1); - let mut vectors = Vec::with_capacity(chunks.len()); - for chunk_batch in chunks.chunks(batch_size) { - let texts: Vec = chunk_batch.to_vec(); - let mut embedded = client - .embed_texts(&texts, dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err)))?; - if embedded.len() != texts.len() { - return Err(NanoError::Storage(format!( - "embedding response size mismatch: expected {}, got {}", - texts.len(), - embedded.len() - ))); - } - vectors.append(&mut embedded); - } - - average_pool_embeddings(&vectors, dim) -} - -fn split_text_into_chunks(text: &str, chunk_chars: usize, overlap_chars: usize) -> Vec { - if chunk_chars == 0 { - return vec![text.to_string()]; - } - - let total_chars = text.chars().count(); - if total_chars <= chunk_chars { - return vec![text.to_string()]; - } - - let mut char_boundaries = Vec::with_capacity(total_chars + 1); - char_boundaries.push(0); - for (idx, _) in text.char_indices().skip(1) { - char_boundaries.push(idx); - } - char_boundaries.push(text.len()); - - let step = chunk_chars.saturating_sub(overlap_chars).max(1); - let mut out = Vec::new(); - let mut start_char = 0usize; - while start_char < total_chars { - let end_char = (start_char + chunk_chars).min(total_chars); - let start_byte = char_boundaries[start_char]; - let end_byte = char_boundaries[end_char]; - out.push(text[start_byte..end_byte].to_string()); - if end_char == total_chars { - break; - } - start_char = start_char.saturating_add(step); - } - - if out.is_empty() { - vec![text.to_string()] - } else { - out - } -} - -fn average_pool_embeddings(vectors: &[Vec], dim: usize) -> Result> { - if vectors.is_empty() { - return Err(NanoError::Storage( - "embedding aggregation received no chunk vectors".to_string(), - )); - } - - let mut accum = vec![0.0f64; dim]; - for vector in vectors { - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch during chunk aggregation: expected {}, got {}", - dim, - vector.len() - ))); - } - for (idx, value) in vector.iter().enumerate() { - accum[idx] += *value as f64; - } - } - - let inv_len = 1.0f64 / vectors.len() as f64; - let mut pooled: Vec = accum - .into_iter() - .map(|sum| (sum * inv_len) as f32) - .collect(); - let norm = pooled - .iter() - .map(|v| (*v as f64) * (*v as f64)) - .sum::() - .sqrt() as f32; - if norm > f32::EPSILON { - for value in &mut pooled { - *value /= norm; - } - } - Ok(pooled) -} - -pub(crate) fn collect_embed_specs(schema_ir: &SchemaIR) -> Result>> { - let mut specs_by_type: HashMap> = HashMap::new(); - for node in schema_ir.node_types() { - let mut prop_by_name: HashMap<&str, &PropDef> = HashMap::new(); - for prop in &node.properties { - prop_by_name.insert(prop.name.as_str(), prop); - } - - let mut node_specs = Vec::new(); - for prop in &node.properties { - let Some(source_prop) = prop.embed_source.as_ref() else { - continue; - }; - - if prop.list { - return Err(NanoError::Storage(format!( - "@embed target {}.{} cannot be a list type", - node.name, prop.name - ))); - } - let dim = match ScalarType::from_str_name(&prop.scalar_type) { - Some(ScalarType::Vector(dim)) if dim > 0 => dim as usize, - _ => { - return Err(NanoError::Storage(format!( - "@embed target {}.{} must be Vector(dim)", - node.name, prop.name - ))); - } - }; - - let source_def = prop_by_name.get(source_prop.as_str()).ok_or_else(|| { - NanoError::Storage(format!( - "@embed on {}.{} references unknown source property {}", - node.name, prop.name, source_prop - )) - })?; - if source_def.list || source_def.scalar_type != "String" { - return Err(NanoError::Storage(format!( - "@embed source {}.{} must be String", - node.name, source_prop - ))); - } - - node_specs.push(EmbedSpec { - target_prop: prop.name.clone(), - source_prop: source_prop.clone(), - dim, - }); - } - - if !node_specs.is_empty() { - specs_by_type.insert(node.name.clone(), node_specs); - } - } - Ok(specs_by_type) -} - -fn load_embedding_cache(path: &Path) -> Result>> { - let records = load_embedding_cache_records(path)?; - let mut cache = HashMap::new(); - for record in records { - let key = cache_key_from_record(&record); - cache.insert(key, record.vector); - } - Ok(cache) -} - -fn append_embedding_cache(path: &Path, records: &[CacheRecord]) -> Result<()> { - let max_entries = parse_env_usize( - "NANOGRAPH_EMBED_CACHE_MAX_ENTRIES", - DEFAULT_EMBED_CACHE_MAX_ENTRIES, - ); - append_embedding_cache_with_limit(path, records, max_entries) -} - -fn append_embedding_cache_with_limit( - path: &Path, - records: &[CacheRecord], - max_entries: usize, -) -> Result<()> { - if records.is_empty() { - return Ok(()); - } - let _lock = acquire_embedding_cache_lock(path)?; - let mut merged = load_embedding_cache_records(path)?; - merged.extend(records.iter().cloned()); - let compacted = compact_embedding_cache_records(merged, max_entries); - write_embedding_cache_records(path, &compacted)?; - Ok(()) -} - -fn load_embedding_cache_records(path: &Path) -> Result> { - if !path.exists() { - return Ok(Vec::new()); - } - let data = std::fs::read_to_string(path)?; - parse_embedding_cache_records(path, &data) -} - -fn parse_embedding_cache_records(path: &Path, data: &str) -> Result> { - let mut records = Vec::new(); - for (line_no, line) in data.lines().enumerate() { - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - let record: CacheRecord = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!( - "invalid embedding cache at {} line {}: {}", - path.display(), - line_no + 1, - e - )) - })?; - if record.vector.len() != record.dim { - return Err(NanoError::Storage(format!( - "invalid embedding cache at {} line {}: vector dim {} does not match {}", - path.display(), - line_no + 1, - record.vector.len(), - record.dim - ))); - } - records.push(record); - } - Ok(records) -} - -fn write_embedding_cache_records(path: &Path, records: &[CacheRecord]) -> Result<()> { - let mut file = std::fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(path)?; - for record in records { - let mut line = serde_json::to_vec(record).map_err(|e| { - NanoError::Storage(format!( - "failed to write embedding cache {}: {}", - path.display(), - e - )) - })?; - line.push(b'\n'); - file.write_all(&line)?; - } - file.flush()?; - Ok(()) -} - -fn compact_embedding_cache_records( - records: Vec, - max_entries: usize, -) -> Vec { - let max_entries = max_entries.max(1); - let mut seen = HashSet::new(); - let mut compacted_rev = Vec::with_capacity(records.len().min(max_entries)); - for record in records.into_iter().rev() { - if seen.insert(cache_key_from_record(&record)) { - compacted_rev.push(record); - if compacted_rev.len() == max_entries { - break; - } - } - } - compacted_rev.reverse(); - compacted_rev -} - -fn cache_key_from_record(record: &CacheRecord) -> CacheKey { - CacheKey { - model: record.model.clone(), - dim: record.dim, - content_hash: record.content_hash.clone(), - chunk_chars: record.chunk_chars, - chunk_overlap_chars: record.chunk_overlap_chars, - } -} - -struct EmbeddingCacheLock { - path: PathBuf, - _file: std::fs::File, -} - -impl Drop for EmbeddingCacheLock { - fn drop(&mut self) { - let _ = std::fs::remove_file(&self.path); - } -} - -fn embedding_cache_lock_path(path: &Path) -> PathBuf { - let mut lock_path = path.as_os_str().to_os_string(); - lock_path.push(".lock"); - PathBuf::from(lock_path) -} - -fn acquire_embedding_cache_lock(path: &Path) -> Result { - let stale_after_secs = parse_env_usize( - "NANOGRAPH_EMBED_CACHE_LOCK_STALE_SECS", - DEFAULT_EMBED_CACHE_LOCK_STALE_SECS, - ); - let stale_after = Duration::from_secs(stale_after_secs as u64); - acquire_embedding_cache_lock_with_stale_after(path, stale_after) -} - -fn acquire_embedding_cache_lock_with_stale_after( - path: &Path, - stale_after: Duration, -) -> Result { - let lock_path = embedding_cache_lock_path(path); - for attempt in 0..EMBEDDING_CACHE_LOCK_RETRIES { - match std::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&lock_path) - { - Ok(file) => { - return Ok(EmbeddingCacheLock { - path: lock_path, - _file: file, - }); - } - Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { - if lock_file_is_stale(&lock_path, stale_after) { - match std::fs::remove_file(&lock_path) { - Ok(()) => continue, - Err(remove_err) if remove_err.kind() == std::io::ErrorKind::NotFound => { - continue; - } - Err(remove_err) => { - return Err(NanoError::Storage(format!( - "failed to remove stale embedding cache lock {}: {}", - lock_path.display(), - remove_err - ))); - } - } - } - if attempt + 1 == EMBEDDING_CACHE_LOCK_RETRIES { - return Err(NanoError::Storage(format!( - "embedding cache lock timed out for {} (lock file: {})", - path.display(), - lock_path.display() - ))); - } - std::thread::sleep(Duration::from_millis(EMBEDDING_CACHE_LOCK_RETRY_DELAY_MS)); - } - Err(err) => { - return Err(NanoError::Storage(format!( - "failed to acquire embedding cache lock {}: {}", - lock_path.display(), - err - ))); - } - } - } - - Err(NanoError::Storage(format!( - "embedding cache lock acquisition failed for {}", - path.display() - ))) -} - -fn lock_file_is_stale(lock_path: &Path, stale_after: Duration) -> bool { - let metadata = match std::fs::metadata(lock_path) { - Ok(meta) => meta, - Err(_) => return false, - }; - let timestamp = metadata.modified().ok().or_else(|| metadata.created().ok()); - let Some(timestamp) = timestamp else { - return false; - }; - match timestamp.elapsed() { - Ok(age) => age >= stale_after, - Err(_) => false, - } -} - -fn parse_env_usize(name: &str, default: usize) -> usize { - std::env::var(name) - .ok() - .and_then(|v| v.parse::().ok()) - .filter(|v| *v > 0) - .unwrap_or(default) -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::io::Cursor; - use std::sync::{Arc, Barrier}; - - use tempfile::TempDir; - - use crate::catalog::schema_ir::build_schema_ir; - use crate::schema::parser::parse_schema; - - use super::*; - - #[tokio::test] - async fn materialize_embeddings_populates_missing_vector() { - let schema = parse_schema( - r#" -node Doc { - slug: String @key - title: String - embedding: Vector(6) @embed(title) -} -"#, - ) - .unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let data = r#"{"type":"Doc","data":{"slug":"a","title":"alpha"}} -{"type":"Doc","data":{"slug":"b","title":"beta"}} -"#; - let temp = TempDir::new().unwrap(); - let client = EmbeddingClient::mock_for_tests(); - let out = materialize_embeddings_for_load_inner(temp.path(), &ir, data, Some(&client)) - .await - .unwrap(); - assert!(out.contains("\"embedding\"")); - assert!(temp.path().join(EMBEDDING_CACHE_FILENAME).exists()); - } - - #[tokio::test] - async fn materialize_embeddings_is_noop_when_vectors_present() { - let schema = parse_schema( - r#" -node Doc { - slug: String @key - title: String - embedding: Vector(3) @embed(title) -} -"#, - ) - .unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let data = - r#"{"type":"Doc","data":{"slug":"a","title":"alpha","embedding":[1.0,0.0,0.0]}}"#; - let temp = TempDir::new().unwrap(); - let out = materialize_embeddings_for_load_inner( - temp.path(), - &ir, - data, - Some(&EmbeddingClient::mock_for_tests()), - ) - .await - .unwrap(); - assert_eq!(out, data); - assert!(!temp.path().join(EMBEDDING_CACHE_FILENAME).exists()); - } - - #[tokio::test] - async fn materialize_embeddings_to_tempfile_matches_string_path() { - let schema = parse_schema( - r#" -node Doc { - slug: String @key - title: String - embedding: Vector(6) @embed(title) -} -"#, - ) - .unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let data = r#"{"type":"Doc","data":{"slug":"a","title":"alpha"}} -{"type":"Doc","data":{"slug":"b","title":"beta"}} -"#; - let temp = TempDir::new().unwrap(); - let client = EmbeddingClient::mock_for_tests(); - - let string_out = - materialize_embeddings_for_load_inner(temp.path(), &ir, data, Some(&client)) - .await - .unwrap(); - let tempfile_out = materialize_embeddings_for_load_to_tempfile_inner( - temp.path(), - &ir, - Cursor::new(data.as_bytes()), - Some(&client), - ) - .await - .unwrap(); - let stream_out = std::fs::read_to_string(tempfile_out).unwrap(); - - let parse_rows = |text: &str| { - text.lines() - .filter(|line| !line.trim().is_empty()) - .map(|line| serde_json::from_str::(line).unwrap()) - .collect::>() - }; - - assert_eq!(parse_rows(&string_out), parse_rows(&stream_out)); - } - - #[test] - fn split_text_into_chunks_respects_overlap() { - let chunks = split_text_into_chunks("abcdefghij", 4, 1); - assert_eq!(chunks, vec!["abcd", "defg", "ghij"]); - } - - #[test] - fn append_embedding_cache_handles_concurrent_writers() { - let temp = TempDir::new().unwrap(); - let cache_path = temp.path().join(EMBEDDING_CACHE_FILENAME); - let writer_count = 8usize; - let barrier = Arc::new(Barrier::new(writer_count)); - let mut threads = Vec::new(); - - for idx in 0..writer_count { - let path = cache_path.clone(); - let barrier = Arc::clone(&barrier); - threads.push(std::thread::spawn(move || { - let record = CacheRecord { - model: "test-model".to_string(), - dim: 3, - content_hash: format!("hash-{}", idx), - vector: vec![idx as f32, 1.0, 2.0], - chunk_chars: 0, - chunk_overlap_chars: 0, - }; - barrier.wait(); - append_embedding_cache(&path, &[record]).unwrap(); - })); - } - - for thread in threads { - thread.join().unwrap(); - } - - let file = std::fs::read_to_string(&cache_path).unwrap(); - let lines: Vec<&str> = file - .lines() - .filter(|line| !line.trim().is_empty()) - .collect(); - assert_eq!(lines.len(), writer_count); - - let mut seen = HashSet::new(); - for line in lines { - let record: CacheRecord = serde_json::from_str(line).unwrap(); - assert!(seen.insert(record.content_hash)); - } - } - - #[test] - fn append_embedding_cache_with_limit_compacts_and_deduplicates() { - let temp = TempDir::new().unwrap(); - let cache_path = temp.path().join(EMBEDDING_CACHE_FILENAME); - - let record = |hash: &str, marker: f32| CacheRecord { - model: "test-model".to_string(), - dim: 3, - content_hash: hash.to_string(), - vector: vec![marker, 1.0, 2.0], - chunk_chars: 0, - chunk_overlap_chars: 0, - }; - - append_embedding_cache_with_limit( - &cache_path, - &[record("a", 1.0), record("b", 2.0), record("c", 3.0)], - 3, - ) - .unwrap(); - append_embedding_cache_with_limit(&cache_path, &[record("d", 4.0), record("b", 20.0)], 3) - .unwrap(); - - let cache = load_embedding_cache(&cache_path).unwrap(); - assert_eq!(cache.len(), 3); - - let key_b = CacheKey { - model: "test-model".to_string(), - dim: 3, - content_hash: "b".to_string(), - chunk_chars: 0, - chunk_overlap_chars: 0, - }; - let key_c = CacheKey { - content_hash: "c".to_string(), - ..key_b.clone() - }; - let key_d = CacheKey { - content_hash: "d".to_string(), - ..key_b.clone() - }; - - assert_eq!(cache.get(&key_b).unwrap()[0], 20.0); - assert!(cache.contains_key(&key_c)); - assert!(cache.contains_key(&key_d)); - } - - #[test] - fn acquire_embedding_cache_lock_reclaims_stale_lock_file() { - let temp = TempDir::new().unwrap(); - let cache_path = temp.path().join(EMBEDDING_CACHE_FILENAME); - let lock_path = embedding_cache_lock_path(&cache_path); - - std::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&lock_path) - .unwrap(); - std::thread::sleep(Duration::from_secs(2)); - - let lock = - acquire_embedding_cache_lock_with_stale_after(&cache_path, Duration::from_secs(1)) - .unwrap(); - drop(lock); - - assert!(!lock_path.exists()); - } - - #[tokio::test] - async fn materialize_embeddings_chunking_pools_chunk_vectors() { - let schema = parse_schema( - r#" -node Doc { - slug: String @key - body: String - embedding: Vector(6) @embed(body) -} -"#, - ) - .unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let data = r#"{"type":"Doc","data":{"slug":"doc-1","body":"alpha beta gamma delta epsilon zeta"}}"#; - let temp = TempDir::new().unwrap(); - let client = EmbeddingClient::mock_for_tests(); - let chunking = EmbedChunkingConfig::new(12, 3); - let out = materialize_embeddings_for_load_inner_with_chunking( - temp.path(), - &ir, - data, - Some(&client), - chunking, - ) - .await - .unwrap(); - - let embedded: serde_json::Value = serde_json::from_str(&out).unwrap(); - let values = embedded["data"]["embedding"].as_array().unwrap(); - let actual: Vec = values.iter().map(|v| v.as_f64().unwrap() as f32).collect(); - - let chunk_texts = split_text_into_chunks( - "alpha beta gamma delta epsilon zeta", - chunking.chunk_chars, - chunking.chunk_overlap_chars, - ); - let chunk_vectors = client.embed_texts(&chunk_texts, 6).await.unwrap(); - let expected = average_pool_embeddings(&chunk_vectors, 6).unwrap(); - - assert_eq!(actual.len(), expected.len()); - for (got, want) in actual.iter().zip(expected.iter()) { - assert!((got - want).abs() < 1e-6, "got={}, want={}", got, want); - } - } - - #[test] - fn cache_key_differs_by_chunking_config() { - let key_a = CacheKey { - model: "text-embedding-3-small".to_string(), - dim: 8, - content_hash: "abc".to_string(), - chunk_chars: 0, - chunk_overlap_chars: 0, - }; - let key_b = CacheKey { - chunk_chars: 256, - chunk_overlap_chars: 64, - ..key_a.clone() - }; - assert_ne!(key_a, key_b); - } -} diff --git a/crates/omnigraph/src/loader/jsonl.rs b/crates/omnigraph/src/loader/jsonl.rs deleted file mode 100644 index 8eb9617..0000000 --- a/crates/omnigraph/src/loader/jsonl.rs +++ /dev/null @@ -1,1532 +0,0 @@ -use std::collections::{BTreeMap, HashMap}; -use std::fs::{File, OpenOptions}; -use std::io::{BufRead, BufReader, BufWriter, Cursor, Write}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; - -use arrow_array::builder::{ - ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder, - Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder, - UInt32Builder, UInt64Builder, make_builder, -}; -use arrow_array::{ - Array, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, Int32Array, - Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array, -}; -use arrow_schema::{DataType, Field, Schema}; - -use crate::error::{NanoError, Result}; - -use super::super::graph::DatasetAccumulator; -use super::constraints::{key_value_string, node_property_field}; - -#[cfg_attr(not(test), allow(dead_code))] -/// Load JSONL-formatted data into a DatasetAccumulator. -/// Each line is either a node `{"type": "...", "data": {...}}` or edge `{"edge": "...", "from": "...", "to": "..."}`. -pub(crate) fn load_jsonl_data( - storage: &mut DatasetAccumulator, - data: &str, - key_props: &HashMap, -) -> Result<()> { - load_jsonl_data_with_name_seed(storage, data, key_props, None) -} - -#[cfg_attr(not(test), allow(dead_code))] -/// Load JSONL-formatted data into a DatasetAccumulator with an optional pre-populated -/// @key-value-to-id mapping for resolving edges that reference existing nodes. -pub(crate) fn load_jsonl_data_with_name_seed( - storage: &mut DatasetAccumulator, - data: &str, - key_props: &HashMap, - name_seed: Option<&HashMap<(String, String), u64>>, -) -> Result<()> { - let cursor = Cursor::new(data.as_bytes()); - load_jsonl_reader_with_name_seed(storage, cursor, key_props, name_seed) -} - -#[cfg_attr(not(test), allow(dead_code))] -pub(crate) fn load_jsonl_reader( - storage: &mut DatasetAccumulator, - reader: R, - key_props: &HashMap, -) -> Result<()> { - load_jsonl_reader_with_name_seed(storage, reader, key_props, None) -} - -#[cfg_attr(not(test), allow(dead_code))] -pub(crate) fn load_jsonl_reader_with_name_seed( - storage: &mut DatasetAccumulator, - reader: R, - key_props: &HashMap, - name_seed: Option<&HashMap<(String, String), u64>>, -) -> Result<()> { - let spool_dir = std::env::temp_dir(); - load_jsonl_reader_with_name_seed_at_path(storage, &spool_dir, reader, key_props, name_seed) -} - -pub(crate) fn load_jsonl_reader_with_name_seed_at_path( - storage: &mut DatasetAccumulator, - spool_dir: &Path, - reader: R, - key_props: &HashMap, - name_seed: Option<&HashMap<(String, String), u64>>, -) -> Result<()> { - let batch_size = parse_env_usize("NANOGRAPH_LOAD_ROW_BATCH_SIZE", 2048); - let mut spool_paths = TempSpoolPaths::default(); - let mut node_paths = HashMap::new(); - let mut node_writers = HashMap::new(); - let mut edge_paths = HashMap::new(); - let mut edge_writers = HashMap::new(); - - for (line_no, line) in reader.lines().enumerate() { - let line = line?; - let trimmed = line.trim(); - if trimmed.is_empty() || trimmed.starts_with("//") { - continue; - } - - let obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!("JSON parse error on line {}: {}", line_no + 1, e)) - })?; - - if let Some(type_name) = obj.get("type").and_then(|v| v.as_str()) { - if !storage.catalog.node_types.contains_key(type_name) { - return Err(NanoError::Storage(format!( - "unknown node type in data: {}", - type_name - ))); - } - let writer = spool_writer_for_type( - spool_dir, - "load_nodes", - type_name, - &mut node_writers, - &mut node_paths, - &mut spool_paths, - )?; - write_jsonl_line(writer, &obj)?; - } else if let Some(edge_type) = obj.get("edge").and_then(|v| v.as_str()) { - let edge_name = resolve_edge_name(storage, edge_type)?; - let writer = spool_writer_for_type( - spool_dir, - "load_edges", - &edge_name, - &mut edge_writers, - &mut edge_paths, - &mut spool_paths, - )?; - write_jsonl_line(writer, &obj)?; - } - } - - drop(node_writers); - drop(edge_writers); - - let mut key_to_id: HashMap<(String, String), u64> = name_seed.cloned().unwrap_or_default(); - - let mut node_types: Vec = node_paths.keys().cloned().collect(); - node_types.sort(); - for type_name in node_types { - let path = node_paths.get(&type_name).ok_or_else(|| { - NanoError::Storage(format!("missing node spool path for {}", type_name)) - })?; - load_spooled_nodes( - storage, - &type_name, - path, - key_props, - &mut key_to_id, - batch_size, - )?; - } - - let mut edge_names: Vec = edge_paths.keys().cloned().collect(); - edge_names.sort(); - for edge_name in edge_names { - let path = edge_paths.get(&edge_name).ok_or_else(|| { - NanoError::Storage(format!("missing edge spool path for {}", edge_name)) - })?; - load_spooled_edges(storage, &edge_name, path, key_props, &key_to_id, batch_size)?; - } - - Ok(()) -} - -#[derive(Debug)] -struct PendingNodeRow { - row_idx: usize, - data: serde_json::Map, -} - -#[derive(Debug)] -struct ResolvedEdge { - from_id: u64, - to_id: u64, - data: Option>, -} - -#[derive(Default)] -struct TempSpoolPaths { - paths: Vec, -} - -impl TempSpoolPaths { - fn push(&mut self, path: PathBuf) { - self.paths.push(path); - } -} - -impl Drop for TempSpoolPaths { - fn drop(&mut self) { - for path in &self.paths { - let _ = std::fs::remove_file(path); - } - } -} - -fn load_spooled_nodes( - storage: &mut DatasetAccumulator, - type_name: &str, - path: &Path, - key_props: &HashMap, - key_to_id: &mut HashMap<(String, String), u64>, - batch_size: usize, -) -> Result<()> { - let file = File::open(path)?; - let reader = BufReader::new(file); - let mut rows = Vec::with_capacity(batch_size); - let mut next_row_idx = 0usize; - - for (line_no, line) in reader.lines().enumerate() { - let line = line?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - let obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!( - "JSON parse error in node spool {} line {}: {}", - type_name, - line_no + 1, - e - )) - })?; - let data = obj - .get("data") - .and_then(|value| value.as_object()) - .cloned() - .ok_or_else(|| { - NanoError::Storage(format!( - "node {} is missing object field `data` in spooled load", - type_name - )) - })?; - rows.push(PendingNodeRow { - row_idx: next_row_idx, - data, - }); - next_row_idx += 1; - if rows.len() >= batch_size { - flush_node_rows(storage, type_name, &mut rows, key_props, key_to_id)?; - } - } - - if !rows.is_empty() { - flush_node_rows(storage, type_name, &mut rows, key_props, key_to_id)?; - } - - Ok(()) -} - -fn flush_node_rows( - storage: &mut DatasetAccumulator, - type_name: &str, - rows: &mut Vec, - key_props: &HashMap, - key_to_id: &mut HashMap<(String, String), u64>, -) -> Result<()> { - if rows.is_empty() { - return Ok(()); - } - - let node_type = - storage.catalog.node_types.get(type_name).ok_or_else(|| { - NanoError::Storage(format!("unknown node type in data: {}", type_name)) - })?; - let prop_fields: Vec = node_type - .arrow_schema - .fields() - .iter() - .skip(1) - .map(|field| field.as_ref().clone()) - .collect(); - let mut builders: Vec> = - vec![Vec::with_capacity(rows.len()); prop_fields.len()]; - - for row in rows.iter() { - for (idx, field) in prop_fields.iter().enumerate() { - let value = row - .data - .get(field.name()) - .cloned() - .unwrap_or(serde_json::Value::Null); - if value.is_null() && !field.is_nullable() { - return Err(NanoError::Storage(format!( - "node {}: required field '{}' missing on row {}", - type_name, - field.name(), - row.row_idx - ))); - } - if let Some(prop_type) = node_type.properties.get(field.name()) { - validate_json_value(type_name, field.name(), prop_type, &value)?; - } - builders[idx].push(value); - } - } - - let mut columns: Vec> = Vec::with_capacity(prop_fields.len()); - for (idx, field) in prop_fields.iter().enumerate() { - columns.push(json_values_to_array( - &builders[idx], - field.data_type(), - field.is_nullable(), - )?); - } - - let prop_schema = Arc::new(Schema::new(prop_fields.clone())); - let batch = RecordBatch::try_new(prop_schema, columns) - .map_err(|e| NanoError::Storage(format!("batch error: {}", e)))?; - - let key_rows: Option> = if let Some(key_prop) = key_props.get(type_name) { - let key_col_idx = prop_fields - .iter() - .position(|field| field.name() == key_prop) - .ok_or_else(|| { - NanoError::Storage(format!( - "node type {} missing @key property {}", - type_name, key_prop - )) - })?; - let key_arr = batch.column(key_col_idx).clone(); - let mut keys = Vec::with_capacity(batch.num_rows()); - for row in 0..batch.num_rows() { - keys.push(key_value_string(&key_arr, row, key_prop)?); - } - Some(keys) - } else { - None - }; - - let assigned_ids = storage.insert_nodes(type_name, batch)?; - if let Some(keys) = key_rows { - for (row, key) in keys.into_iter().enumerate() { - key_to_id.insert((type_name.to_string(), key), assigned_ids[row]); - } - } - - rows.clear(); - Ok(()) -} - -fn load_spooled_edges( - storage: &mut DatasetAccumulator, - edge_name: &str, - path: &Path, - key_props: &HashMap, - key_to_id: &HashMap<(String, String), u64>, - batch_size: usize, -) -> Result<()> { - let file = File::open(path)?; - let reader = BufReader::new(file); - let mut edges_by_pair: BTreeMap<(u64, u64), ResolvedEdge> = BTreeMap::new(); - - for (line_no, line) in reader.lines().enumerate() { - let line = line?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - let obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!( - "JSON parse error in edge spool {} line {}: {}", - edge_name, - line_no + 1, - e - )) - })?; - let resolved = resolve_edge_object(storage, &obj, key_props, key_to_id)?; - edges_by_pair.insert((resolved.from_id, resolved.to_id), resolved); - } - - if edges_by_pair.is_empty() { - return Ok(()); - } - - let resolved_edges: Vec<&ResolvedEdge> = edges_by_pair.values().collect(); - for chunk in resolved_edges.chunks(batch_size.max(1)) { - insert_resolved_edge_chunk(storage, edge_name, chunk)?; - } - - Ok(()) -} - -fn insert_resolved_edge_chunk( - storage: &mut DatasetAccumulator, - edge_name: &str, - edges: &[&ResolvedEdge], -) -> Result<()> { - let src_ids: Vec = edges.iter().map(|edge| edge.from_id).collect(); - let dst_ids: Vec = edges.iter().map(|edge| edge.to_id).collect(); - - let edge_seg = storage - .edge_segments - .get(edge_name) - .ok_or_else(|| NanoError::Storage(format!("no edge segment: {}", edge_name)))?; - let edge_type = - storage.catalog.edge_types.get(edge_name).ok_or_else(|| { - NanoError::Storage(format!("unknown edge type in data: {}", edge_name)) - })?; - let prop_fields: Vec = edge_seg - .schema - .fields() - .iter() - .skip(3) - .map(|field| field.as_ref().clone()) - .collect(); - - let prop_batch = if prop_fields.is_empty() { - None - } else { - let mut columns: Vec> = Vec::with_capacity(prop_fields.len()); - for field in &prop_fields { - let values: Vec = edges - .iter() - .map(|edge| { - edge.data - .as_ref() - .and_then(|data| data.get(field.name())) - .cloned() - .unwrap_or(serde_json::Value::Null) - }) - .collect(); - if let Some(prop_type) = edge_type.properties.get(field.name()) { - for value in &values { - validate_json_value(edge_name, field.name(), prop_type, value)?; - } - } - columns.push(json_values_to_array( - &values, - field.data_type(), - field.is_nullable(), - )?); - } - let schema = Arc::new(Schema::new(prop_fields)); - Some( - RecordBatch::try_new(schema, columns) - .map_err(|e| NanoError::Storage(format!("edge prop batch error: {}", e)))?, - ) - }; - - storage.insert_edges(edge_name, &src_ids, &dst_ids, prop_batch)?; - Ok(()) -} - -fn resolve_edge_object( - storage: &DatasetAccumulator, - edge_obj: &serde_json::Value, - key_props: &HashMap, - key_to_id: &HashMap<(String, String), u64>, -) -> Result { - let edge_type = edge_obj - .get("edge") - .and_then(|value| value.as_str()) - .ok_or_else(|| NanoError::Storage("edge missing type".to_string()))?; - let et = resolve_edge_type(storage, edge_type)?; - - let from_token = edge_obj - .get("from") - .and_then(|value| value.as_str()) - .ok_or_else(|| NanoError::Storage("edge missing from".to_string()))?; - let to_token = edge_obj - .get("to") - .and_then(|value| value.as_str()) - .ok_or_else(|| NanoError::Storage("edge missing to".to_string()))?; - - let from_type = et.from_type.clone(); - let to_type = et.to_type.clone(); - let edge_name = et.name.clone(); - - let (src_key_prop, dst_key_prop) = match (key_props.get(&from_type), key_props.get(&to_type)) { - (Some(src), Some(dst)) => (src, dst), - _ => { - return Err(NanoError::Storage(format!( - "edge '{}' requires @key on source type '{}' and destination type '{}'", - edge_name, from_type, to_type - ))); - } - }; - - let from_key_type = storage - .catalog - .node_types - .get(&from_type) - .and_then(|node_type| node_property_field(node_type.arrow_schema.as_ref(), src_key_prop)) - .map(|field| field.data_type().clone()) - .ok_or_else(|| { - NanoError::Storage(format!( - "missing @key field {} on source type {}", - src_key_prop, from_type - )) - })?; - let to_key_type = storage - .catalog - .node_types - .get(&to_type) - .and_then(|node_type| node_property_field(node_type.arrow_schema.as_ref(), dst_key_prop)) - .map(|field| field.data_type().clone()) - .ok_or_else(|| { - NanoError::Storage(format!( - "missing @key field {} on destination type {}", - dst_key_prop, to_type - )) - })?; - - let from_key = parse_edge_endpoint_key_token(from_token, &from_key_type).map_err(|e| { - NanoError::Storage(format!( - "invalid edge endpoint key for {}.{} from='{}': {}", - from_type, src_key_prop, from_token, e - )) - })?; - let to_key = parse_edge_endpoint_key_token(to_token, &to_key_type).map_err(|e| { - NanoError::Storage(format!( - "invalid edge endpoint key for {}.{} to='{}': {}", - to_type, dst_key_prop, to_token, e - )) - })?; - - let from_id = *key_to_id - .get(&(from_type.clone(), from_key.clone())) - .ok_or_else(|| { - NanoError::Storage(format!( - "node not found by @key: {}.{}={}", - from_type, src_key_prop, from_key - )) - })?; - let to_id = *key_to_id - .get(&(to_type.clone(), to_key.clone())) - .ok_or_else(|| { - NanoError::Storage(format!( - "node not found by @key: {}.{}={}", - to_type, dst_key_prop, to_key - )) - })?; - - Ok(ResolvedEdge { - from_id, - to_id, - data: edge_obj - .get("data") - .and_then(|value| value.as_object()) - .cloned(), - }) -} - -fn resolve_edge_name(storage: &DatasetAccumulator, edge_type: &str) -> Result { - Ok(resolve_edge_type(storage, edge_type)?.name.clone()) -} - -fn resolve_edge_type<'a>( - storage: &'a DatasetAccumulator, - edge_type: &str, -) -> Result<&'a crate::catalog::EdgeType> { - storage - .catalog - .edge_types - .get(edge_type) - .or_else(|| { - storage - .catalog - .edge_name_index - .get(edge_type) - .and_then(|name| storage.catalog.edge_types.get(name)) - }) - .ok_or_else(|| NanoError::Storage(format!("unknown edge type: {}", edge_type))) -} - -fn spool_writer_for_type<'a>( - spool_dir: &Path, - prefix: &str, - type_name: &str, - writers: &'a mut HashMap>, - paths: &mut HashMap, - spool_paths: &mut TempSpoolPaths, -) -> Result<&'a mut BufWriter> { - if !writers.contains_key(type_name) { - let path = create_temp_spool_file(spool_dir, prefix, type_name)?; - spool_paths.push(path.clone()); - let writer = BufWriter::new( - OpenOptions::new() - .create_new(false) - .write(true) - .open(&path)?, - ); - writers.insert(type_name.to_string(), writer); - paths.insert(type_name.to_string(), path); - } - writers - .get_mut(type_name) - .ok_or_else(|| NanoError::Storage(format!("failed to open spool writer for {}", type_name))) -} - -fn create_temp_spool_file(spool_dir: &Path, prefix: &str, type_name: &str) -> Result { - std::fs::create_dir_all(spool_dir)?; - let pid = std::process::id(); - let sanitized = type_name.replace(['/', '\\', ' '], "_"); - for attempt in 0..256u32 { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos(); - let path = spool_dir.join(format!( - ".nanograph_{}_{}_{}_{}_{}.jsonl", - prefix, sanitized, pid, now, attempt - )); - match OpenOptions::new().create_new(true).write(true).open(&path) { - Ok(_) => return Ok(path), - Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue, - Err(err) => return Err(err.into()), - } - } - - Err(NanoError::Storage(format!( - "failed to create temp spool file for {}", - type_name - ))) -} - -fn write_jsonl_line(writer: &mut BufWriter, value: &serde_json::Value) -> Result<()> { - serde_json::to_writer(&mut *writer, value) - .map_err(|e| NanoError::Storage(format!("serialize JSONL row failed: {}", e)))?; - writer.write_all(b"\n")?; - Ok(()) -} - -fn parse_env_usize(name: &str, default: usize) -> usize { - std::env::var(name) - .ok() - .and_then(|value| value.parse::().ok()) - .filter(|value| *value > 0) - .unwrap_or(default) -} - -fn validate_json_value( - type_name: &str, - field_name: &str, - prop_type: &crate::types::PropType, - value: &serde_json::Value, -) -> Result<()> { - if value.is_null() { - return Ok(()); - } - if prop_type.list { - let Some(items) = value.as_array() else { - return Err(type_mismatch_error( - type_name, - field_name, - &expected_type_name(prop_type), - value, - )); - }; - let item_type = crate::types::PropType { - scalar: prop_type.scalar, - nullable: true, - list: false, - enum_values: prop_type.enum_values.clone(), - }; - for item in items { - validate_json_value(type_name, field_name, &item_type, item)?; - } - return Ok(()); - } - if let Some(enum_values) = &prop_type.enum_values { - let Some(raw) = value.as_str() else { - return Err(type_mismatch_error( - type_name, - field_name, - &expected_type_name(prop_type), - value, - )); - }; - if enum_values.iter().any(|allowed| allowed == raw) { - return Ok(()); - } - return Err(NanoError::Storage(format!( - "invalid enum value '{}' for {}.{} (expected: {})", - raw, - type_name, - field_name, - enum_values.join(", ") - ))); - } - - let valid = match prop_type.scalar { - crate::types::ScalarType::String => value.is_string(), - crate::types::ScalarType::Bool => value.is_boolean(), - crate::types::ScalarType::I32 => { - value.as_i64().and_then(|n| i32::try_from(n).ok()).is_some() - } - crate::types::ScalarType::I64 => value.as_i64().is_some(), - crate::types::ScalarType::U32 => { - value.as_u64().and_then(|n| u32::try_from(n).ok()).is_some() - } - crate::types::ScalarType::U64 => value.as_u64().is_some(), - crate::types::ScalarType::F32 => value.as_f64().is_some(), - crate::types::ScalarType::F64 => value.as_f64().is_some(), - crate::types::ScalarType::Date => parse_date32_json_value(value).is_ok(), - crate::types::ScalarType::DateTime => parse_date64_json_value(value).is_ok(), - crate::types::ScalarType::Vector(dim) => match value.as_array() { - Some(items) if items.len() == dim as usize => { - items.iter().all(|item| item.as_f64().is_some()) - } - _ => false, - }, - }; - if valid { - Ok(()) - } else { - Err(type_mismatch_error( - type_name, - field_name, - &expected_type_name(prop_type), - value, - )) - } -} - -fn expected_type_name(prop_type: &crate::types::PropType) -> String { - let base = if let Some(enum_values) = &prop_type.enum_values { - format!("enum({})", enum_values.join(", ")) - } else { - prop_type.scalar.to_string() - }; - if prop_type.list { - format!("[{}]", base) - } else { - base - } -} - -fn type_mismatch_error( - type_name: &str, - field_name: &str, - expected: &str, - value: &serde_json::Value, -) -> NanoError { - NanoError::Storage(format!( - "type mismatch for {}.{}: expected {}, got {}", - type_name, - field_name, - expected, - describe_json_value(value) - )) -} - -fn describe_json_value(value: &serde_json::Value) -> String { - match value { - serde_json::Value::Null => "Null".to_string(), - serde_json::Value::Bool(v) => format!("Bool {}", v), - serde_json::Value::Number(v) => { - if v.is_i64() || v.is_u64() { - format!("Integer {}", v) - } else { - format!("Float {}", v) - } - } - serde_json::Value::String(v) => format!("String {:?}", v), - serde_json::Value::Array(v) => format!("Array {}", serde_json::Value::Array(v.clone())), - serde_json::Value::Object(v) => { - format!("Object {}", serde_json::Value::Object(v.clone())) - } - } -} - -/// Convert JSON values to an Arrow array based on the target DataType. -pub(crate) fn json_values_to_array( - values: &[serde_json::Value], - dt: &DataType, - nullable: bool, -) -> Result> { - let arr: Arc = match dt { - DataType::Utf8 => { - let arr: StringArray = values - .iter() - .map(|v| v.as_str().map(|s| s.to_string())) - .collect(); - Arc::new(arr) - } - DataType::Int32 => { - let arr: Int32Array = values - .iter() - .map(|v| v.as_i64().map(|n| n as i32)) - .collect(); - Arc::new(arr) - } - DataType::Int64 => { - let arr: Int64Array = values.iter().map(|v| v.as_i64()).collect(); - Arc::new(arr) - } - DataType::UInt64 => { - let arr: UInt64Array = values.iter().map(|v| v.as_u64()).collect(); - Arc::new(arr) - } - DataType::Float64 => { - let arr: Float64Array = values.iter().map(|v| v.as_f64()).collect(); - Arc::new(arr) - } - DataType::Boolean => { - let arr: BooleanArray = values.iter().map(|v| v.as_bool()).collect(); - Arc::new(arr) - } - DataType::Float32 => { - let arr: Float32Array = values - .iter() - .map(|v| v.as_f64().map(|n| n as f32)) - .collect(); - Arc::new(arr) - } - DataType::UInt32 => { - let arr: UInt32Array = values - .iter() - .map(|v| v.as_u64().map(|n| n as u32)) - .collect(); - Arc::new(arr) - } - DataType::Date32 => { - let mut out = Vec::with_capacity(values.len()); - for value in values { - out.push(parse_date32_json_value(value)?); - } - Arc::new(Date32Array::from(out)) - } - DataType::Date64 => { - let mut out = Vec::with_capacity(values.len()); - for value in values { - out.push(parse_date64_json_value(value)?); - } - Arc::new(Date64Array::from(out)) - } - DataType::List(field) => { - let mut builder = ListBuilder::with_capacity( - make_builder(field.data_type(), values.len()), - values.len(), - ) - .with_field(field.clone()); - for value in values { - if value.is_null() { - builder.append(false); - continue; - } - let Some(items) = value.as_array() else { - builder.append(false); - continue; - }; - for item in items { - append_json_to_builder(builder.values(), field.data_type(), item)?; - } - builder.append(true); - } - Arc::new(builder.finish()) - } - DataType::FixedSizeList(field, dim) => { - if *dim <= 0 { - return Err(NanoError::Storage(format!( - "invalid FixedSizeList dimension: {}", - dim - ))); - } - if field.data_type() != &DataType::Float32 { - return Err(NanoError::Storage(format!( - "unsupported FixedSizeList element type {:?}; expected Float32", - field.data_type() - ))); - } - - let list_len = *dim as usize; - let mut builder = FixedSizeListBuilder::with_capacity( - Float32Builder::with_capacity(values.len() * list_len), - *dim, - values.len(), - ) - .with_field(field.clone()); - - for value in values { - if value.is_null() { - for _ in 0..list_len { - builder.values().append_null(); - } - builder.append(false); - continue; - } - let items = value.as_array().ok_or_else(|| { - NanoError::Storage(format!( - "expected JSON array for FixedSizeList, got {}", - dim, value - )) - })?; - if items.len() != list_len { - return Err(NanoError::Storage(format!( - "FixedSizeList length mismatch: got {}", - dim, - items.len() - ))); - } - - for item in items { - let num = item.as_f64().ok_or_else(|| { - NanoError::Storage(format!( - "expected numeric vector element in FixedSizeList, got {}", - dim, item - )) - })?; - builder.values().append_value(num as f32); - } - builder.append(true); - } - Arc::new(builder.finish()) - } - _ => { - // Fallback to string - let arr: StringArray = values.iter().map(|v| Some(v.to_string())).collect(); - Arc::new(arr) - } - }; - if !nullable && arr.null_count() > 0 { - return Err(NanoError::Storage(format!( - "field has {} null value(s) from type mismatch (expected {:?})", - arr.null_count(), - dt - ))); - } - Ok(arr) -} - -fn append_json_to_builder( - builder: &mut Box, - dt: &DataType, - value: &serde_json::Value, -) -> Result<()> { - match dt { - DataType::Utf8 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Utf8 builder downcast failed".to_string()) - })?; - if let Some(s) = value.as_str() { - b.append_value(s); - } else { - b.append_null(); - } - } - DataType::Boolean => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Boolean builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_bool() { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::Int32 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Int32 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_i64() { - if let Ok(n) = i32::try_from(v) { - b.append_value(n); - } else { - b.append_null(); - } - } else { - b.append_null(); - } - } - DataType::Int64 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Int64 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_i64() { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::UInt32 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list UInt32 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_u64() { - if let Ok(n) = u32::try_from(v) { - b.append_value(n); - } else { - b.append_null(); - } - } else { - b.append_null(); - } - } - DataType::UInt64 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list UInt64 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_u64() { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::Float32 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Float32 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_f64() { - b.append_value(v as f32); - } else { - b.append_null(); - } - } - DataType::Float64 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Float64 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_f64() { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::Date32 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Date32 builder downcast failed".to_string()) - })?; - if let Some(v) = parse_date32_json_value(value)? { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::Date64 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Date64 builder downcast failed".to_string()) - })?; - if let Some(v) = parse_date64_json_value(value)? { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::List(field) => { - let b = builder - .as_any_mut() - .downcast_mut::>>() - .ok_or_else(|| { - NanoError::Storage("nested list builder downcast failed".to_string()) - })?; - if value.is_null() { - b.append(false); - } else if let Some(items) = value.as_array() { - for item in items { - append_json_to_builder(b.values(), field.data_type(), item)?; - } - b.append(true); - } else { - b.append(false); - } - } - other => { - return Err(NanoError::Storage(format!( - "unsupported list element data type {:?}", - other - ))); - } - } - - Ok(()) -} - -fn parse_date32_json_value(value: &serde_json::Value) -> Result> { - if value.is_null() { - return Ok(None); - } - if let Some(days) = value.as_i64() { - return i32::try_from(days) - .map(Some) - .map_err(|_| NanoError::Storage(format!("Date32 value out of range: {}", days))); - } - if let Some(days) = value.as_u64() { - return i32::try_from(days) - .map(Some) - .map_err(|_| NanoError::Storage(format!("Date32 value out of range: {}", days))); - } - if let Some(s) = value.as_str() { - return Ok(Some(parse_date32_literal(s)?)); - } - Ok(None) -} - -fn parse_date64_json_value(value: &serde_json::Value) -> Result> { - if value.is_null() { - return Ok(None); - } - if let Some(ms) = value.as_i64() { - return Ok(Some(ms)); - } - if let Some(ms) = value.as_u64() { - return i64::try_from(ms) - .map(Some) - .map_err(|_| NanoError::Storage(format!("Date64 value out of range: {}", ms))); - } - if let Some(s) = value.as_str() { - return Ok(Some(parse_date64_literal(s)?)); - } - Ok(None) -} - -fn parse_edge_endpoint_key_token(token: &str, dt: &DataType) -> Result { - match dt { - DataType::Utf8 => Ok(token.to_string()), - DataType::Boolean => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected bool token: {}", e))), - DataType::Int32 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected Int32 token: {}", e))), - DataType::Int64 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected Int64 token: {}", e))), - DataType::UInt32 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected UInt32 token: {}", e))), - DataType::UInt64 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected UInt64 token: {}", e))), - DataType::Float32 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected Float32 token: {}", e))), - DataType::Float64 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected Float64 token: {}", e))), - DataType::Date32 => parse_date32_literal(token).map(|v| v.to_string()), - DataType::Date64 => parse_date64_literal(token).map(|v| v.to_string()), - other => Err(NanoError::Storage(format!( - "unsupported @key type for edge endpoint resolution: {:?}", - other - ))), - } -} - -pub(crate) fn parse_date32_literal(s: &str) -> Result { - let raw: Arc = Arc::new(StringArray::from(vec![Some(s)])); - let casted = arrow_cast::cast(raw.as_ref(), &DataType::Date32) - .map_err(|e| NanoError::Storage(format!("invalid Date literal '{}': {}", s, e)))?; - let out = casted - .as_any() - .downcast_ref::() - .ok_or_else(|| NanoError::Storage("Date32 cast produced unexpected array".to_string()))?; - if out.is_null(0) { - return Err(NanoError::Storage(format!("invalid Date literal '{}'", s))); - } - Ok(out.value(0)) -} - -pub(crate) fn parse_date64_literal(s: &str) -> Result { - let raw: Arc = Arc::new(StringArray::from(vec![Some(s)])); - let casted = arrow_cast::cast(raw.as_ref(), &DataType::Date64) - .map_err(|e| NanoError::Storage(format!("invalid DateTime literal '{}': {}", s, e)))?; - let out = casted - .as_any() - .downcast_ref::() - .ok_or_else(|| NanoError::Storage("Date64 cast produced unexpected array".to_string()))?; - if out.is_null(0) { - return Err(NanoError::Storage(format!( - "invalid DateTime literal '{}'", - s - ))); - } - Ok(out.value(0)) -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::io::Cursor; - - use serde_json::json; - - use crate::catalog::schema_ir::{build_catalog_from_ir, build_schema_ir}; - use crate::schema::parser::parse_schema; - - use super::*; - - fn test_schema() -> &'static str { - r#"node Person { - name: String @key -} -edge Knows: Person -> Person -"# - } - - fn build_storage(schema_src: &str) -> DatasetAccumulator { - let schema = parse_schema(schema_src).unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let catalog = build_catalog_from_ir(&ir).unwrap(); - DatasetAccumulator::new(catalog) - } - - fn person_key_props() -> HashMap { - HashMap::from([("Person".to_string(), "name".to_string())]) - } - - fn person_id_by_name(storage: &DatasetAccumulator, name: &str) -> u64 { - let batch = storage.get_all_nodes("Person").unwrap().unwrap(); - let id_col = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let name_col = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - (0..batch.num_rows()) - .find(|&i| name_col.value(i) == name) - .map(|i| id_col.value(i)) - .unwrap() - } - - #[test] - fn json_values_to_array_rejects_non_nullable_mismatch() { - let values = vec![json!("abc"), json!(42)]; - let err = json_values_to_array(&values, &DataType::Int32, false).unwrap_err(); - assert!( - err.to_string().contains("null value"), - "unexpected error: {err}" - ); - } - - #[test] - fn json_values_to_array_accepts_iso_date_strings() { - let values = vec![json!("2026-02-14"), json!(null)]; - let arr = json_values_to_array(&values, &DataType::Date32, true).unwrap(); - let arr = arr.as_any().downcast_ref::().unwrap(); - assert!(!arr.is_null(0)); - assert!(arr.is_null(1)); - } - - #[test] - fn json_values_to_array_accepts_iso_datetime_strings() { - let values = vec![json!("2026-02-14T10:00:00Z"), json!(null)]; - let arr = json_values_to_array(&values, &DataType::Date64, true).unwrap(); - let arr = arr.as_any().downcast_ref::().unwrap(); - assert!(!arr.is_null(0)); - assert!(arr.is_null(1)); - } - - #[test] - fn json_values_to_array_builds_list_values() { - let values = vec![json!([1, 2]), json!(null), json!([3])]; - let dt = DataType::List(Arc::new(Field::new("item", DataType::Int64, true))); - let arr = json_values_to_array(&values, &dt, true).unwrap(); - let list = arr - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(list.len(), 3); - assert!(!list.is_null(0)); - assert!(list.is_null(1)); - assert!(!list.is_null(2)); - - let first = list.value(0); - let first = first.as_any().downcast_ref::().unwrap(); - assert_eq!(first.len(), 2); - assert_eq!(first.value(0), 1); - assert_eq!(first.value(1), 2); - } - - #[test] - fn json_values_to_array_builds_fixed_size_list_vectors() { - let values = vec![json!([0.1, 0.2, 0.3]), json!(null), json!([1, 2, 3])]; - let dt = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 3); - let arr = json_values_to_array(&values, &dt, true).unwrap(); - let vecs = arr - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(vecs.len(), 3); - assert!(!vecs.is_null(0)); - assert!(vecs.is_null(1)); - assert!(!vecs.is_null(2)); - } - - #[test] - fn json_values_to_array_rejects_fixed_size_list_length_mismatch() { - let values = vec![json!([0.1, 0.2])]; - let dt = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 3); - let err = json_values_to_array(&values, &dt, true).unwrap_err(); - assert!(err.to_string().contains("length mismatch")); - } - - #[test] - fn load_jsonl_with_name_seed_resolves_edges_to_existing_nodes() { - let mut existing = build_storage(test_schema()); - load_jsonl_data( - &mut existing, - r#"{"type":"Person","data":{"name":"Alice"}}"#, - &person_key_props(), - ) - .unwrap(); - let alice_id = person_id_by_name(&existing, "Alice"); - - let data = r#"{"type":"Person","data":{"name":"Bob"}} -{"edge":"Knows","from":"Alice","to":"Bob"}"#; - - let mut no_seed = build_storage(test_schema()); - let err = load_jsonl_data(&mut no_seed, data, &person_key_props()).unwrap_err(); - assert!( - err.to_string().contains("node not found by @key"), - "unexpected error: {err}" - ); - - let mut seeded = build_storage(test_schema()); - let mut seed = HashMap::new(); - seed.insert(("Person".to_string(), "Alice".to_string()), alice_id); - load_jsonl_data_with_name_seed(&mut seeded, data, &person_key_props(), Some(&seed)) - .unwrap(); - - let bob_id = person_id_by_name(&seeded, "Bob"); - let knows = &seeded.edge_segments["Knows"]; - assert_eq!(knows.edge_ids.len(), 1); - assert_eq!(knows.src_ids[0], alice_id); - assert_eq!(knows.dst_ids[0], bob_id); - } - - #[test] - fn load_jsonl_reader_handles_forward_reference_edges() { - let mut storage = build_storage(test_schema()); - let data = r#"{"edge":"Knows","from":"Alice","to":"Bob"} -{"type":"Person","data":{"name":"Alice"}} -{"type":"Person","data":{"name":"Bob"}}"#; - - load_jsonl_reader( - &mut storage, - Cursor::new(data.as_bytes()), - &person_key_props(), - ) - .unwrap(); - - let knows = &storage.edge_segments["Knows"]; - assert_eq!(knows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_deduplicates_duplicate_edges() { - let mut storage = build_storage(test_schema()); - let data = r#"{"type":"Person","data":{"name":"Alice"}} -{"type":"Person","data":{"name":"Bob"}} -{"edge":"Knows","from":"Alice","to":"Bob"} -{"edge":"Knows","from":"Alice","to":"Bob"}"#; - - load_jsonl_data(&mut storage, data, &person_key_props()).unwrap(); - let knows = &storage.edge_segments["Knows"]; - assert_eq!(knows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_edges_require_endpoint_key_annotations() { - let schema = r#"node Event { - title: String - at: Date -} -edge Precedes: Event -> Event -"#; - let mut storage = build_storage(schema); - let data = r#"{"type":"Event","data":{"title":"Kickoff","at":"2026-02-14"}} -{"type":"Event","data":{"title":"Wrap","at":"2026-02-15"}} -{"edge":"Precedes","from":"Kickoff","to":"Wrap"}"#; - - let err = load_jsonl_data(&mut storage, data, &HashMap::new()).unwrap_err(); - assert!( - err.to_string() - .contains("requires @key on source type 'Event' and destination type 'Event'"), - "unexpected error: {err}" - ); - } - - #[test] - fn load_jsonl_edges_resolve_by_non_name_key() { - let schema = r#"node User { - uid: String @key - display_name: String -} -edge Follows: User -> User -"#; - let mut storage = build_storage(schema); - let key_props = HashMap::from([("User".to_string(), "uid".to_string())]); - let data = r#"{"type":"User","data":{"uid":"usr_01","display_name":"Alice"}} -{"type":"User","data":{"uid":"usr_02","display_name":"Bob"}} -{"edge":"Follows","from":"usr_01","to":"usr_02"}"#; - - load_jsonl_data(&mut storage, data, &key_props).unwrap(); - let follows = &storage.edge_segments["Follows"]; - assert_eq!(follows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_edges_resolve_by_user_property_named_id() { - let schema = r#"node User { - id: String @key - display_name: String -} -edge Follows: User -> User -"#; - let mut storage = build_storage(schema); - let key_props = HashMap::from([("User".to_string(), "id".to_string())]); - let data = r#"{"type":"User","data":{"id":"usr_01","display_name":"Alice"}} -{"type":"User","data":{"id":"usr_02","display_name":"Bob"}} -{"edge":"Follows","from":"usr_01","to":"usr_02"}"#; - - load_jsonl_data(&mut storage, data, &key_props).unwrap(); - - let users = storage.get_all_nodes("User").unwrap().unwrap(); - let user_ids = users - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(user_ids.value(0), "usr_01"); - assert_eq!(user_ids.value(1), "usr_02"); - - let follows = &storage.edge_segments["Follows"]; - assert_eq!(follows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_edges_parse_non_string_key_tokens() { - let schema = r#"node User { - uid: U64 @key - display_name: String -} -edge Follows: User -> User -"#; - let mut storage = build_storage(schema); - let key_props = HashMap::from([("User".to_string(), "uid".to_string())]); - let data = r#"{"type":"User","data":{"uid":1,"display_name":"Alice"}} -{"type":"User","data":{"uid":2,"display_name":"Bob"}} -{"edge":"Follows","from":"1","to":"2"}"#; - - load_jsonl_data(&mut storage, data, &key_props).unwrap(); - let follows = &storage.edge_segments["Follows"]; - assert_eq!(follows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_rejects_invalid_node_enum_values() { - let schema = r#"node Person { - name: String @key - role: enum(admin, member, guest) -}"#; - let mut storage = build_storage(schema); - let err = load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"name":"Bad","role":"superadmin"}}"#, - &HashMap::from([("Person".to_string(), "name".to_string())]), - ) - .unwrap_err(); - assert_eq!( - err.to_string(), - "storage error: invalid enum value 'superadmin' for Person.role (expected: admin, guest, member)" - ); - } - - #[test] - fn load_jsonl_rejects_invalid_edge_enum_values() { - let schema = r#"node Person { - name: String @key -} -edge WorksWith: Person -> Person { - role: enum(lead, contributor) -}"#; - let mut storage = build_storage(schema); - let data = r#"{"type":"Person","data":{"name":"Alice"}} -{"type":"Person","data":{"name":"Bob"}} -{"edge":"WorksWith","from":"Alice","to":"Bob","data":{"role":"manager"}}"#; - let err = load_jsonl_data( - &mut storage, - data, - &HashMap::from([("Person".to_string(), "name".to_string())]), - ) - .unwrap_err(); - assert_eq!( - err.to_string(), - "storage error: invalid enum value 'manager' for WorksWith.role (expected: contributor, lead)" - ); - } - - #[test] - fn load_jsonl_rejects_wrong_type_for_nullable_node_field() { - let schema = r#"node Person { - name: String @key - age: I32? -}"#; - let mut storage = build_storage(schema); - let err = load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"name":"Bad","age":"not-a-number"}}"#, - &HashMap::from([("Person".to_string(), "name".to_string())]), - ) - .unwrap_err(); - assert_eq!( - err.to_string(), - r#"storage error: type mismatch for Person.age: expected I32, got String "not-a-number""# - ); - } -}