diff --git a/crates/omnigraph/src/loader/constraints.rs b/crates/omnigraph/src/loader/constraints.rs deleted file mode 100644 index d76decb..0000000 --- a/crates/omnigraph/src/loader/constraints.rs +++ /dev/null @@ -1,476 +0,0 @@ -use std::collections::HashMap; -#[cfg(test)] -use std::collections::HashSet; - -use arrow_array::{ - Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, - Int32Array, Int64Array, StringArray, UInt32Array, UInt64Array, -}; -use arrow_schema::{DataType, Field, Schema}; - -use crate::catalog::schema_ir::SchemaIR; -use crate::error::{NanoError, Result}; - -use super::super::graph::DatasetAccumulator; - -#[derive(Debug, Default)] -pub(crate) struct NodeConstraintAnnotations { - pub(crate) key_props: HashMap, - pub(crate) unique_props: HashMap>, -} - -pub(crate) fn load_node_constraint_annotations( - schema_ir: &SchemaIR, -) -> Result { - let mut constraints = NodeConstraintAnnotations::default(); - - for node in schema_ir.node_types() { - let mut node_key_prop: Option = None; - let mut node_unique_props: Vec = Vec::new(); - - for prop in &node.properties { - if prop.key && node_key_prop.replace(prop.name.clone()).is_some() { - return Err(NanoError::Storage(format!( - "node type {} has multiple @key properties; only one is currently supported", - node.name - ))); - } - if prop.unique { - node_unique_props.push(prop.name.clone()); - } - } - - if let Some(prop_name) = node_key_prop { - if !node_unique_props.contains(&prop_name) { - node_unique_props.push(prop_name.clone()); - } - constraints.key_props.insert(node.name.clone(), prop_name); - } - if !node_unique_props.is_empty() { - node_unique_props.sort(); - node_unique_props.dedup(); - constraints - .unique_props - .insert(node.name.clone(), node_unique_props); - } - } - - Ok(constraints) -} - -pub(crate) fn enforce_node_unique_constraints( - storage: &DatasetAccumulator, - unique_props: &HashMap>, -) -> Result<()> { - for (type_name, properties) in unique_props 
{ - let Some(batch) = storage.get_all_nodes(type_name)? else { - continue; - }; - - for property in properties { - let prop_idx = - node_property_index(batch.schema().as_ref(), property).ok_or_else(|| { - NanoError::Storage(format!( - "node type {} missing @unique property {}", - type_name, property - )) - })?; - let arr = batch.column(prop_idx); - let mut seen: HashMap = HashMap::new(); - for row in 0..batch.num_rows() { - let Some(value) = unique_value_string(arr, row, type_name, property)? else { - continue; - }; - if let Some(prev_row) = seen.insert(value.clone(), row) { - return Err(NanoError::UniqueConstraint { - type_name: type_name.clone(), - property: property.clone(), - value, - first_row: prev_row, - second_row: row, - }); - } - } - } - } - Ok(()) -} - -#[cfg(test)] -pub(crate) fn collect_incoming_node_types(data_source: &str) -> Result> { - let mut node_types = HashSet::new(); - for line in data_source.lines() { - let line = line.trim(); - if line.is_empty() || line.starts_with("//") { - continue; - } - - let obj: serde_json::Value = serde_json::from_str(line) - .map_err(|e| NanoError::Storage(format!("JSON parse error: {}", e)))?; - if let Some(type_name) = obj.get("type").and_then(|v| v.as_str()) { - node_types.insert(type_name.to_string()); - } - } - Ok(node_types) -} - -pub(crate) fn build_name_seed_for_keyed_load( - storage: &DatasetAccumulator, - key_props: &HashMap, -) -> Result> { - let mut seed = HashMap::new(); - - for (type_name, key_prop) in key_props { - let Some(batch) = storage.get_all_nodes(type_name)? 
else { - continue; - }; - - let key_idx = node_property_index(batch.schema().as_ref(), key_prop).ok_or_else(|| { - NanoError::Storage(format!( - "node type {} missing @key property {}", - type_name, key_prop - )) - })?; - let key_arr = batch.column(key_idx).clone(); - let id_arr = batch - .column(0) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - NanoError::Storage(format!("node type {} has non-UInt64 id column", type_name)) - })?; - - for row in 0..batch.num_rows() { - let key = key_value_string(&key_arr, row, key_prop)?; - seed.insert((type_name.clone(), key), id_arr.value(row)); - } - } - - Ok(seed) -} - -pub(crate) fn build_name_seed_for_append( - storage: &DatasetAccumulator, - key_props: &HashMap, -) -> Result> { - build_name_seed_for_keyed_load(storage, key_props) -} - -pub(crate) fn node_property_index(schema: &Schema, prop_name: &str) -> Option { - schema - .fields() - .iter() - .enumerate() - .skip(1) - .find_map(|(idx, field)| (field.name() == prop_name).then_some(idx)) -} - -pub(crate) fn node_property_field<'a>(schema: &'a Schema, prop_name: &str) -> Option<&'a Field> { - node_property_index(schema, prop_name).map(|idx| schema.field(idx)) -} - -pub(crate) fn key_value_string(array: &ArrayRef, row: usize, prop_name: &str) -> Result { - let value = scalar_value_string(array, row, "key", None, prop_name)?; - if let Some(value) = value { - return Ok(value); - } - Err(NanoError::Storage(format!( - "@key property {} cannot be null", - prop_name - ))) -} - -fn unique_value_string( - array: &ArrayRef, - row: usize, - type_name: &str, - prop_name: &str, -) -> Result> { - scalar_value_string(array, row, "unique", Some(type_name), prop_name) -} - -fn scalar_value_string( - array: &ArrayRef, - row: usize, - annotation: &str, - type_name: Option<&str>, - prop_name: &str, -) -> Result> { - if array.is_null(row) { - return Ok(None); - } - - let value = match array.data_type() { - DataType::Utf8 => array - .as_any() - .downcast_ref::() - .map(|a| 
a.value(row).to_string()), - DataType::Boolean => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Int32 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Int64 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::UInt32 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::UInt64 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Float32 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Float64 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Date32 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - DataType::Date64 => array - .as_any() - .downcast_ref::() - .map(|a| a.value(row).to_string()), - _ => None, - }; - - let value = value.ok_or_else(|| { - let target = match type_name { - Some(name) => format!("{}.{}", name, prop_name), - None => prop_name.to_string(), - }; - NanoError::Storage(format!( - "unsupported @{} data type {:?} for {}", - annotation, - array.data_type(), - target - )) - })?; - - Ok(Some(value)) -} - -#[cfg(test)] -mod tests { - use std::collections::{HashMap, HashSet}; - - use arrow_array::StringArray; - - use crate::catalog::schema_ir::{build_catalog_from_ir, build_schema_ir}; - use crate::schema::parser::parse_schema; - - use super::super::jsonl::load_jsonl_data; - use super::*; - - fn build_schema_ir_and_storage(schema_src: &str) -> (SchemaIR, DatasetAccumulator) { - let schema = parse_schema(schema_src).unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let catalog = build_catalog_from_ir(&ir).unwrap(); - (ir, DatasetAccumulator::new(catalog)) - } - - #[test] - fn load_node_constraint_annotations_collects_key_and_unique() { - let schema = r#"node Person { - name: String @key - email: String @unique - alias: String? 
@unique -}"#; - let (ir, _) = build_schema_ir_and_storage(schema); - let annotations = load_node_constraint_annotations(&ir).unwrap(); - - assert_eq!(annotations.key_props.get("Person").unwrap(), "name"); - assert_eq!( - annotations.unique_props.get("Person").unwrap(), - &vec!["alias".to_string(), "email".to_string(), "name".to_string()] - ); - } - - #[test] - fn collect_incoming_node_types_ignores_comments_and_blanks() { - let data = r#" -// comment -{"type":"Person","data":{"name":"Alice"}} - -{"edge":"Knows","from":"Alice","to":"Bob"} -{"type":"Company","data":{"name":"Acme"}} -"#; - let types = collect_incoming_node_types(data).unwrap(); - assert_eq!( - types, - HashSet::from(["Person".to_string(), "Company".to_string()]) - ); - } - - #[test] - fn enforce_node_unique_constraints_detects_duplicate_non_null() { - let schema = r#"node Person { - name: String - email: String? @unique -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::new(); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"name":"Alice","email":"dupe@example.com"}} -{"type":"Person","data":{"name":"Bob","email":"dupe@example.com"}}"#, - &key_props, - ) - .unwrap(); - - let unique_props = HashMap::from([("Person".to_string(), vec!["email".to_string()])]); - let err = enforce_node_unique_constraints(&storage, &unique_props).unwrap_err(); - match err { - NanoError::UniqueConstraint { - type_name, - property, - value, - .. - } => { - assert_eq!(type_name, "Person"); - assert_eq!(property, "email"); - assert_eq!(value, "dupe@example.com"); - } - other => panic!("expected UniqueConstraint, got {other}"), - } - } - - #[test] - fn enforce_node_unique_constraints_allows_multiple_nulls() { - let schema = r#"node Person { - name: String - nick: String? 
@unique -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::new(); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"name":"Alice","nick":null}} -{"type":"Person","data":{"name":"Bob","nick":null}}"#, - &key_props, - ) - .unwrap(); - - let unique_props = HashMap::from([("Person".to_string(), vec!["nick".to_string()])]); - enforce_node_unique_constraints(&storage, &unique_props).unwrap(); - } - - #[test] - fn enforce_node_unique_constraints_uses_user_property_named_id() { - let schema = r#"node Person { - id: String @unique - name: String -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::new(); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"id":"user-1","name":"Alice"}} -{"type":"Person","data":{"id":"user-1","name":"Bob"}}"#, - &key_props, - ) - .unwrap(); - - let unique_props = HashMap::from([("Person".to_string(), vec!["id".to_string()])]); - let err = enforce_node_unique_constraints(&storage, &unique_props).unwrap_err(); - match err { - NanoError::UniqueConstraint { - type_name, - property, - value, - .. 
- } => { - assert_eq!(type_name, "Person"); - assert_eq!(property, "id"); - assert_eq!(value, "user-1"); - } - other => panic!("expected UniqueConstraint, got {other}"), - } - } - - #[test] - fn build_name_seed_for_keyed_load_uses_declared_key_property() { - let schema = r#"node Person { - uid: String @key - name: String -} -node Company { - name: String -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::from([("Person".to_string(), "uid".to_string())]); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"uid":"u1","name":"Alice"}} -{"type":"Company","data":{"name":"Acme"}}"#, - &key_props, - ) - .unwrap(); - - let seed = build_name_seed_for_keyed_load(&storage, &key_props).unwrap(); - - assert!(seed.contains_key(&("Person".to_string(), "u1".to_string()))); - assert!(!seed.contains_key(&("Company".to_string(), "Acme".to_string()))); - } - - #[test] - fn build_name_seed_for_keyed_load_uses_user_property_named_id() { - let schema = r#"node Person { - id: String @key - name: String -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::from([("Person".to_string(), "id".to_string())]); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"id":"user-1","name":"Alice"}}"#, - &key_props, - ) - .unwrap(); - - let seed = build_name_seed_for_keyed_load(&storage, &key_props).unwrap(); - assert!(seed.contains_key(&("Person".to_string(), "user-1".to_string()))); - } - - #[test] - fn build_name_seed_for_append_keeps_all_existing_keyed_nodes() { - let schema = r#"node Person { - uid: String @key - name: String -} -node Company { - name: String -}"#; - let (_, mut storage) = build_schema_ir_and_storage(schema); - let key_props = HashMap::from([("Person".to_string(), "uid".to_string())]); - load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"uid":"u1","name":"Alice"}} -{"type":"Company","data":{"name":"Acme"}}"#, - &key_props, - ) - .unwrap(); - - let seed = 
build_name_seed_for_append(&storage, &key_props).unwrap(); - assert!(seed.contains_key(&("Person".to_string(), "u1".to_string()))); - assert!(!seed.contains_key(&("Company".to_string(), "Acme".to_string()))); - } - - #[test] - fn key_value_string_rejects_null() { - let arr: ArrayRef = std::sync::Arc::new(StringArray::from(vec![Some("x"), None])); - assert_eq!(key_value_string(&arr, 0, "name").unwrap(), "x"); - let err = key_value_string(&arr, 1, "name").unwrap_err(); - assert!(err.to_string().contains("cannot be null")); - } -} diff --git a/crates/omnigraph/src/loader/embeddings.rs b/crates/omnigraph/src/loader/embeddings.rs deleted file mode 100644 index 58ecb93..0000000 --- a/crates/omnigraph/src/loader/embeddings.rs +++ /dev/null @@ -1,1732 +0,0 @@ -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; -use std::io::{BufRead, BufWriter, Write}; -use std::path::{Path, PathBuf}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use serde::{Deserialize, Serialize}; - -use crate::catalog::schema_ir::{PropDef, SchemaIR}; -use crate::embedding::EmbeddingClient; -use crate::error::{NanoError, Result}; -use crate::store::manifest::hash_string; -use crate::types::ScalarType; - -const EMBEDDING_CACHE_FILENAME: &str = "_embedding_cache.jsonl"; -const DEFAULT_EMBED_BATCH_SIZE: usize = 64; -const DEFAULT_EMBED_CHUNK_CHARS: usize = 0; -const DEFAULT_EMBED_CHUNK_OVERLAP_CHARS: usize = 128; -const DEFAULT_EMBED_CACHE_MAX_ENTRIES: usize = 50_000; -const DEFAULT_EMBED_CACHE_LOCK_STALE_SECS: usize = 60; -const EMBEDDING_CACHE_LOCK_RETRIES: usize = 200; -const EMBEDDING_CACHE_LOCK_RETRY_DELAY_MS: u64 = 10; - -#[derive(Debug, Clone)] -pub(crate) struct EmbedSpec { - pub target_prop: String, - pub source_prop: String, - pub dim: usize, -} - -#[derive(Debug, Clone)] -pub(crate) struct EmbedValueRequest { - pub source_text: String, - pub dim: usize, -} - -#[cfg_attr(not(test), allow(dead_code))] -#[derive(Debug, Clone)] -struct PendingAssignment { - line_index: usize, - 
target_prop: String, - source_text: String, - dim: usize, - content_hash: String, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct CacheKey { - model: String, - dim: usize, - content_hash: String, - chunk_chars: usize, - chunk_overlap_chars: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct CacheRecord { - model: String, - dim: usize, - content_hash: String, - vector: Vec, - #[serde(default)] - chunk_chars: usize, - #[serde(default)] - chunk_overlap_chars: usize, -} - -enum ParsedLine { - Raw(String), - Json(serde_json::Value), -} - -struct StreamPendingLine { - line_id: usize, - line: ParsedLine, - missing_assignments: usize, -} - -#[derive(Debug, Clone)] -struct StreamPendingAssignment { - line_id: usize, - target_prop: String, - source_text: String, - dim: usize, - content_hash: String, -} - -impl StreamPendingAssignment { - fn cache_key(&self, model: &str, chunking: EmbedChunkingConfig) -> CacheKey { - CacheKey { - model: model.to_string(), - dim: self.dim, - content_hash: self.content_hash.clone(), - chunk_chars: chunking.chunk_chars, - chunk_overlap_chars: chunking.chunk_overlap_chars, - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct EmbedChunkingConfig { - chunk_chars: usize, - chunk_overlap_chars: usize, -} - -impl EmbedChunkingConfig { - fn from_env() -> Self { - let chunk_chars = parse_env_usize("NANOGRAPH_EMBED_CHUNK_CHARS", DEFAULT_EMBED_CHUNK_CHARS); - let overlap = parse_env_usize( - "NANOGRAPH_EMBED_CHUNK_OVERLAP_CHARS", - DEFAULT_EMBED_CHUNK_OVERLAP_CHARS, - ); - Self::new(chunk_chars, overlap) - } - - fn new(chunk_chars: usize, chunk_overlap_chars: usize) -> Self { - let chunk_overlap_chars = if chunk_chars == 0 { - 0 - } else { - chunk_overlap_chars.min(chunk_chars.saturating_sub(1)) - }; - Self { - chunk_chars, - chunk_overlap_chars, - } - } - - fn is_enabled(self) -> bool { - self.chunk_chars > 0 - } -} - -#[allow(dead_code)] -pub(crate) async fn materialize_embeddings_for_load( - db_path: &Path, - 
schema_ir: &SchemaIR, - data_source: &str, -) -> Result { - materialize_embeddings_for_load_inner(db_path, schema_ir, data_source, None).await -} - -#[cfg_attr(not(test), allow(dead_code))] -async fn materialize_embeddings_for_load_inner( - db_path: &Path, - schema_ir: &SchemaIR, - data_source: &str, - client_override: Option<&EmbeddingClient>, -) -> Result { - materialize_embeddings_for_load_inner_with_chunking( - db_path, - schema_ir, - data_source, - client_override, - EmbedChunkingConfig::from_env(), - ) - .await -} - -pub(crate) fn has_embedding_specs(schema_ir: &SchemaIR) -> bool { - schema_ir.node_types().any(|node| { - node.properties - .iter() - .any(|prop| prop.embed_source.is_some()) - }) -} - -pub(crate) async fn materialize_embeddings_for_load_to_tempfile( - db_path: &Path, - schema_ir: &SchemaIR, - reader: R, -) -> Result { - materialize_embeddings_for_load_to_tempfile_inner(db_path, schema_ir, reader, None).await -} - -pub(crate) async fn resolve_embedding_requests( - db_path: &Path, - requests: &[EmbedValueRequest], -) -> Result>> { - resolve_embedding_requests_with_chunking(db_path, requests, EmbedChunkingConfig::from_env()) - .await -} - -async fn materialize_embeddings_for_load_to_tempfile_inner( - db_path: &Path, - schema_ir: &SchemaIR, - reader: R, - client_override: Option<&EmbeddingClient>, -) -> Result { - materialize_embeddings_for_load_to_tempfile_inner_with_chunking( - db_path, - schema_ir, - reader, - client_override, - EmbedChunkingConfig::from_env(), - ) - .await -} - -async fn resolve_embedding_requests_with_chunking( - db_path: &Path, - requests: &[EmbedValueRequest], - chunking: EmbedChunkingConfig, -) -> Result>> { - if requests.is_empty() { - return Ok(Vec::new()); - } - - let cache_path = db_path.join(EMBEDDING_CACHE_FILENAME); - let mut cache = load_embedding_cache(&cache_path)?; - let client = EmbeddingClient::from_env() - .map_err(|err| NanoError::Storage(format!("embedding initialization failed: {}", err)))?; - let model = 
client.model().to_string(); - let batch_size = parse_env_usize("NANOGRAPH_EMBED_BATCH_SIZE", DEFAULT_EMBED_BATCH_SIZE); - - let mut results: Vec>> = vec![None; requests.len()]; - let mut missing_by_dim: BTreeMap> = BTreeMap::new(); - let mut missing_indices: HashMap> = HashMap::new(); - - for (idx, request) in requests.iter().enumerate() { - if request.dim == 0 { - return Err(NanoError::Storage( - "embedding dimension must be greater than zero".to_string(), - )); - } - - let key = CacheKey { - model: model.clone(), - dim: request.dim, - content_hash: hash_string(&request.source_text), - chunk_chars: chunking.chunk_chars, - chunk_overlap_chars: chunking.chunk_overlap_chars, - }; - - if let Some(vector) = cache.get(&key) { - results[idx] = Some(vector.clone()); - continue; - } - - missing_indices.entry(key.clone()).or_default().push(idx); - let entries = missing_by_dim.entry(request.dim).or_default(); - if !entries.iter().any(|(existing, _)| existing == &key) { - entries.push((key, request.source_text.clone())); - } - } - - let mut new_cache_records = Vec::new(); - for (dim, entries) in missing_by_dim { - if chunking.is_enabled() { - for (key, text) in entries { - let vector = - embed_text_with_chunking(&client, &text, dim, batch_size, chunking).await?; - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - key.content_hash, - dim, - vector.len() - ))); - } - cache.insert(key.clone(), vector.clone()); - new_cache_records.push(CacheRecord { - model: key.model.clone(), - dim: key.dim, - content_hash: key.content_hash.clone(), - vector, - chunk_chars: key.chunk_chars, - chunk_overlap_chars: key.chunk_overlap_chars, - }); - } - continue; - } - - for chunk in entries.chunks(batch_size.max(1)) { - let texts: Vec = chunk.iter().map(|(_, text)| text.clone()).collect(); - let vectors = client - .embed_texts(&texts, dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: 
{}", err)))?; - if vectors.len() != chunk.len() { - return Err(NanoError::Storage(format!( - "embedding response size mismatch: expected {}, got {}", - chunk.len(), - vectors.len() - ))); - } - - for ((key, _), vector) in chunk.iter().zip(vectors.into_iter()) { - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - key.content_hash, - dim, - vector.len() - ))); - } - cache.insert(key.clone(), vector.clone()); - new_cache_records.push(CacheRecord { - model: key.model.clone(), - dim: key.dim, - content_hash: key.content_hash.clone(), - vector, - chunk_chars: key.chunk_chars, - chunk_overlap_chars: key.chunk_overlap_chars, - }); - } - } - } - - append_embedding_cache(&cache_path, &new_cache_records)?; - - for (key, indices) in missing_indices { - let vector = cache.get(&key).ok_or_else(|| { - NanoError::Storage(format!( - "embedding cache miss for content hash {}", - key.content_hash - )) - })?; - for idx in indices { - results[idx] = Some(vector.clone()); - } - } - - results - .into_iter() - .enumerate() - .map(|(idx, vector)| { - vector.ok_or_else(|| { - NanoError::Storage(format!( - "missing embedding result for request index {}", - idx - )) - }) - }) - .collect() -} - -async fn materialize_embeddings_for_load_to_tempfile_inner_with_chunking( - db_path: &Path, - schema_ir: &SchemaIR, - reader: R, - client_override: Option<&EmbeddingClient>, - chunking: EmbedChunkingConfig, -) -> Result { - let output_path = create_materialized_temp_file(db_path)?; - let embed_specs = collect_embed_specs(schema_ir)?; - let cache_path = db_path.join(EMBEDDING_CACHE_FILENAME); - - if embed_specs.is_empty() { - let mut writer = BufWriter::new(std::fs::File::create(&output_path)?); - copy_reader_to_writer(reader, &mut writer)?; - writer.flush()?; - return Ok(output_path); - } - - let mut cache = load_embedding_cache(&cache_path)?; - let owned_client; - let client = if let Some(client) = client_override { - 
client - } else { - owned_client = EmbeddingClient::from_env().map_err(|err| { - NanoError::Storage(format!("embedding initialization failed: {}", err)) - })?; - &owned_client - }; - let model = client.model().to_string(); - let batch_size = parse_env_usize("NANOGRAPH_EMBED_BATCH_SIZE", DEFAULT_EMBED_BATCH_SIZE); - let mut writer = BufWriter::new(std::fs::File::create(&output_path)?); - let mut pending_lines: VecDeque = VecDeque::new(); - let mut pending_by_dim: BTreeMap> = BTreeMap::new(); - let mut new_cache_records = Vec::new(); - let mut next_line_id = 0usize; - - for (line_no, line) in reader.lines().enumerate() { - let line = line?; - let trimmed = line.trim(); - if trimmed.is_empty() || trimmed.starts_with("//") { - pending_lines.push_back(StreamPendingLine { - line_id: next_line_id, - line: ParsedLine::Raw(line), - missing_assignments: 0, - }); - next_line_id += 1; - flush_ready_stream_lines(&mut writer, &mut pending_lines)?; - continue; - } - - let mut obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!("JSON parse error on line {}: {}", line_no + 1, e)) - })?; - let mut output_line = ParsedLine::Raw(line); - let mut missing_assignments = 0usize; - - if let Some(type_name) = obj - .get("type") - .and_then(|value| value.as_str()) - .map(|value| value.to_string()) - && let Some(specs) = embed_specs.get(type_name.as_str()) - { - let data_obj = obj - .get_mut("data") - .and_then(|value| value.as_object_mut()) - .ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} is missing object field `data`", - type_name, - line_no + 1 - )) - })?; - let mut mutated = false; - - for spec in specs { - let needs_embedding = match data_obj.get(&spec.target_prop) { - Some(value) => value.is_null(), - None => true, - }; - if !needs_embedding { - continue; - } - - let source_value = data_obj.get(&spec.source_prop).ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} missing @embed source property `{}` for 
`{}`", - type_name, - line_no + 1, - spec.source_prop, - spec.target_prop - )) - })?; - let source_text = source_value.as_str().ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} @embed source property `{}` must be String", - type_name, - line_no + 1, - spec.source_prop - )) - })?; - - let assignment = StreamPendingAssignment { - line_id: next_line_id, - target_prop: spec.target_prop.clone(), - source_text: source_text.to_string(), - dim: spec.dim, - content_hash: hash_string(source_text), - }; - let cache_key = assignment.cache_key(&model, chunking); - if let Some(vector) = cache.get(&cache_key) { - data_obj.insert( - spec.target_prop.clone(), - serde_json::to_value(vector).map_err(|e| { - NanoError::Storage(format!("serialize embedding vector failed: {}", e)) - })?, - ); - } else { - missing_assignments += 1; - pending_by_dim - .entry(spec.dim) - .or_default() - .push_back(assignment); - } - mutated = true; - } - - if mutated { - output_line = ParsedLine::Json(obj); - } - } - - pending_lines.push_back(StreamPendingLine { - line_id: next_line_id, - line: output_line, - missing_assignments, - }); - next_line_id += 1; - - let mut runtime = StreamEmbedRuntime { - cache: &mut cache, - model: &model, - client, - new_cache_records: &mut new_cache_records, - batch_size, - chunking, - }; - resolve_pending_stream_batches( - &mut pending_by_dim, - &mut pending_lines, - &mut runtime, - false, - ) - .await?; - flush_ready_stream_lines(&mut writer, &mut pending_lines)?; - } - - let mut runtime = StreamEmbedRuntime { - cache: &mut cache, - model: &model, - client, - new_cache_records: &mut new_cache_records, - batch_size, - chunking, - }; - resolve_pending_stream_batches(&mut pending_by_dim, &mut pending_lines, &mut runtime, true) - .await?; - flush_ready_stream_lines(&mut writer, &mut pending_lines)?; - writer.flush()?; - - if !pending_lines.is_empty() { - return Err(NanoError::Storage( - "embedding materialization left unresolved output rows".to_string(), - )); 
- } - - append_embedding_cache(&cache_path, &new_cache_records)?; - Ok(output_path) -} - -#[cfg_attr(not(test), allow(dead_code))] -async fn materialize_embeddings_for_load_inner_with_chunking( - db_path: &Path, - schema_ir: &SchemaIR, - data_source: &str, - client_override: Option<&EmbeddingClient>, - chunking: EmbedChunkingConfig, -) -> Result { - let embed_specs = collect_embed_specs(schema_ir)?; - if embed_specs.is_empty() { - return Ok(data_source.to_string()); - } - - let mut lines = Vec::new(); - let mut pending = Vec::new(); - parse_input_lines(data_source, &embed_specs, &mut lines, &mut pending)?; - if pending.is_empty() { - return Ok(data_source.to_string()); - } - - let cache_path = db_path.join(EMBEDDING_CACHE_FILENAME); - let mut cache = load_embedding_cache(&cache_path)?; - - let owned_client; - let client = if let Some(client) = client_override { - client - } else { - owned_client = EmbeddingClient::from_env().map_err(|err| { - NanoError::Storage(format!("embedding initialization failed: {}", err)) - })?; - &owned_client - }; - let model = client.model().to_string(); - - let mut missing_by_dim: BTreeMap> = BTreeMap::new(); - for assignment in &pending { - let key = CacheKey { - model: model.clone(), - dim: assignment.dim, - content_hash: assignment.content_hash.clone(), - chunk_chars: chunking.chunk_chars, - chunk_overlap_chars: chunking.chunk_overlap_chars, - }; - if cache.contains_key(&key) { - continue; - } - let entries = missing_by_dim.entry(assignment.dim).or_default(); - if !entries.iter().any(|(existing, _)| existing == &key) { - entries.push((key, assignment.source_text.clone())); - } - } - - let batch_size = parse_env_usize("NANOGRAPH_EMBED_BATCH_SIZE", DEFAULT_EMBED_BATCH_SIZE); - let mut new_cache_records = Vec::new(); - for (dim, entries) in missing_by_dim { - if chunking.is_enabled() { - for (key, text) in entries { - let vector = - embed_text_with_chunking(client, &text, dim, batch_size, chunking).await?; - if vector.len() != dim { - 
return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - key.content_hash, - dim, - vector.len() - ))); - } - cache.insert(key.clone(), vector.clone()); - new_cache_records.push(CacheRecord { - model: key.model.clone(), - dim: key.dim, - content_hash: key.content_hash.clone(), - vector, - chunk_chars: key.chunk_chars, - chunk_overlap_chars: key.chunk_overlap_chars, - }); - } - continue; - } - - for chunk in entries.chunks(batch_size) { - let texts: Vec = chunk.iter().map(|(_, text)| text.clone()).collect(); - let vectors = client - .embed_texts(&texts, dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err)))?; - if vectors.len() != chunk.len() { - return Err(NanoError::Storage(format!( - "embedding response size mismatch: expected {}, got {}", - chunk.len(), - vectors.len() - ))); - } - for ((key, _), vector) in chunk.iter().zip(vectors.into_iter()) { - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - key.content_hash, - dim, - vector.len() - ))); - } - cache.insert(key.clone(), vector.clone()); - new_cache_records.push(CacheRecord { - model: key.model.clone(), - dim: key.dim, - content_hash: key.content_hash.clone(), - vector, - chunk_chars: key.chunk_chars, - chunk_overlap_chars: key.chunk_overlap_chars, - }); - } - } - } - append_embedding_cache(&cache_path, &new_cache_records)?; - - apply_embeddings_to_lines(&mut lines, &pending, &cache, &model, chunking)?; - render_output_lines(data_source, lines) -} - -#[cfg_attr(not(test), allow(dead_code))] -fn parse_input_lines( - data_source: &str, - embed_specs: &HashMap>, - lines: &mut Vec, - pending: &mut Vec, -) -> Result<()> { - for (line_no, raw_line) in data_source.lines().enumerate() { - let trimmed = raw_line.trim(); - if trimmed.is_empty() || trimmed.starts_with("//") { - lines.push(ParsedLine::Raw(raw_line.to_string())); - continue; - } - - let 
mut obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!("JSON parse error on line {}: {}", line_no + 1, e)) - })?; - - if let Some(type_name) = obj - .get("type") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - && let Some(specs) = embed_specs.get(type_name.as_str()) - { - let data_obj = obj - .get_mut("data") - .and_then(|v| v.as_object_mut()) - .ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} is missing object field `data`", - type_name, - line_no + 1 - )) - })?; - let line_index = lines.len(); - - for spec in specs { - let needs_embedding = match data_obj.get(&spec.target_prop) { - Some(value) => value.is_null(), - None => true, - }; - if !needs_embedding { - continue; - } - - let source_value = data_obj.get(&spec.source_prop).ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} missing @embed source property `{}` for `{}`", - type_name, - line_no + 1, - spec.source_prop, - spec.target_prop - )) - })?; - let source_text = source_value.as_str().ok_or_else(|| { - NanoError::Storage(format!( - "node {} line {} @embed source property `{}` must be String", - type_name, - line_no + 1, - spec.source_prop - )) - })?; - - pending.push(PendingAssignment { - line_index, - target_prop: spec.target_prop.clone(), - source_text: source_text.to_string(), - dim: spec.dim, - content_hash: hash_string(source_text), - }); - } - } - - lines.push(ParsedLine::Json(obj)); - } - Ok(()) -} - -#[cfg_attr(not(test), allow(dead_code))] -fn apply_embeddings_to_lines( - lines: &mut [ParsedLine], - pending: &[PendingAssignment], - cache: &HashMap>, - model: &str, - chunking: EmbedChunkingConfig, -) -> Result<()> { - for assignment in pending { - let key = CacheKey { - model: model.to_string(), - dim: assignment.dim, - content_hash: assignment.content_hash.clone(), - chunk_chars: chunking.chunk_chars, - chunk_overlap_chars: chunking.chunk_overlap_chars, - }; - let vector = cache.get(&key).ok_or_else(|| { - 
NanoError::Storage(format!( - "embedding cache miss for content hash {}", - assignment.content_hash - )) - })?; - let line = lines.get_mut(assignment.line_index).ok_or_else(|| { - NanoError::Storage(format!( - "embedding assignment line out of range: {}", - assignment.line_index - )) - })?; - let ParsedLine::Json(obj) = line else { - return Err(NanoError::Storage(format!( - "embedding assignment line {} is not JSON", - assignment.line_index - ))); - }; - let data_obj = obj - .get_mut("data") - .and_then(|v| v.as_object_mut()) - .ok_or_else(|| { - NanoError::Storage("node row is missing object field `data`".to_string()) - })?; - data_obj.insert( - assignment.target_prop.clone(), - serde_json::to_value(vector).map_err(|e| { - NanoError::Storage(format!("serialize embedding vector failed: {}", e)) - })?, - ); - } - Ok(()) -} - -#[cfg_attr(not(test), allow(dead_code))] -fn render_output_lines(original: &str, lines: Vec) -> Result { - let mut out = String::new(); - for (idx, line) in lines.into_iter().enumerate() { - if idx > 0 { - out.push('\n'); - } - match line { - ParsedLine::Raw(raw) => out.push_str(&raw), - ParsedLine::Json(obj) => { - out.push_str(&serde_json::to_string(&obj).map_err(|e| { - NanoError::Storage(format!("serialize JSONL row failed: {}", e)) - })?) 
- } - } - } - if original.ends_with('\n') { - out.push('\n'); - } - Ok(out) -} - -async fn resolve_pending_stream_batches( - pending_by_dim: &mut BTreeMap>, - pending_lines: &mut VecDeque, - runtime: &mut StreamEmbedRuntime<'_>, - flush_all: bool, -) -> Result<()> { - loop { - let next_dim = pending_by_dim - .iter() - .find(|(_, queue)| { - if flush_all { - !queue.is_empty() - } else { - queue.len() >= runtime.batch_size.max(1) - } - }) - .map(|(dim, _)| *dim); - let Some(dim) = next_dim else { - break; - }; - - let queue = pending_by_dim.get_mut(&dim).ok_or_else(|| { - NanoError::Storage(format!("missing pending embedding queue for dim {}", dim)) - })?; - resolve_pending_stream_batch(queue, pending_lines, runtime).await?; - if queue.is_empty() { - pending_by_dim.remove(&dim); - } - } - - Ok(()) -} - -async fn resolve_pending_stream_batch( - queue: &mut VecDeque, - pending_lines: &mut VecDeque, - runtime: &mut StreamEmbedRuntime<'_>, -) -> Result<()> { - let batch_size = runtime.batch_size.max(1); - let mut assignments = Vec::new(); - let mut unique_entries = Vec::new(); - let mut seen_keys = HashSet::new(); - - while let Some(assignment) = queue.pop_front() { - let cache_key = assignment.cache_key(runtime.model, runtime.chunking); - if seen_keys.insert(cache_key.clone()) { - unique_entries.push((cache_key, assignment.source_text.clone())); - } - assignments.push(assignment); - if unique_entries.len() >= batch_size { - break; - } - } - - if unique_entries.is_empty() { - return Ok(()); - } - - if runtime.chunking.is_enabled() { - for (cache_key, text) in &unique_entries { - let vector = embed_text_with_chunking( - runtime.client, - text, - cache_key.dim, - batch_size, - runtime.chunking, - ) - .await?; - if vector.len() != cache_key.dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - cache_key.content_hash, - cache_key.dim, - vector.len() - ))); - } - runtime.cache.insert(cache_key.clone(), vector.clone()); 
- runtime.new_cache_records.push(CacheRecord { - model: cache_key.model.clone(), - dim: cache_key.dim, - content_hash: cache_key.content_hash.clone(), - vector, - chunk_chars: cache_key.chunk_chars, - chunk_overlap_chars: cache_key.chunk_overlap_chars, - }); - } - } else { - let texts: Vec = unique_entries - .iter() - .map(|(_, text)| text.clone()) - .collect(); - let dim = unique_entries[0].0.dim; - let vectors = runtime - .client - .embed_texts(&texts, dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err)))?; - if vectors.len() != unique_entries.len() { - return Err(NanoError::Storage(format!( - "embedding response size mismatch: expected {}, got {}", - unique_entries.len(), - vectors.len() - ))); - } - - for ((cache_key, _), vector) in unique_entries.iter().zip(vectors.into_iter()) { - if vector.len() != cache_key.dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch for {}: expected {}, got {}", - cache_key.content_hash, - cache_key.dim, - vector.len() - ))); - } - runtime.cache.insert(cache_key.clone(), vector.clone()); - runtime.new_cache_records.push(CacheRecord { - model: cache_key.model.clone(), - dim: cache_key.dim, - content_hash: cache_key.content_hash.clone(), - vector, - chunk_chars: cache_key.chunk_chars, - chunk_overlap_chars: cache_key.chunk_overlap_chars, - }); - } - } - - for assignment in &assignments { - apply_stream_assignment( - pending_lines, - assignment, - runtime.cache, - runtime.model, - runtime.chunking, - )?; - } - - Ok(()) -} - -struct StreamEmbedRuntime<'a> { - cache: &'a mut HashMap>, - model: &'a str, - client: &'a EmbeddingClient, - new_cache_records: &'a mut Vec, - batch_size: usize, - chunking: EmbedChunkingConfig, -} - -fn apply_stream_assignment( - pending_lines: &mut VecDeque, - assignment: &StreamPendingAssignment, - cache: &HashMap>, - model: &str, - chunking: EmbedChunkingConfig, -) -> Result<()> { - let cache_key = assignment.cache_key(model, chunking); - 
let vector = cache.get(&cache_key).ok_or_else(|| { - NanoError::Storage(format!( - "embedding cache miss for content hash {}", - assignment.content_hash - )) - })?; - let line = pending_lines - .iter_mut() - .find(|line| line.line_id == assignment.line_id) - .ok_or_else(|| { - NanoError::Storage(format!( - "embedding assignment line out of range: {}", - assignment.line_id - )) - })?; - let ParsedLine::Json(obj) = &mut line.line else { - return Err(NanoError::Storage(format!( - "embedding assignment line {} is not JSON", - assignment.line_id - ))); - }; - let data_obj = obj - .get_mut("data") - .and_then(|value| value.as_object_mut()) - .ok_or_else(|| NanoError::Storage("node row is missing object field `data`".to_string()))?; - data_obj.insert( - assignment.target_prop.clone(), - serde_json::to_value(vector) - .map_err(|e| NanoError::Storage(format!("serialize embedding vector failed: {}", e)))?, - ); - if line.missing_assignments == 0 { - return Err(NanoError::Storage(format!( - "embedding assignment line {} underflow", - assignment.line_id - ))); - } - line.missing_assignments -= 1; - Ok(()) -} - -fn flush_ready_stream_lines( - writer: &mut BufWriter, - pending_lines: &mut VecDeque, -) -> Result<()> { - while pending_lines - .front() - .map(|line| line.missing_assignments == 0) - .unwrap_or(false) - { - let line = pending_lines.pop_front().ok_or_else(|| { - NanoError::Storage("pending embedding output queue unexpectedly empty".to_string()) - })?; - match line.line { - ParsedLine::Raw(raw) => writer.write_all(raw.as_bytes())?, - ParsedLine::Json(obj) => serde_json::to_writer(&mut *writer, &obj) - .map_err(|e| NanoError::Storage(format!("serialize JSONL row failed: {}", e)))?, - } - writer.write_all(b"\n")?; - } - Ok(()) -} - -fn copy_reader_to_writer( - reader: R, - writer: &mut BufWriter, -) -> Result<()> { - for line in reader.lines() { - let line = line?; - writer.write_all(line.as_bytes())?; - writer.write_all(b"\n")?; - } - Ok(()) -} - -fn 
create_materialized_temp_file(db_path: &Path) -> Result { - std::fs::create_dir_all(db_path)?; - let pid = std::process::id(); - for attempt in 0..256u32 { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos(); - let path = db_path.join(format!( - ".nanograph_embed_materialized_{}_{}_{}.jsonl", - pid, now, attempt - )); - match std::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&path) - { - Ok(_) => return Ok(path), - Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue, - Err(err) => return Err(err.into()), - } - } - - Err(NanoError::Storage( - "failed to create temp embedding materialization file".to_string(), - )) -} - -async fn embed_text_with_chunking( - client: &EmbeddingClient, - source_text: &str, - dim: usize, - batch_size: usize, - chunking: EmbedChunkingConfig, -) -> Result> { - let chunks = split_text_into_chunks( - source_text, - chunking.chunk_chars, - chunking.chunk_overlap_chars, - ); - if chunks.len() == 1 { - return client - .embed_text(&chunks[0], dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err))); - } - - let batch_size = batch_size.max(1); - let mut vectors = Vec::with_capacity(chunks.len()); - for chunk_batch in chunks.chunks(batch_size) { - let texts: Vec = chunk_batch.to_vec(); - let mut embedded = client - .embed_texts(&texts, dim) - .await - .map_err(|err| NanoError::Storage(format!("embedding request failed: {}", err)))?; - if embedded.len() != texts.len() { - return Err(NanoError::Storage(format!( - "embedding response size mismatch: expected {}, got {}", - texts.len(), - embedded.len() - ))); - } - vectors.append(&mut embedded); - } - - average_pool_embeddings(&vectors, dim) -} - -fn split_text_into_chunks(text: &str, chunk_chars: usize, overlap_chars: usize) -> Vec { - if chunk_chars == 0 { - return vec![text.to_string()]; - } - - let total_chars = text.chars().count(); - if total_chars <= chunk_chars { - 
return vec![text.to_string()]; - } - - let mut char_boundaries = Vec::with_capacity(total_chars + 1); - char_boundaries.push(0); - for (idx, _) in text.char_indices().skip(1) { - char_boundaries.push(idx); - } - char_boundaries.push(text.len()); - - let step = chunk_chars.saturating_sub(overlap_chars).max(1); - let mut out = Vec::new(); - let mut start_char = 0usize; - while start_char < total_chars { - let end_char = (start_char + chunk_chars).min(total_chars); - let start_byte = char_boundaries[start_char]; - let end_byte = char_boundaries[end_char]; - out.push(text[start_byte..end_byte].to_string()); - if end_char == total_chars { - break; - } - start_char = start_char.saturating_add(step); - } - - if out.is_empty() { - vec![text.to_string()] - } else { - out - } -} - -fn average_pool_embeddings(vectors: &[Vec], dim: usize) -> Result> { - if vectors.is_empty() { - return Err(NanoError::Storage( - "embedding aggregation received no chunk vectors".to_string(), - )); - } - - let mut accum = vec![0.0f64; dim]; - for vector in vectors { - if vector.len() != dim { - return Err(NanoError::Storage(format!( - "embedding dimension mismatch during chunk aggregation: expected {}, got {}", - dim, - vector.len() - ))); - } - for (idx, value) in vector.iter().enumerate() { - accum[idx] += *value as f64; - } - } - - let inv_len = 1.0f64 / vectors.len() as f64; - let mut pooled: Vec = accum - .into_iter() - .map(|sum| (sum * inv_len) as f32) - .collect(); - let norm = pooled - .iter() - .map(|v| (*v as f64) * (*v as f64)) - .sum::() - .sqrt() as f32; - if norm > f32::EPSILON { - for value in &mut pooled { - *value /= norm; - } - } - Ok(pooled) -} - -pub(crate) fn collect_embed_specs(schema_ir: &SchemaIR) -> Result>> { - let mut specs_by_type: HashMap> = HashMap::new(); - for node in schema_ir.node_types() { - let mut prop_by_name: HashMap<&str, &PropDef> = HashMap::new(); - for prop in &node.properties { - prop_by_name.insert(prop.name.as_str(), prop); - } - - let mut node_specs 
= Vec::new(); - for prop in &node.properties { - let Some(source_prop) = prop.embed_source.as_ref() else { - continue; - }; - - if prop.list { - return Err(NanoError::Storage(format!( - "@embed target {}.{} cannot be a list type", - node.name, prop.name - ))); - } - let dim = match ScalarType::from_str_name(&prop.scalar_type) { - Some(ScalarType::Vector(dim)) if dim > 0 => dim as usize, - _ => { - return Err(NanoError::Storage(format!( - "@embed target {}.{} must be Vector(dim)", - node.name, prop.name - ))); - } - }; - - let source_def = prop_by_name.get(source_prop.as_str()).ok_or_else(|| { - NanoError::Storage(format!( - "@embed on {}.{} references unknown source property {}", - node.name, prop.name, source_prop - )) - })?; - if source_def.list || source_def.scalar_type != "String" { - return Err(NanoError::Storage(format!( - "@embed source {}.{} must be String", - node.name, source_prop - ))); - } - - node_specs.push(EmbedSpec { - target_prop: prop.name.clone(), - source_prop: source_prop.clone(), - dim, - }); - } - - if !node_specs.is_empty() { - specs_by_type.insert(node.name.clone(), node_specs); - } - } - Ok(specs_by_type) -} - -fn load_embedding_cache(path: &Path) -> Result>> { - let records = load_embedding_cache_records(path)?; - let mut cache = HashMap::new(); - for record in records { - let key = cache_key_from_record(&record); - cache.insert(key, record.vector); - } - Ok(cache) -} - -fn append_embedding_cache(path: &Path, records: &[CacheRecord]) -> Result<()> { - let max_entries = parse_env_usize( - "NANOGRAPH_EMBED_CACHE_MAX_ENTRIES", - DEFAULT_EMBED_CACHE_MAX_ENTRIES, - ); - append_embedding_cache_with_limit(path, records, max_entries) -} - -fn append_embedding_cache_with_limit( - path: &Path, - records: &[CacheRecord], - max_entries: usize, -) -> Result<()> { - if records.is_empty() { - return Ok(()); - } - let _lock = acquire_embedding_cache_lock(path)?; - let mut merged = load_embedding_cache_records(path)?; - 
merged.extend(records.iter().cloned()); - let compacted = compact_embedding_cache_records(merged, max_entries); - write_embedding_cache_records(path, &compacted)?; - Ok(()) -} - -fn load_embedding_cache_records(path: &Path) -> Result> { - if !path.exists() { - return Ok(Vec::new()); - } - let data = std::fs::read_to_string(path)?; - parse_embedding_cache_records(path, &data) -} - -fn parse_embedding_cache_records(path: &Path, data: &str) -> Result> { - let mut records = Vec::new(); - for (line_no, line) in data.lines().enumerate() { - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - let record: CacheRecord = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!( - "invalid embedding cache at {} line {}: {}", - path.display(), - line_no + 1, - e - )) - })?; - if record.vector.len() != record.dim { - return Err(NanoError::Storage(format!( - "invalid embedding cache at {} line {}: vector dim {} does not match {}", - path.display(), - line_no + 1, - record.vector.len(), - record.dim - ))); - } - records.push(record); - } - Ok(records) -} - -fn write_embedding_cache_records(path: &Path, records: &[CacheRecord]) -> Result<()> { - let mut file = std::fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(path)?; - for record in records { - let mut line = serde_json::to_vec(record).map_err(|e| { - NanoError::Storage(format!( - "failed to write embedding cache {}: {}", - path.display(), - e - )) - })?; - line.push(b'\n'); - file.write_all(&line)?; - } - file.flush()?; - Ok(()) -} - -fn compact_embedding_cache_records( - records: Vec, - max_entries: usize, -) -> Vec { - let max_entries = max_entries.max(1); - let mut seen = HashSet::new(); - let mut compacted_rev = Vec::with_capacity(records.len().min(max_entries)); - for record in records.into_iter().rev() { - if seen.insert(cache_key_from_record(&record)) { - compacted_rev.push(record); - if compacted_rev.len() == max_entries { - break; - } - } - } - 
compacted_rev.reverse(); - compacted_rev -} - -fn cache_key_from_record(record: &CacheRecord) -> CacheKey { - CacheKey { - model: record.model.clone(), - dim: record.dim, - content_hash: record.content_hash.clone(), - chunk_chars: record.chunk_chars, - chunk_overlap_chars: record.chunk_overlap_chars, - } -} - -struct EmbeddingCacheLock { - path: PathBuf, - _file: std::fs::File, -} - -impl Drop for EmbeddingCacheLock { - fn drop(&mut self) { - let _ = std::fs::remove_file(&self.path); - } -} - -fn embedding_cache_lock_path(path: &Path) -> PathBuf { - let mut lock_path = path.as_os_str().to_os_string(); - lock_path.push(".lock"); - PathBuf::from(lock_path) -} - -fn acquire_embedding_cache_lock(path: &Path) -> Result { - let stale_after_secs = parse_env_usize( - "NANOGRAPH_EMBED_CACHE_LOCK_STALE_SECS", - DEFAULT_EMBED_CACHE_LOCK_STALE_SECS, - ); - let stale_after = Duration::from_secs(stale_after_secs as u64); - acquire_embedding_cache_lock_with_stale_after(path, stale_after) -} - -fn acquire_embedding_cache_lock_with_stale_after( - path: &Path, - stale_after: Duration, -) -> Result { - let lock_path = embedding_cache_lock_path(path); - for attempt in 0..EMBEDDING_CACHE_LOCK_RETRIES { - match std::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&lock_path) - { - Ok(file) => { - return Ok(EmbeddingCacheLock { - path: lock_path, - _file: file, - }); - } - Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { - if lock_file_is_stale(&lock_path, stale_after) { - match std::fs::remove_file(&lock_path) { - Ok(()) => continue, - Err(remove_err) if remove_err.kind() == std::io::ErrorKind::NotFound => { - continue; - } - Err(remove_err) => { - return Err(NanoError::Storage(format!( - "failed to remove stale embedding cache lock {}: {}", - lock_path.display(), - remove_err - ))); - } - } - } - if attempt + 1 == EMBEDDING_CACHE_LOCK_RETRIES { - return Err(NanoError::Storage(format!( - "embedding cache lock timed out for {} (lock file: {})", - 
path.display(), - lock_path.display() - ))); - } - std::thread::sleep(Duration::from_millis(EMBEDDING_CACHE_LOCK_RETRY_DELAY_MS)); - } - Err(err) => { - return Err(NanoError::Storage(format!( - "failed to acquire embedding cache lock {}: {}", - lock_path.display(), - err - ))); - } - } - } - - Err(NanoError::Storage(format!( - "embedding cache lock acquisition failed for {}", - path.display() - ))) -} - -fn lock_file_is_stale(lock_path: &Path, stale_after: Duration) -> bool { - let metadata = match std::fs::metadata(lock_path) { - Ok(meta) => meta, - Err(_) => return false, - }; - let timestamp = metadata.modified().ok().or_else(|| metadata.created().ok()); - let Some(timestamp) = timestamp else { - return false; - }; - match timestamp.elapsed() { - Ok(age) => age >= stale_after, - Err(_) => false, - } -} - -fn parse_env_usize(name: &str, default: usize) -> usize { - std::env::var(name) - .ok() - .and_then(|v| v.parse::().ok()) - .filter(|v| *v > 0) - .unwrap_or(default) -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::io::Cursor; - use std::sync::{Arc, Barrier}; - - use tempfile::TempDir; - - use crate::catalog::schema_ir::build_schema_ir; - use crate::schema::parser::parse_schema; - - use super::*; - - #[tokio::test] - async fn materialize_embeddings_populates_missing_vector() { - let schema = parse_schema( - r#" -node Doc { - slug: String @key - title: String - embedding: Vector(6) @embed(title) -} -"#, - ) - .unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let data = r#"{"type":"Doc","data":{"slug":"a","title":"alpha"}} -{"type":"Doc","data":{"slug":"b","title":"beta"}} -"#; - let temp = TempDir::new().unwrap(); - let client = EmbeddingClient::mock_for_tests(); - let out = materialize_embeddings_for_load_inner(temp.path(), &ir, data, Some(&client)) - .await - .unwrap(); - assert!(out.contains("\"embedding\"")); - assert!(temp.path().join(EMBEDDING_CACHE_FILENAME).exists()); - } - - #[tokio::test] - async fn 
materialize_embeddings_is_noop_when_vectors_present() { - let schema = parse_schema( - r#" -node Doc { - slug: String @key - title: String - embedding: Vector(3) @embed(title) -} -"#, - ) - .unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let data = - r#"{"type":"Doc","data":{"slug":"a","title":"alpha","embedding":[1.0,0.0,0.0]}}"#; - let temp = TempDir::new().unwrap(); - let out = materialize_embeddings_for_load_inner( - temp.path(), - &ir, - data, - Some(&EmbeddingClient::mock_for_tests()), - ) - .await - .unwrap(); - assert_eq!(out, data); - assert!(!temp.path().join(EMBEDDING_CACHE_FILENAME).exists()); - } - - #[tokio::test] - async fn materialize_embeddings_to_tempfile_matches_string_path() { - let schema = parse_schema( - r#" -node Doc { - slug: String @key - title: String - embedding: Vector(6) @embed(title) -} -"#, - ) - .unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let data = r#"{"type":"Doc","data":{"slug":"a","title":"alpha"}} -{"type":"Doc","data":{"slug":"b","title":"beta"}} -"#; - let temp = TempDir::new().unwrap(); - let client = EmbeddingClient::mock_for_tests(); - - let string_out = - materialize_embeddings_for_load_inner(temp.path(), &ir, data, Some(&client)) - .await - .unwrap(); - let tempfile_out = materialize_embeddings_for_load_to_tempfile_inner( - temp.path(), - &ir, - Cursor::new(data.as_bytes()), - Some(&client), - ) - .await - .unwrap(); - let stream_out = std::fs::read_to_string(tempfile_out).unwrap(); - - let parse_rows = |text: &str| { - text.lines() - .filter(|line| !line.trim().is_empty()) - .map(|line| serde_json::from_str::(line).unwrap()) - .collect::>() - }; - - assert_eq!(parse_rows(&string_out), parse_rows(&stream_out)); - } - - #[test] - fn split_text_into_chunks_respects_overlap() { - let chunks = split_text_into_chunks("abcdefghij", 4, 1); - assert_eq!(chunks, vec!["abcd", "defg", "ghij"]); - } - - #[test] - fn append_embedding_cache_handles_concurrent_writers() { - let temp = TempDir::new().unwrap(); 
- let cache_path = temp.path().join(EMBEDDING_CACHE_FILENAME); - let writer_count = 8usize; - let barrier = Arc::new(Barrier::new(writer_count)); - let mut threads = Vec::new(); - - for idx in 0..writer_count { - let path = cache_path.clone(); - let barrier = Arc::clone(&barrier); - threads.push(std::thread::spawn(move || { - let record = CacheRecord { - model: "test-model".to_string(), - dim: 3, - content_hash: format!("hash-{}", idx), - vector: vec![idx as f32, 1.0, 2.0], - chunk_chars: 0, - chunk_overlap_chars: 0, - }; - barrier.wait(); - append_embedding_cache(&path, &[record]).unwrap(); - })); - } - - for thread in threads { - thread.join().unwrap(); - } - - let file = std::fs::read_to_string(&cache_path).unwrap(); - let lines: Vec<&str> = file - .lines() - .filter(|line| !line.trim().is_empty()) - .collect(); - assert_eq!(lines.len(), writer_count); - - let mut seen = HashSet::new(); - for line in lines { - let record: CacheRecord = serde_json::from_str(line).unwrap(); - assert!(seen.insert(record.content_hash)); - } - } - - #[test] - fn append_embedding_cache_with_limit_compacts_and_deduplicates() { - let temp = TempDir::new().unwrap(); - let cache_path = temp.path().join(EMBEDDING_CACHE_FILENAME); - - let record = |hash: &str, marker: f32| CacheRecord { - model: "test-model".to_string(), - dim: 3, - content_hash: hash.to_string(), - vector: vec![marker, 1.0, 2.0], - chunk_chars: 0, - chunk_overlap_chars: 0, - }; - - append_embedding_cache_with_limit( - &cache_path, - &[record("a", 1.0), record("b", 2.0), record("c", 3.0)], - 3, - ) - .unwrap(); - append_embedding_cache_with_limit(&cache_path, &[record("d", 4.0), record("b", 20.0)], 3) - .unwrap(); - - let cache = load_embedding_cache(&cache_path).unwrap(); - assert_eq!(cache.len(), 3); - - let key_b = CacheKey { - model: "test-model".to_string(), - dim: 3, - content_hash: "b".to_string(), - chunk_chars: 0, - chunk_overlap_chars: 0, - }; - let key_c = CacheKey { - content_hash: "c".to_string(), - 
..key_b.clone() - }; - let key_d = CacheKey { - content_hash: "d".to_string(), - ..key_b.clone() - }; - - assert_eq!(cache.get(&key_b).unwrap()[0], 20.0); - assert!(cache.contains_key(&key_c)); - assert!(cache.contains_key(&key_d)); - } - - #[test] - fn acquire_embedding_cache_lock_reclaims_stale_lock_file() { - let temp = TempDir::new().unwrap(); - let cache_path = temp.path().join(EMBEDDING_CACHE_FILENAME); - let lock_path = embedding_cache_lock_path(&cache_path); - - std::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&lock_path) - .unwrap(); - std::thread::sleep(Duration::from_secs(2)); - - let lock = - acquire_embedding_cache_lock_with_stale_after(&cache_path, Duration::from_secs(1)) - .unwrap(); - drop(lock); - - assert!(!lock_path.exists()); - } - - #[tokio::test] - async fn materialize_embeddings_chunking_pools_chunk_vectors() { - let schema = parse_schema( - r#" -node Doc { - slug: String @key - body: String - embedding: Vector(6) @embed(body) -} -"#, - ) - .unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let data = r#"{"type":"Doc","data":{"slug":"doc-1","body":"alpha beta gamma delta epsilon zeta"}}"#; - let temp = TempDir::new().unwrap(); - let client = EmbeddingClient::mock_for_tests(); - let chunking = EmbedChunkingConfig::new(12, 3); - let out = materialize_embeddings_for_load_inner_with_chunking( - temp.path(), - &ir, - data, - Some(&client), - chunking, - ) - .await - .unwrap(); - - let embedded: serde_json::Value = serde_json::from_str(&out).unwrap(); - let values = embedded["data"]["embedding"].as_array().unwrap(); - let actual: Vec = values.iter().map(|v| v.as_f64().unwrap() as f32).collect(); - - let chunk_texts = split_text_into_chunks( - "alpha beta gamma delta epsilon zeta", - chunking.chunk_chars, - chunking.chunk_overlap_chars, - ); - let chunk_vectors = client.embed_texts(&chunk_texts, 6).await.unwrap(); - let expected = average_pool_embeddings(&chunk_vectors, 6).unwrap(); - - assert_eq!(actual.len(), 
expected.len()); - for (got, want) in actual.iter().zip(expected.iter()) { - assert!((got - want).abs() < 1e-6, "got={}, want={}", got, want); - } - } - - #[test] - fn cache_key_differs_by_chunking_config() { - let key_a = CacheKey { - model: "text-embedding-3-small".to_string(), - dim: 8, - content_hash: "abc".to_string(), - chunk_chars: 0, - chunk_overlap_chars: 0, - }; - let key_b = CacheKey { - chunk_chars: 256, - chunk_overlap_chars: 64, - ..key_a.clone() - }; - assert_ne!(key_a, key_b); - } -} diff --git a/crates/omnigraph/src/loader/jsonl.rs b/crates/omnigraph/src/loader/jsonl.rs deleted file mode 100644 index 8eb9617..0000000 --- a/crates/omnigraph/src/loader/jsonl.rs +++ /dev/null @@ -1,1532 +0,0 @@ -use std::collections::{BTreeMap, HashMap}; -use std::fs::{File, OpenOptions}; -use std::io::{BufRead, BufReader, BufWriter, Cursor, Write}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; - -use arrow_array::builder::{ - ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder, - Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder, - UInt32Builder, UInt64Builder, make_builder, -}; -use arrow_array::{ - Array, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, Int32Array, - Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array, -}; -use arrow_schema::{DataType, Field, Schema}; - -use crate::error::{NanoError, Result}; - -use super::super::graph::DatasetAccumulator; -use super::constraints::{key_value_string, node_property_field}; - -#[cfg_attr(not(test), allow(dead_code))] -/// Load JSONL-formatted data into a DatasetAccumulator. -/// Each line is either a node `{"type": "...", "data": {...}}` or edge `{"edge": "...", "from": "...", "to": "..."}`. 
-pub(crate) fn load_jsonl_data( - storage: &mut DatasetAccumulator, - data: &str, - key_props: &HashMap, -) -> Result<()> { - load_jsonl_data_with_name_seed(storage, data, key_props, None) -} - -#[cfg_attr(not(test), allow(dead_code))] -/// Load JSONL-formatted data into a DatasetAccumulator with an optional pre-populated -/// @key-value-to-id mapping for resolving edges that reference existing nodes. -pub(crate) fn load_jsonl_data_with_name_seed( - storage: &mut DatasetAccumulator, - data: &str, - key_props: &HashMap, - name_seed: Option<&HashMap<(String, String), u64>>, -) -> Result<()> { - let cursor = Cursor::new(data.as_bytes()); - load_jsonl_reader_with_name_seed(storage, cursor, key_props, name_seed) -} - -#[cfg_attr(not(test), allow(dead_code))] -pub(crate) fn load_jsonl_reader( - storage: &mut DatasetAccumulator, - reader: R, - key_props: &HashMap, -) -> Result<()> { - load_jsonl_reader_with_name_seed(storage, reader, key_props, None) -} - -#[cfg_attr(not(test), allow(dead_code))] -pub(crate) fn load_jsonl_reader_with_name_seed( - storage: &mut DatasetAccumulator, - reader: R, - key_props: &HashMap, - name_seed: Option<&HashMap<(String, String), u64>>, -) -> Result<()> { - let spool_dir = std::env::temp_dir(); - load_jsonl_reader_with_name_seed_at_path(storage, &spool_dir, reader, key_props, name_seed) -} - -pub(crate) fn load_jsonl_reader_with_name_seed_at_path( - storage: &mut DatasetAccumulator, - spool_dir: &Path, - reader: R, - key_props: &HashMap, - name_seed: Option<&HashMap<(String, String), u64>>, -) -> Result<()> { - let batch_size = parse_env_usize("NANOGRAPH_LOAD_ROW_BATCH_SIZE", 2048); - let mut spool_paths = TempSpoolPaths::default(); - let mut node_paths = HashMap::new(); - let mut node_writers = HashMap::new(); - let mut edge_paths = HashMap::new(); - let mut edge_writers = HashMap::new(); - - for (line_no, line) in reader.lines().enumerate() { - let line = line?; - let trimmed = line.trim(); - if trimmed.is_empty() || 
trimmed.starts_with("//") { - continue; - } - - let obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!("JSON parse error on line {}: {}", line_no + 1, e)) - })?; - - if let Some(type_name) = obj.get("type").and_then(|v| v.as_str()) { - if !storage.catalog.node_types.contains_key(type_name) { - return Err(NanoError::Storage(format!( - "unknown node type in data: {}", - type_name - ))); - } - let writer = spool_writer_for_type( - spool_dir, - "load_nodes", - type_name, - &mut node_writers, - &mut node_paths, - &mut spool_paths, - )?; - write_jsonl_line(writer, &obj)?; - } else if let Some(edge_type) = obj.get("edge").and_then(|v| v.as_str()) { - let edge_name = resolve_edge_name(storage, edge_type)?; - let writer = spool_writer_for_type( - spool_dir, - "load_edges", - &edge_name, - &mut edge_writers, - &mut edge_paths, - &mut spool_paths, - )?; - write_jsonl_line(writer, &obj)?; - } - } - - drop(node_writers); - drop(edge_writers); - - let mut key_to_id: HashMap<(String, String), u64> = name_seed.cloned().unwrap_or_default(); - - let mut node_types: Vec = node_paths.keys().cloned().collect(); - node_types.sort(); - for type_name in node_types { - let path = node_paths.get(&type_name).ok_or_else(|| { - NanoError::Storage(format!("missing node spool path for {}", type_name)) - })?; - load_spooled_nodes( - storage, - &type_name, - path, - key_props, - &mut key_to_id, - batch_size, - )?; - } - - let mut edge_names: Vec = edge_paths.keys().cloned().collect(); - edge_names.sort(); - for edge_name in edge_names { - let path = edge_paths.get(&edge_name).ok_or_else(|| { - NanoError::Storage(format!("missing edge spool path for {}", edge_name)) - })?; - load_spooled_edges(storage, &edge_name, path, key_props, &key_to_id, batch_size)?; - } - - Ok(()) -} - -#[derive(Debug)] -struct PendingNodeRow { - row_idx: usize, - data: serde_json::Map, -} - -#[derive(Debug)] -struct ResolvedEdge { - from_id: u64, - to_id: u64, - data: Option>, 
-} - -#[derive(Default)] -struct TempSpoolPaths { - paths: Vec, -} - -impl TempSpoolPaths { - fn push(&mut self, path: PathBuf) { - self.paths.push(path); - } -} - -impl Drop for TempSpoolPaths { - fn drop(&mut self) { - for path in &self.paths { - let _ = std::fs::remove_file(path); - } - } -} - -fn load_spooled_nodes( - storage: &mut DatasetAccumulator, - type_name: &str, - path: &Path, - key_props: &HashMap, - key_to_id: &mut HashMap<(String, String), u64>, - batch_size: usize, -) -> Result<()> { - let file = File::open(path)?; - let reader = BufReader::new(file); - let mut rows = Vec::with_capacity(batch_size); - let mut next_row_idx = 0usize; - - for (line_no, line) in reader.lines().enumerate() { - let line = line?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - let obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!( - "JSON parse error in node spool {} line {}: {}", - type_name, - line_no + 1, - e - )) - })?; - let data = obj - .get("data") - .and_then(|value| value.as_object()) - .cloned() - .ok_or_else(|| { - NanoError::Storage(format!( - "node {} is missing object field `data` in spooled load", - type_name - )) - })?; - rows.push(PendingNodeRow { - row_idx: next_row_idx, - data, - }); - next_row_idx += 1; - if rows.len() >= batch_size { - flush_node_rows(storage, type_name, &mut rows, key_props, key_to_id)?; - } - } - - if !rows.is_empty() { - flush_node_rows(storage, type_name, &mut rows, key_props, key_to_id)?; - } - - Ok(()) -} - -fn flush_node_rows( - storage: &mut DatasetAccumulator, - type_name: &str, - rows: &mut Vec, - key_props: &HashMap, - key_to_id: &mut HashMap<(String, String), u64>, -) -> Result<()> { - if rows.is_empty() { - return Ok(()); - } - - let node_type = - storage.catalog.node_types.get(type_name).ok_or_else(|| { - NanoError::Storage(format!("unknown node type in data: {}", type_name)) - })?; - let prop_fields: Vec = node_type - .arrow_schema - .fields() - 
.iter() - .skip(1) - .map(|field| field.as_ref().clone()) - .collect(); - let mut builders: Vec> = - vec![Vec::with_capacity(rows.len()); prop_fields.len()]; - - for row in rows.iter() { - for (idx, field) in prop_fields.iter().enumerate() { - let value = row - .data - .get(field.name()) - .cloned() - .unwrap_or(serde_json::Value::Null); - if value.is_null() && !field.is_nullable() { - return Err(NanoError::Storage(format!( - "node {}: required field '{}' missing on row {}", - type_name, - field.name(), - row.row_idx - ))); - } - if let Some(prop_type) = node_type.properties.get(field.name()) { - validate_json_value(type_name, field.name(), prop_type, &value)?; - } - builders[idx].push(value); - } - } - - let mut columns: Vec> = Vec::with_capacity(prop_fields.len()); - for (idx, field) in prop_fields.iter().enumerate() { - columns.push(json_values_to_array( - &builders[idx], - field.data_type(), - field.is_nullable(), - )?); - } - - let prop_schema = Arc::new(Schema::new(prop_fields.clone())); - let batch = RecordBatch::try_new(prop_schema, columns) - .map_err(|e| NanoError::Storage(format!("batch error: {}", e)))?; - - let key_rows: Option> = if let Some(key_prop) = key_props.get(type_name) { - let key_col_idx = prop_fields - .iter() - .position(|field| field.name() == key_prop) - .ok_or_else(|| { - NanoError::Storage(format!( - "node type {} missing @key property {}", - type_name, key_prop - )) - })?; - let key_arr = batch.column(key_col_idx).clone(); - let mut keys = Vec::with_capacity(batch.num_rows()); - for row in 0..batch.num_rows() { - keys.push(key_value_string(&key_arr, row, key_prop)?); - } - Some(keys) - } else { - None - }; - - let assigned_ids = storage.insert_nodes(type_name, batch)?; - if let Some(keys) = key_rows { - for (row, key) in keys.into_iter().enumerate() { - key_to_id.insert((type_name.to_string(), key), assigned_ids[row]); - } - } - - rows.clear(); - Ok(()) -} - -fn load_spooled_edges( - storage: &mut DatasetAccumulator, - edge_name: 
&str, - path: &Path, - key_props: &HashMap, - key_to_id: &HashMap<(String, String), u64>, - batch_size: usize, -) -> Result<()> { - let file = File::open(path)?; - let reader = BufReader::new(file); - let mut edges_by_pair: BTreeMap<(u64, u64), ResolvedEdge> = BTreeMap::new(); - - for (line_no, line) in reader.lines().enumerate() { - let line = line?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - let obj: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| { - NanoError::Storage(format!( - "JSON parse error in edge spool {} line {}: {}", - edge_name, - line_no + 1, - e - )) - })?; - let resolved = resolve_edge_object(storage, &obj, key_props, key_to_id)?; - edges_by_pair.insert((resolved.from_id, resolved.to_id), resolved); - } - - if edges_by_pair.is_empty() { - return Ok(()); - } - - let resolved_edges: Vec<&ResolvedEdge> = edges_by_pair.values().collect(); - for chunk in resolved_edges.chunks(batch_size.max(1)) { - insert_resolved_edge_chunk(storage, edge_name, chunk)?; - } - - Ok(()) -} - -fn insert_resolved_edge_chunk( - storage: &mut DatasetAccumulator, - edge_name: &str, - edges: &[&ResolvedEdge], -) -> Result<()> { - let src_ids: Vec = edges.iter().map(|edge| edge.from_id).collect(); - let dst_ids: Vec = edges.iter().map(|edge| edge.to_id).collect(); - - let edge_seg = storage - .edge_segments - .get(edge_name) - .ok_or_else(|| NanoError::Storage(format!("no edge segment: {}", edge_name)))?; - let edge_type = - storage.catalog.edge_types.get(edge_name).ok_or_else(|| { - NanoError::Storage(format!("unknown edge type in data: {}", edge_name)) - })?; - let prop_fields: Vec = edge_seg - .schema - .fields() - .iter() - .skip(3) - .map(|field| field.as_ref().clone()) - .collect(); - - let prop_batch = if prop_fields.is_empty() { - None - } else { - let mut columns: Vec> = Vec::with_capacity(prop_fields.len()); - for field in &prop_fields { - let values: Vec = edges - .iter() - .map(|edge| { - edge.data - .as_ref() - 
.and_then(|data| data.get(field.name())) - .cloned() - .unwrap_or(serde_json::Value::Null) - }) - .collect(); - if let Some(prop_type) = edge_type.properties.get(field.name()) { - for value in &values { - validate_json_value(edge_name, field.name(), prop_type, value)?; - } - } - columns.push(json_values_to_array( - &values, - field.data_type(), - field.is_nullable(), - )?); - } - let schema = Arc::new(Schema::new(prop_fields)); - Some( - RecordBatch::try_new(schema, columns) - .map_err(|e| NanoError::Storage(format!("edge prop batch error: {}", e)))?, - ) - }; - - storage.insert_edges(edge_name, &src_ids, &dst_ids, prop_batch)?; - Ok(()) -} - -fn resolve_edge_object( - storage: &DatasetAccumulator, - edge_obj: &serde_json::Value, - key_props: &HashMap, - key_to_id: &HashMap<(String, String), u64>, -) -> Result { - let edge_type = edge_obj - .get("edge") - .and_then(|value| value.as_str()) - .ok_or_else(|| NanoError::Storage("edge missing type".to_string()))?; - let et = resolve_edge_type(storage, edge_type)?; - - let from_token = edge_obj - .get("from") - .and_then(|value| value.as_str()) - .ok_or_else(|| NanoError::Storage("edge missing from".to_string()))?; - let to_token = edge_obj - .get("to") - .and_then(|value| value.as_str()) - .ok_or_else(|| NanoError::Storage("edge missing to".to_string()))?; - - let from_type = et.from_type.clone(); - let to_type = et.to_type.clone(); - let edge_name = et.name.clone(); - - let (src_key_prop, dst_key_prop) = match (key_props.get(&from_type), key_props.get(&to_type)) { - (Some(src), Some(dst)) => (src, dst), - _ => { - return Err(NanoError::Storage(format!( - "edge '{}' requires @key on source type '{}' and destination type '{}'", - edge_name, from_type, to_type - ))); - } - }; - - let from_key_type = storage - .catalog - .node_types - .get(&from_type) - .and_then(|node_type| node_property_field(node_type.arrow_schema.as_ref(), src_key_prop)) - .map(|field| field.data_type().clone()) - .ok_or_else(|| { - 
NanoError::Storage(format!( - "missing @key field {} on source type {}", - src_key_prop, from_type - )) - })?; - let to_key_type = storage - .catalog - .node_types - .get(&to_type) - .and_then(|node_type| node_property_field(node_type.arrow_schema.as_ref(), dst_key_prop)) - .map(|field| field.data_type().clone()) - .ok_or_else(|| { - NanoError::Storage(format!( - "missing @key field {} on destination type {}", - dst_key_prop, to_type - )) - })?; - - let from_key = parse_edge_endpoint_key_token(from_token, &from_key_type).map_err(|e| { - NanoError::Storage(format!( - "invalid edge endpoint key for {}.{} from='{}': {}", - from_type, src_key_prop, from_token, e - )) - })?; - let to_key = parse_edge_endpoint_key_token(to_token, &to_key_type).map_err(|e| { - NanoError::Storage(format!( - "invalid edge endpoint key for {}.{} to='{}': {}", - to_type, dst_key_prop, to_token, e - )) - })?; - - let from_id = *key_to_id - .get(&(from_type.clone(), from_key.clone())) - .ok_or_else(|| { - NanoError::Storage(format!( - "node not found by @key: {}.{}={}", - from_type, src_key_prop, from_key - )) - })?; - let to_id = *key_to_id - .get(&(to_type.clone(), to_key.clone())) - .ok_or_else(|| { - NanoError::Storage(format!( - "node not found by @key: {}.{}={}", - to_type, dst_key_prop, to_key - )) - })?; - - Ok(ResolvedEdge { - from_id, - to_id, - data: edge_obj - .get("data") - .and_then(|value| value.as_object()) - .cloned(), - }) -} - -fn resolve_edge_name(storage: &DatasetAccumulator, edge_type: &str) -> Result { - Ok(resolve_edge_type(storage, edge_type)?.name.clone()) -} - -fn resolve_edge_type<'a>( - storage: &'a DatasetAccumulator, - edge_type: &str, -) -> Result<&'a crate::catalog::EdgeType> { - storage - .catalog - .edge_types - .get(edge_type) - .or_else(|| { - storage - .catalog - .edge_name_index - .get(edge_type) - .and_then(|name| storage.catalog.edge_types.get(name)) - }) - .ok_or_else(|| NanoError::Storage(format!("unknown edge type: {}", edge_type))) -} - -fn 
/// Reads a positive `usize` from the environment variable `name`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, does
/// not parse as a `usize`, or parses to zero. Leading and trailing whitespace
/// around the number is ignored, so a value such as `" 64 "` is accepted
/// instead of silently falling back to `default`.
fn parse_env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|value| value.trim().parse::<usize>().ok())
        .filter(|value| *value > 0)
        .unwrap_or(default)
}
&crate::types::PropType, - value: &serde_json::Value, -) -> Result<()> { - if value.is_null() { - return Ok(()); - } - if prop_type.list { - let Some(items) = value.as_array() else { - return Err(type_mismatch_error( - type_name, - field_name, - &expected_type_name(prop_type), - value, - )); - }; - let item_type = crate::types::PropType { - scalar: prop_type.scalar, - nullable: true, - list: false, - enum_values: prop_type.enum_values.clone(), - }; - for item in items { - validate_json_value(type_name, field_name, &item_type, item)?; - } - return Ok(()); - } - if let Some(enum_values) = &prop_type.enum_values { - let Some(raw) = value.as_str() else { - return Err(type_mismatch_error( - type_name, - field_name, - &expected_type_name(prop_type), - value, - )); - }; - if enum_values.iter().any(|allowed| allowed == raw) { - return Ok(()); - } - return Err(NanoError::Storage(format!( - "invalid enum value '{}' for {}.{} (expected: {})", - raw, - type_name, - field_name, - enum_values.join(", ") - ))); - } - - let valid = match prop_type.scalar { - crate::types::ScalarType::String => value.is_string(), - crate::types::ScalarType::Bool => value.is_boolean(), - crate::types::ScalarType::I32 => { - value.as_i64().and_then(|n| i32::try_from(n).ok()).is_some() - } - crate::types::ScalarType::I64 => value.as_i64().is_some(), - crate::types::ScalarType::U32 => { - value.as_u64().and_then(|n| u32::try_from(n).ok()).is_some() - } - crate::types::ScalarType::U64 => value.as_u64().is_some(), - crate::types::ScalarType::F32 => value.as_f64().is_some(), - crate::types::ScalarType::F64 => value.as_f64().is_some(), - crate::types::ScalarType::Date => parse_date32_json_value(value).is_ok(), - crate::types::ScalarType::DateTime => parse_date64_json_value(value).is_ok(), - crate::types::ScalarType::Vector(dim) => match value.as_array() { - Some(items) if items.len() == dim as usize => { - items.iter().all(|item| item.as_f64().is_some()) - } - _ => false, - }, - }; - if valid { - Ok(()) - 
} else { - Err(type_mismatch_error( - type_name, - field_name, - &expected_type_name(prop_type), - value, - )) - } -} - -fn expected_type_name(prop_type: &crate::types::PropType) -> String { - let base = if let Some(enum_values) = &prop_type.enum_values { - format!("enum({})", enum_values.join(", ")) - } else { - prop_type.scalar.to_string() - }; - if prop_type.list { - format!("[{}]", base) - } else { - base - } -} - -fn type_mismatch_error( - type_name: &str, - field_name: &str, - expected: &str, - value: &serde_json::Value, -) -> NanoError { - NanoError::Storage(format!( - "type mismatch for {}.{}: expected {}, got {}", - type_name, - field_name, - expected, - describe_json_value(value) - )) -} - -fn describe_json_value(value: &serde_json::Value) -> String { - match value { - serde_json::Value::Null => "Null".to_string(), - serde_json::Value::Bool(v) => format!("Bool {}", v), - serde_json::Value::Number(v) => { - if v.is_i64() || v.is_u64() { - format!("Integer {}", v) - } else { - format!("Float {}", v) - } - } - serde_json::Value::String(v) => format!("String {:?}", v), - serde_json::Value::Array(v) => format!("Array {}", serde_json::Value::Array(v.clone())), - serde_json::Value::Object(v) => { - format!("Object {}", serde_json::Value::Object(v.clone())) - } - } -} - -/// Convert JSON values to an Arrow array based on the target DataType. 
-pub(crate) fn json_values_to_array( - values: &[serde_json::Value], - dt: &DataType, - nullable: bool, -) -> Result> { - let arr: Arc = match dt { - DataType::Utf8 => { - let arr: StringArray = values - .iter() - .map(|v| v.as_str().map(|s| s.to_string())) - .collect(); - Arc::new(arr) - } - DataType::Int32 => { - let arr: Int32Array = values - .iter() - .map(|v| v.as_i64().map(|n| n as i32)) - .collect(); - Arc::new(arr) - } - DataType::Int64 => { - let arr: Int64Array = values.iter().map(|v| v.as_i64()).collect(); - Arc::new(arr) - } - DataType::UInt64 => { - let arr: UInt64Array = values.iter().map(|v| v.as_u64()).collect(); - Arc::new(arr) - } - DataType::Float64 => { - let arr: Float64Array = values.iter().map(|v| v.as_f64()).collect(); - Arc::new(arr) - } - DataType::Boolean => { - let arr: BooleanArray = values.iter().map(|v| v.as_bool()).collect(); - Arc::new(arr) - } - DataType::Float32 => { - let arr: Float32Array = values - .iter() - .map(|v| v.as_f64().map(|n| n as f32)) - .collect(); - Arc::new(arr) - } - DataType::UInt32 => { - let arr: UInt32Array = values - .iter() - .map(|v| v.as_u64().map(|n| n as u32)) - .collect(); - Arc::new(arr) - } - DataType::Date32 => { - let mut out = Vec::with_capacity(values.len()); - for value in values { - out.push(parse_date32_json_value(value)?); - } - Arc::new(Date32Array::from(out)) - } - DataType::Date64 => { - let mut out = Vec::with_capacity(values.len()); - for value in values { - out.push(parse_date64_json_value(value)?); - } - Arc::new(Date64Array::from(out)) - } - DataType::List(field) => { - let mut builder = ListBuilder::with_capacity( - make_builder(field.data_type(), values.len()), - values.len(), - ) - .with_field(field.clone()); - for value in values { - if value.is_null() { - builder.append(false); - continue; - } - let Some(items) = value.as_array() else { - builder.append(false); - continue; - }; - for item in items { - append_json_to_builder(builder.values(), field.data_type(), item)?; - } - 
builder.append(true); - } - Arc::new(builder.finish()) - } - DataType::FixedSizeList(field, dim) => { - if *dim <= 0 { - return Err(NanoError::Storage(format!( - "invalid FixedSizeList dimension: {}", - dim - ))); - } - if field.data_type() != &DataType::Float32 { - return Err(NanoError::Storage(format!( - "unsupported FixedSizeList element type {:?}; expected Float32", - field.data_type() - ))); - } - - let list_len = *dim as usize; - let mut builder = FixedSizeListBuilder::with_capacity( - Float32Builder::with_capacity(values.len() * list_len), - *dim, - values.len(), - ) - .with_field(field.clone()); - - for value in values { - if value.is_null() { - for _ in 0..list_len { - builder.values().append_null(); - } - builder.append(false); - continue; - } - let items = value.as_array().ok_or_else(|| { - NanoError::Storage(format!( - "expected JSON array for FixedSizeList, got {}", - dim, value - )) - })?; - if items.len() != list_len { - return Err(NanoError::Storage(format!( - "FixedSizeList length mismatch: got {}", - dim, - items.len() - ))); - } - - for item in items { - let num = item.as_f64().ok_or_else(|| { - NanoError::Storage(format!( - "expected numeric vector element in FixedSizeList, got {}", - dim, item - )) - })?; - builder.values().append_value(num as f32); - } - builder.append(true); - } - Arc::new(builder.finish()) - } - _ => { - // Fallback to string - let arr: StringArray = values.iter().map(|v| Some(v.to_string())).collect(); - Arc::new(arr) - } - }; - if !nullable && arr.null_count() > 0 { - return Err(NanoError::Storage(format!( - "field has {} null value(s) from type mismatch (expected {:?})", - arr.null_count(), - dt - ))); - } - Ok(arr) -} - -fn append_json_to_builder( - builder: &mut Box, - dt: &DataType, - value: &serde_json::Value, -) -> Result<()> { - match dt { - DataType::Utf8 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Utf8 builder downcast failed".to_string()) - })?; - if 
let Some(s) = value.as_str() { - b.append_value(s); - } else { - b.append_null(); - } - } - DataType::Boolean => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Boolean builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_bool() { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::Int32 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Int32 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_i64() { - if let Ok(n) = i32::try_from(v) { - b.append_value(n); - } else { - b.append_null(); - } - } else { - b.append_null(); - } - } - DataType::Int64 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Int64 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_i64() { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::UInt32 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list UInt32 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_u64() { - if let Ok(n) = u32::try_from(v) { - b.append_value(n); - } else { - b.append_null(); - } - } else { - b.append_null(); - } - } - DataType::UInt64 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list UInt64 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_u64() { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::Float32 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Float32 builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_f64() { - b.append_value(v as f32); - } else { - b.append_null(); - } - } - DataType::Float64 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Float64 
builder downcast failed".to_string()) - })?; - if let Some(v) = value.as_f64() { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::Date32 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Date32 builder downcast failed".to_string()) - })?; - if let Some(v) = parse_date32_json_value(value)? { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::Date64 => { - let b = builder - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - NanoError::Storage("list Date64 builder downcast failed".to_string()) - })?; - if let Some(v) = parse_date64_json_value(value)? { - b.append_value(v); - } else { - b.append_null(); - } - } - DataType::List(field) => { - let b = builder - .as_any_mut() - .downcast_mut::>>() - .ok_or_else(|| { - NanoError::Storage("nested list builder downcast failed".to_string()) - })?; - if value.is_null() { - b.append(false); - } else if let Some(items) = value.as_array() { - for item in items { - append_json_to_builder(b.values(), field.data_type(), item)?; - } - b.append(true); - } else { - b.append(false); - } - } - other => { - return Err(NanoError::Storage(format!( - "unsupported list element data type {:?}", - other - ))); - } - } - - Ok(()) -} - -fn parse_date32_json_value(value: &serde_json::Value) -> Result> { - if value.is_null() { - return Ok(None); - } - if let Some(days) = value.as_i64() { - return i32::try_from(days) - .map(Some) - .map_err(|_| NanoError::Storage(format!("Date32 value out of range: {}", days))); - } - if let Some(days) = value.as_u64() { - return i32::try_from(days) - .map(Some) - .map_err(|_| NanoError::Storage(format!("Date32 value out of range: {}", days))); - } - if let Some(s) = value.as_str() { - return Ok(Some(parse_date32_literal(s)?)); - } - Ok(None) -} - -fn parse_date64_json_value(value: &serde_json::Value) -> Result> { - if value.is_null() { - return Ok(None); - } - if let Some(ms) = value.as_i64() { - return 
Ok(Some(ms)); - } - if let Some(ms) = value.as_u64() { - return i64::try_from(ms) - .map(Some) - .map_err(|_| NanoError::Storage(format!("Date64 value out of range: {}", ms))); - } - if let Some(s) = value.as_str() { - return Ok(Some(parse_date64_literal(s)?)); - } - Ok(None) -} - -fn parse_edge_endpoint_key_token(token: &str, dt: &DataType) -> Result { - match dt { - DataType::Utf8 => Ok(token.to_string()), - DataType::Boolean => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected bool token: {}", e))), - DataType::Int32 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected Int32 token: {}", e))), - DataType::Int64 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected Int64 token: {}", e))), - DataType::UInt32 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected UInt32 token: {}", e))), - DataType::UInt64 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected UInt64 token: {}", e))), - DataType::Float32 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected Float32 token: {}", e))), - DataType::Float64 => token - .parse::() - .map(|v| v.to_string()) - .map_err(|e| NanoError::Storage(format!("expected Float64 token: {}", e))), - DataType::Date32 => parse_date32_literal(token).map(|v| v.to_string()), - DataType::Date64 => parse_date64_literal(token).map(|v| v.to_string()), - other => Err(NanoError::Storage(format!( - "unsupported @key type for edge endpoint resolution: {:?}", - other - ))), - } -} - -pub(crate) fn parse_date32_literal(s: &str) -> Result { - let raw: Arc = Arc::new(StringArray::from(vec![Some(s)])); - let casted = arrow_cast::cast(raw.as_ref(), &DataType::Date32) - .map_err(|e| NanoError::Storage(format!("invalid Date literal '{}': {}", s, e)))?; - let out = casted - .as_any() - 
.downcast_ref::() - .ok_or_else(|| NanoError::Storage("Date32 cast produced unexpected array".to_string()))?; - if out.is_null(0) { - return Err(NanoError::Storage(format!("invalid Date literal '{}'", s))); - } - Ok(out.value(0)) -} - -pub(crate) fn parse_date64_literal(s: &str) -> Result { - let raw: Arc = Arc::new(StringArray::from(vec![Some(s)])); - let casted = arrow_cast::cast(raw.as_ref(), &DataType::Date64) - .map_err(|e| NanoError::Storage(format!("invalid DateTime literal '{}': {}", s, e)))?; - let out = casted - .as_any() - .downcast_ref::() - .ok_or_else(|| NanoError::Storage("Date64 cast produced unexpected array".to_string()))?; - if out.is_null(0) { - return Err(NanoError::Storage(format!( - "invalid DateTime literal '{}'", - s - ))); - } - Ok(out.value(0)) -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::io::Cursor; - - use serde_json::json; - - use crate::catalog::schema_ir::{build_catalog_from_ir, build_schema_ir}; - use crate::schema::parser::parse_schema; - - use super::*; - - fn test_schema() -> &'static str { - r#"node Person { - name: String @key -} -edge Knows: Person -> Person -"# - } - - fn build_storage(schema_src: &str) -> DatasetAccumulator { - let schema = parse_schema(schema_src).unwrap(); - let ir = build_schema_ir(&schema).unwrap(); - let catalog = build_catalog_from_ir(&ir).unwrap(); - DatasetAccumulator::new(catalog) - } - - fn person_key_props() -> HashMap { - HashMap::from([("Person".to_string(), "name".to_string())]) - } - - fn person_id_by_name(storage: &DatasetAccumulator, name: &str) -> u64 { - let batch = storage.get_all_nodes("Person").unwrap().unwrap(); - let id_col = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let name_col = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - (0..batch.num_rows()) - .find(|&i| name_col.value(i) == name) - .map(|i| id_col.value(i)) - .unwrap() - } - - #[test] - fn json_values_to_array_rejects_non_nullable_mismatch() { - let 
values = vec![json!("abc"), json!(42)]; - let err = json_values_to_array(&values, &DataType::Int32, false).unwrap_err(); - assert!( - err.to_string().contains("null value"), - "unexpected error: {err}" - ); - } - - #[test] - fn json_values_to_array_accepts_iso_date_strings() { - let values = vec![json!("2026-02-14"), json!(null)]; - let arr = json_values_to_array(&values, &DataType::Date32, true).unwrap(); - let arr = arr.as_any().downcast_ref::().unwrap(); - assert!(!arr.is_null(0)); - assert!(arr.is_null(1)); - } - - #[test] - fn json_values_to_array_accepts_iso_datetime_strings() { - let values = vec![json!("2026-02-14T10:00:00Z"), json!(null)]; - let arr = json_values_to_array(&values, &DataType::Date64, true).unwrap(); - let arr = arr.as_any().downcast_ref::().unwrap(); - assert!(!arr.is_null(0)); - assert!(arr.is_null(1)); - } - - #[test] - fn json_values_to_array_builds_list_values() { - let values = vec![json!([1, 2]), json!(null), json!([3])]; - let dt = DataType::List(Arc::new(Field::new("item", DataType::Int64, true))); - let arr = json_values_to_array(&values, &dt, true).unwrap(); - let list = arr - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(list.len(), 3); - assert!(!list.is_null(0)); - assert!(list.is_null(1)); - assert!(!list.is_null(2)); - - let first = list.value(0); - let first = first.as_any().downcast_ref::().unwrap(); - assert_eq!(first.len(), 2); - assert_eq!(first.value(0), 1); - assert_eq!(first.value(1), 2); - } - - #[test] - fn json_values_to_array_builds_fixed_size_list_vectors() { - let values = vec![json!([0.1, 0.2, 0.3]), json!(null), json!([1, 2, 3])]; - let dt = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 3); - let arr = json_values_to_array(&values, &dt, true).unwrap(); - let vecs = arr - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(vecs.len(), 3); - assert!(!vecs.is_null(0)); - assert!(vecs.is_null(1)); - assert!(!vecs.is_null(2)); - } - - #[test] - fn 
json_values_to_array_rejects_fixed_size_list_length_mismatch() { - let values = vec![json!([0.1, 0.2])]; - let dt = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 3); - let err = json_values_to_array(&values, &dt, true).unwrap_err(); - assert!(err.to_string().contains("length mismatch")); - } - - #[test] - fn load_jsonl_with_name_seed_resolves_edges_to_existing_nodes() { - let mut existing = build_storage(test_schema()); - load_jsonl_data( - &mut existing, - r#"{"type":"Person","data":{"name":"Alice"}}"#, - &person_key_props(), - ) - .unwrap(); - let alice_id = person_id_by_name(&existing, "Alice"); - - let data = r#"{"type":"Person","data":{"name":"Bob"}} -{"edge":"Knows","from":"Alice","to":"Bob"}"#; - - let mut no_seed = build_storage(test_schema()); - let err = load_jsonl_data(&mut no_seed, data, &person_key_props()).unwrap_err(); - assert!( - err.to_string().contains("node not found by @key"), - "unexpected error: {err}" - ); - - let mut seeded = build_storage(test_schema()); - let mut seed = HashMap::new(); - seed.insert(("Person".to_string(), "Alice".to_string()), alice_id); - load_jsonl_data_with_name_seed(&mut seeded, data, &person_key_props(), Some(&seed)) - .unwrap(); - - let bob_id = person_id_by_name(&seeded, "Bob"); - let knows = &seeded.edge_segments["Knows"]; - assert_eq!(knows.edge_ids.len(), 1); - assert_eq!(knows.src_ids[0], alice_id); - assert_eq!(knows.dst_ids[0], bob_id); - } - - #[test] - fn load_jsonl_reader_handles_forward_reference_edges() { - let mut storage = build_storage(test_schema()); - let data = r#"{"edge":"Knows","from":"Alice","to":"Bob"} -{"type":"Person","data":{"name":"Alice"}} -{"type":"Person","data":{"name":"Bob"}}"#; - - load_jsonl_reader( - &mut storage, - Cursor::new(data.as_bytes()), - &person_key_props(), - ) - .unwrap(); - - let knows = &storage.edge_segments["Knows"]; - assert_eq!(knows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_deduplicates_duplicate_edges() { - let mut 
storage = build_storage(test_schema()); - let data = r#"{"type":"Person","data":{"name":"Alice"}} -{"type":"Person","data":{"name":"Bob"}} -{"edge":"Knows","from":"Alice","to":"Bob"} -{"edge":"Knows","from":"Alice","to":"Bob"}"#; - - load_jsonl_data(&mut storage, data, &person_key_props()).unwrap(); - let knows = &storage.edge_segments["Knows"]; - assert_eq!(knows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_edges_require_endpoint_key_annotations() { - let schema = r#"node Event { - title: String - at: Date -} -edge Precedes: Event -> Event -"#; - let mut storage = build_storage(schema); - let data = r#"{"type":"Event","data":{"title":"Kickoff","at":"2026-02-14"}} -{"type":"Event","data":{"title":"Wrap","at":"2026-02-15"}} -{"edge":"Precedes","from":"Kickoff","to":"Wrap"}"#; - - let err = load_jsonl_data(&mut storage, data, &HashMap::new()).unwrap_err(); - assert!( - err.to_string() - .contains("requires @key on source type 'Event' and destination type 'Event'"), - "unexpected error: {err}" - ); - } - - #[test] - fn load_jsonl_edges_resolve_by_non_name_key() { - let schema = r#"node User { - uid: String @key - display_name: String -} -edge Follows: User -> User -"#; - let mut storage = build_storage(schema); - let key_props = HashMap::from([("User".to_string(), "uid".to_string())]); - let data = r#"{"type":"User","data":{"uid":"usr_01","display_name":"Alice"}} -{"type":"User","data":{"uid":"usr_02","display_name":"Bob"}} -{"edge":"Follows","from":"usr_01","to":"usr_02"}"#; - - load_jsonl_data(&mut storage, data, &key_props).unwrap(); - let follows = &storage.edge_segments["Follows"]; - assert_eq!(follows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_edges_resolve_by_user_property_named_id() { - let schema = r#"node User { - id: String @key - display_name: String -} -edge Follows: User -> User -"#; - let mut storage = build_storage(schema); - let key_props = HashMap::from([("User".to_string(), "id".to_string())]); - let data = 
r#"{"type":"User","data":{"id":"usr_01","display_name":"Alice"}} -{"type":"User","data":{"id":"usr_02","display_name":"Bob"}} -{"edge":"Follows","from":"usr_01","to":"usr_02"}"#; - - load_jsonl_data(&mut storage, data, &key_props).unwrap(); - - let users = storage.get_all_nodes("User").unwrap().unwrap(); - let user_ids = users - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(user_ids.value(0), "usr_01"); - assert_eq!(user_ids.value(1), "usr_02"); - - let follows = &storage.edge_segments["Follows"]; - assert_eq!(follows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_edges_parse_non_string_key_tokens() { - let schema = r#"node User { - uid: U64 @key - display_name: String -} -edge Follows: User -> User -"#; - let mut storage = build_storage(schema); - let key_props = HashMap::from([("User".to_string(), "uid".to_string())]); - let data = r#"{"type":"User","data":{"uid":1,"display_name":"Alice"}} -{"type":"User","data":{"uid":2,"display_name":"Bob"}} -{"edge":"Follows","from":"1","to":"2"}"#; - - load_jsonl_data(&mut storage, data, &key_props).unwrap(); - let follows = &storage.edge_segments["Follows"]; - assert_eq!(follows.edge_ids.len(), 1); - } - - #[test] - fn load_jsonl_rejects_invalid_node_enum_values() { - let schema = r#"node Person { - name: String @key - role: enum(admin, member, guest) -}"#; - let mut storage = build_storage(schema); - let err = load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"name":"Bad","role":"superadmin"}}"#, - &HashMap::from([("Person".to_string(), "name".to_string())]), - ) - .unwrap_err(); - assert_eq!( - err.to_string(), - "storage error: invalid enum value 'superadmin' for Person.role (expected: admin, guest, member)" - ); - } - - #[test] - fn load_jsonl_rejects_invalid_edge_enum_values() { - let schema = r#"node Person { - name: String @key -} -edge WorksWith: Person -> Person { - role: enum(lead, contributor) -}"#; - let mut storage = build_storage(schema); - let data = 
r#"{"type":"Person","data":{"name":"Alice"}} -{"type":"Person","data":{"name":"Bob"}} -{"edge":"WorksWith","from":"Alice","to":"Bob","data":{"role":"manager"}}"#; - let err = load_jsonl_data( - &mut storage, - data, - &HashMap::from([("Person".to_string(), "name".to_string())]), - ) - .unwrap_err(); - assert_eq!( - err.to_string(), - "storage error: invalid enum value 'manager' for WorksWith.role (expected: contributor, lead)" - ); - } - - #[test] - fn load_jsonl_rejects_wrong_type_for_nullable_node_field() { - let schema = r#"node Person { - name: String @key - age: I32? -}"#; - let mut storage = build_storage(schema); - let err = load_jsonl_data( - &mut storage, - r#"{"type":"Person","data":{"name":"Bad","age":"not-a-number"}}"#, - &HashMap::from([("Person".to_string(), "name".to_string())]), - ) - .unwrap_err(); - assert_eq!( - err.to_string(), - r#"storage error: type mismatch for Person.age: expected I32, got String "not-a-number""# - ); - } -}