mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-15 01:55:13 +02:00
352 lines
12 KiB
Rust
352 lines
12 KiB
Rust
use arrow_array::{
|
|
Array, ArrayRef, BooleanArray, Date32Array, Date64Array, FixedSizeListArray, Float32Array,
|
|
Float64Array, Int32Array, Int64Array, ListArray, RecordBatch, StringArray, StructArray,
|
|
UInt32Array, UInt64Array,
|
|
};
|
|
use arrow_schema::DataType;
|
|
|
|
pub const JS_MAX_SAFE_INTEGER_I64: i64 = 9_007_199_254_740_991;
|
|
pub const JS_MAX_SAFE_INTEGER_U64: u64 = 9_007_199_254_740_991;
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
enum JsonIntegerMode {
|
|
JavaScript,
|
|
Native,
|
|
}
|
|
|
|
pub fn is_js_safe_integer_i64(value: i64) -> bool {
|
|
(-JS_MAX_SAFE_INTEGER_I64..=JS_MAX_SAFE_INTEGER_I64).contains(&value)
|
|
}
|
|
|
|
/// Convert Arrow RecordBatches into a Vec of JSON objects (one per row).
|
|
pub fn record_batches_to_json_rows(results: &[RecordBatch]) -> Vec<serde_json::Value> {
|
|
record_batches_to_json_rows_with_mode(results, JsonIntegerMode::JavaScript)
|
|
}
|
|
|
|
/// Convert Arrow RecordBatches into JSON rows without JS-safe integer coercion.
|
|
pub fn record_batches_to_rust_json_rows(results: &[RecordBatch]) -> Vec<serde_json::Value> {
|
|
record_batches_to_json_rows_with_mode(results, JsonIntegerMode::Native)
|
|
}
|
|
|
|
fn record_batches_to_json_rows_with_mode(
|
|
results: &[RecordBatch],
|
|
integer_mode: JsonIntegerMode,
|
|
) -> Vec<serde_json::Value> {
|
|
let total_rows = results.iter().map(RecordBatch::num_rows).sum();
|
|
let mut out = Vec::with_capacity(total_rows);
|
|
for batch in results {
|
|
let schema = batch.schema();
|
|
for row in 0..batch.num_rows() {
|
|
let mut map = serde_json::Map::new();
|
|
for (col_idx, field) in schema.fields().iter().enumerate() {
|
|
let col_arr = batch.column(col_idx);
|
|
map.insert(
|
|
field.name().clone(),
|
|
array_value_to_json_with_mode(col_arr, row, integer_mode),
|
|
);
|
|
}
|
|
out.push(serde_json::Value::Object(map));
|
|
}
|
|
}
|
|
out
|
|
}
|
|
|
|
/// Convert a single cell from an Arrow array to a serde_json::Value.
|
|
pub fn array_value_to_json(array: &ArrayRef, row: usize) -> serde_json::Value {
|
|
array_value_to_json_with_mode(array, row, JsonIntegerMode::JavaScript)
|
|
}
|
|
|
|
fn array_value_to_json_with_mode(
|
|
array: &ArrayRef,
|
|
row: usize,
|
|
integer_mode: JsonIntegerMode,
|
|
) -> serde_json::Value {
|
|
if array.is_null(row) {
|
|
return serde_json::Value::Null;
|
|
}
|
|
|
|
match array.data_type() {
|
|
DataType::Utf8 => array
|
|
.as_any()
|
|
.downcast_ref::<StringArray>()
|
|
.map(|a| serde_json::Value::String(a.value(row).to_string()))
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::Boolean => array
|
|
.as_any()
|
|
.downcast_ref::<BooleanArray>()
|
|
.map(|a| serde_json::Value::Bool(a.value(row)))
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::Int32 => array
|
|
.as_any()
|
|
.downcast_ref::<Int32Array>()
|
|
.map(|a| serde_json::Value::Number((a.value(row) as i64).into()))
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::Int64 => array
|
|
.as_any()
|
|
.downcast_ref::<Int64Array>()
|
|
.map(|a| {
|
|
let value = a.value(row);
|
|
match integer_mode {
|
|
JsonIntegerMode::JavaScript if !is_js_safe_integer_i64(value) => {
|
|
serde_json::Value::String(value.to_string())
|
|
}
|
|
JsonIntegerMode::JavaScript | JsonIntegerMode::Native => {
|
|
serde_json::Value::Number(value.into())
|
|
}
|
|
}
|
|
})
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::UInt32 => array
|
|
.as_any()
|
|
.downcast_ref::<UInt32Array>()
|
|
.map(|a| serde_json::Value::Number((a.value(row) as u64).into()))
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::UInt64 => array
|
|
.as_any()
|
|
.downcast_ref::<UInt64Array>()
|
|
.map(|a| {
|
|
let value = a.value(row);
|
|
match integer_mode {
|
|
JsonIntegerMode::JavaScript if value > JS_MAX_SAFE_INTEGER_U64 => {
|
|
serde_json::Value::String(value.to_string())
|
|
}
|
|
JsonIntegerMode::JavaScript | JsonIntegerMode::Native => {
|
|
serde_json::Value::Number(value.into())
|
|
}
|
|
}
|
|
})
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::Float32 => array
|
|
.as_any()
|
|
.downcast_ref::<Float32Array>()
|
|
.map(|a| json_float_value(a.value(row) as f64))
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::Float64 => array
|
|
.as_any()
|
|
.downcast_ref::<Float64Array>()
|
|
.map(|a| json_float_value(a.value(row)))
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::Date32 => array
|
|
.as_any()
|
|
.downcast_ref::<Date32Array>()
|
|
.map(|a| {
|
|
let days = a.value(row);
|
|
arrow_array::temporal_conversions::date32_to_datetime(days)
|
|
.map(|dt| serde_json::Value::String(dt.format("%Y-%m-%d").to_string()))
|
|
.unwrap_or_else(|| serde_json::Value::Number((days as i64).into()))
|
|
})
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::Date64 => array
|
|
.as_any()
|
|
.downcast_ref::<Date64Array>()
|
|
.map(|a| {
|
|
let ms = a.value(row);
|
|
arrow_array::temporal_conversions::date64_to_datetime(ms)
|
|
.map(|dt| {
|
|
serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string())
|
|
})
|
|
.unwrap_or_else(|| serde_json::Value::Number(ms.into()))
|
|
})
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::List(_) => array
|
|
.as_any()
|
|
.downcast_ref::<ListArray>()
|
|
.map(|a| {
|
|
let values = a.value(row);
|
|
serde_json::Value::Array(
|
|
(0..values.len())
|
|
.map(|idx| array_value_to_json_with_mode(&values, idx, integer_mode))
|
|
.collect(),
|
|
)
|
|
})
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::FixedSizeList(_, _) => array
|
|
.as_any()
|
|
.downcast_ref::<FixedSizeListArray>()
|
|
.map(|a| fixed_size_list_value_to_json(a, row, integer_mode))
|
|
.unwrap_or(serde_json::Value::Null),
|
|
DataType::Struct(_) => array
|
|
.as_any()
|
|
.downcast_ref::<StructArray>()
|
|
.map(|struct_arr| {
|
|
let mut obj = serde_json::Map::new();
|
|
for (i, field) in struct_arr.fields().iter().enumerate() {
|
|
let col = struct_arr.column(i);
|
|
obj.insert(
|
|
field.name().clone(),
|
|
array_value_to_json_with_mode(col, row, integer_mode),
|
|
);
|
|
}
|
|
serde_json::Value::Object(obj)
|
|
})
|
|
.unwrap_or(serde_json::Value::Null),
|
|
_ => {
|
|
let display =
|
|
arrow_cast::display::array_value_to_string(array, row).unwrap_or_default();
|
|
serde_json::Value::String(display)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn json_float_value(value: f64) -> serde_json::Value {
|
|
if value.is_nan() {
|
|
return serde_json::Value::String("NaN".to_string());
|
|
}
|
|
if value == f64::INFINITY {
|
|
return serde_json::Value::String("Infinity".to_string());
|
|
}
|
|
if value == f64::NEG_INFINITY {
|
|
return serde_json::Value::String("-Infinity".to_string());
|
|
}
|
|
|
|
serde_json::Number::from_f64(value)
|
|
.map(serde_json::Value::Number)
|
|
.unwrap_or(serde_json::Value::Null)
|
|
}
|
|
|
|
fn fixed_size_list_value_to_json(
|
|
array: &FixedSizeListArray,
|
|
row: usize,
|
|
integer_mode: JsonIntegerMode,
|
|
) -> serde_json::Value {
|
|
let value_len = array.value_length() as usize;
|
|
let values = array.values();
|
|
if let Some(float_values) = values.as_any().downcast_ref::<Float32Array>() {
|
|
let start = row.saturating_mul(value_len);
|
|
return float32_json_array(float_values, start, value_len);
|
|
}
|
|
|
|
let values = array.value(row);
|
|
serde_json::Value::Array(
|
|
(0..values.len())
|
|
.map(|idx| array_value_to_json_with_mode(&values, idx, integer_mode))
|
|
.collect(),
|
|
)
|
|
}
|
|
|
|
fn float32_json_array(values: &Float32Array, start: usize, len: usize) -> serde_json::Value {
|
|
let mut out = Vec::with_capacity(len);
|
|
let end = start.saturating_add(len).min(values.len());
|
|
for idx in start..end {
|
|
if values.is_null(idx) {
|
|
out.push(serde_json::Value::Null);
|
|
continue;
|
|
}
|
|
let value = values.value(idx) as f64;
|
|
out.push(
|
|
serde_json::Number::from_f64(value)
|
|
.map(serde_json::Value::Number)
|
|
.unwrap_or(serde_json::Value::Null),
|
|
);
|
|
}
|
|
serde_json::Value::Array(out)
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::{array_value_to_json, record_batches_to_rust_json_rows};
|
|
use std::sync::Arc;
|
|
|
|
use arrow_array::builder::{FixedSizeListBuilder, Float32Builder};
|
|
use arrow_array::{ArrayRef, Float64Array, Int64Array, RecordBatch, UInt64Array};
|
|
use arrow_schema::{DataType, Field, Schema};
|
|
|
|
#[test]
|
|
fn int64_outside_js_safe_range_is_stringified() {
|
|
let values: ArrayRef = Arc::new(Int64Array::from(vec![Some(9_007_199_254_740_992)]));
|
|
assert_eq!(
|
|
array_value_to_json(&values, 0),
|
|
serde_json::Value::String("9007199254740992".to_string())
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn uint64_outside_js_safe_range_is_stringified() {
|
|
let values: ArrayRef = Arc::new(UInt64Array::from(vec![Some(9_007_199_254_740_992)]));
|
|
assert_eq!(
|
|
array_value_to_json(&values, 0),
|
|
serde_json::Value::String("9007199254740992".to_string())
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn uint64_within_js_safe_range_stays_numeric() {
|
|
let values: ArrayRef = Arc::new(UInt64Array::from(vec![Some(9_007_199_254_740_991)]));
|
|
assert_eq!(
|
|
array_value_to_json(&values, 0),
|
|
serde_json::json!(9_007_199_254_740_991u64)
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn rust_json_rows_preserve_full_width_integers() {
|
|
let schema = Arc::new(Schema::new(vec![
|
|
Field::new("signed", DataType::Int64, false),
|
|
Field::new("unsigned", DataType::UInt64, false),
|
|
]));
|
|
let batch = RecordBatch::try_new(
|
|
schema,
|
|
vec![
|
|
Arc::new(Int64Array::from(vec![i64::MIN])),
|
|
Arc::new(UInt64Array::from(vec![u64::MAX])),
|
|
],
|
|
)
|
|
.expect("batch");
|
|
|
|
assert_eq!(
|
|
record_batches_to_rust_json_rows(&[batch]),
|
|
vec![serde_json::json!({
|
|
"signed": i64::MIN,
|
|
"unsigned": u64::MAX,
|
|
})]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn fixed_size_float32_vectors_serialize_without_recursive_dispatch() {
|
|
let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), 3);
|
|
builder.values().append_value(0.25);
|
|
builder.values().append_value(0.5);
|
|
builder.values().append_value(0.75);
|
|
builder.append(true);
|
|
|
|
for _ in 0..3 {
|
|
builder.values().append_null();
|
|
}
|
|
builder.append(false);
|
|
|
|
builder.values().append_value(1.0);
|
|
builder.values().append_value(2.0);
|
|
builder.values().append_value(3.0);
|
|
builder.append(true);
|
|
|
|
let values: ArrayRef = Arc::new(builder.finish());
|
|
assert_eq!(
|
|
array_value_to_json(&values, 0),
|
|
serde_json::json!([0.25, 0.5, 0.75])
|
|
);
|
|
assert_eq!(array_value_to_json(&values, 1), serde_json::Value::Null);
|
|
assert_eq!(
|
|
array_value_to_json(&values, 2),
|
|
serde_json::json!([1.0, 2.0, 3.0])
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn non_finite_floats_are_stringified() {
|
|
let values: ArrayRef = Arc::new(Float64Array::from(vec![
|
|
Some(f64::NAN),
|
|
Some(f64::INFINITY),
|
|
Some(f64::NEG_INFINITY),
|
|
]));
|
|
assert_eq!(array_value_to_json(&values, 0), serde_json::json!("NaN"));
|
|
assert_eq!(
|
|
array_value_to_json(&values, 1),
|
|
serde_json::json!("Infinity")
|
|
);
|
|
assert_eq!(
|
|
array_value_to_json(&values, 2),
|
|
serde_json::json!("-Infinity")
|
|
);
|
|
}
|
|
}
|