1#![doc(
33 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
34 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
35)]
36#![cfg_attr(docsrs, feature(doc_cfg))]
37#![warn(missing_docs)]
38use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer};
39use hex::decode;
40use num_bigint::BigInt;
41use num_traits::Signed;
42use serde::{Deserialize, Serialize};
43use serde_json::{Map as SJMap, Value};
44use std::collections::HashMap;
45use std::sync::Arc;
46
47use arrow::array::*;
48use arrow::buffer::{Buffer, MutableBuffer};
49use arrow::datatypes::*;
50use arrow::error::{ArrowError, Result};
51use arrow::util::bit_util;
52
53mod datatype;
54mod field;
55mod schema;
56
57pub use datatype::*;
58pub use field::*;
59pub use schema::*;
60
/// A struct that represents an Arrow integration-format JSON file:
/// a schema, the record batches, and any dictionary batches.
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJson {
    /// The schema of the file
    pub schema: ArrowJsonSchema,
    /// The batches of data in the file
    pub batches: Vec<ArrowJsonBatch>,
    /// Dictionary batches referenced by dictionary-encoded fields, if any
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
}
74
/// A struct that partially reads the Arrow JSON schema.
///
/// Fields are left as JSON `Value` as they vary by `DataType`.
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonSchema {
    /// The fields in the schema
    pub fields: Vec<ArrowJsonField>,
    /// Schema-level metadata, stored as a list of {"key": .., "value": ..} maps
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Vec<HashMap<String, String>>>,
}
86
/// Fields are left as JSON `Value` as they vary by `DataType`.
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonField {
    /// The name of the field
    pub name: String,
    /// The type of the field, serialized under the JSON key "type"
    #[serde(rename = "type")]
    pub field_type: Value,
    /// Whether the field is nullable
    pub nullable: bool,
    /// The children fields (e.g. list/struct/union element fields)
    pub children: Vec<ArrowJsonField>,
    /// Dictionary encoding information, present only for dictionary fields
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionary: Option<ArrowJsonFieldDictionary>,
    /// Field-level metadata, serialized as an array of single-entry objects
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
107
impl From<&FieldRef> for ArrowJsonField {
    /// Delegates to the `From<&Field>` implementation on the inner field.
    fn from(value: &FieldRef) -> Self {
        Self::from(value.as_ref())
    }
}
113
114impl From<&Field> for ArrowJsonField {
115 fn from(field: &Field) -> Self {
116 let metadata_value = match field.metadata().is_empty() {
117 false => {
118 let mut array = Vec::new();
119 for (k, v) in field.metadata() {
120 let mut kv_map = SJMap::new();
121 kv_map.insert(k.clone(), Value::String(v.clone()));
122 array.push(Value::Object(kv_map));
123 }
124 if !array.is_empty() {
125 Some(Value::Array(array))
126 } else {
127 None
128 }
129 }
130 _ => None,
131 };
132
133 Self {
134 name: field.name().to_string(),
135 field_type: data_type_to_json(field.data_type()),
136 nullable: field.is_nullable(),
137 children: vec![],
138 dictionary: None, metadata: metadata_value,
140 }
141 }
142}
143
/// Dictionary encoding information for a dictionary-encoded field.
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonFieldDictionary {
    /// The dictionary id, used to pair the field with its dictionary batch
    pub id: i64,
    /// The type of the dictionary index
    #[serde(rename = "indexType")]
    pub index_type: DictionaryIndexType,
    /// Whether the dictionary is ordered
    #[serde(rename = "isOrdered")]
    pub is_ordered: bool,
}
156
/// The integer type used for a dictionary's keys.
#[derive(Deserialize, Serialize, Debug)]
pub struct DictionaryIndexType {
    /// The name of the type
    pub name: String,
    /// Whether the type is signed
    #[serde(rename = "isSigned")]
    pub is_signed: bool,
    /// The bit width of the type
    #[serde(rename = "bitWidth")]
    pub bit_width: i64,
}
169
/// A struct that partially reads the Arrow JSON record batch.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ArrowJsonBatch {
    // Number of rows in the batch (private; set via `from_batch` or deserialization)
    count: usize,
    /// The columns of the batch
    pub columns: Vec<ArrowJsonColumn>,
}
177
/// A struct that partially reads the Arrow JSON dictionary batch.
#[derive(Deserialize, Serialize, Debug, Clone)]
#[allow(non_snake_case)]
pub struct ArrowJsonDictionaryBatch {
    /// The dictionary id, matched against `ArrowJsonFieldDictionary::id`
    pub id: i64,
    /// The dictionary values, stored as a single-column batch
    pub data: ArrowJsonBatch,
}
187
/// A struct that partially reads the Arrow JSON column/array.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ArrowJsonColumn {
    // Column name (private; matched positionally against the schema)
    name: String,
    /// The number of elements in the column
    pub count: usize,
    /// Validity flags, one per slot: 1 = valid, anything else = null
    #[serde(rename = "VALIDITY")]
    pub validity: Option<Vec<u8>>,
    /// The data values; representation varies by data type
    #[serde(rename = "DATA")]
    pub data: Option<Vec<Value>>,
    /// Offsets for variable-length types (64-bit offsets may be JSON strings)
    #[serde(rename = "OFFSET")]
    pub offset: Option<Vec<Value>>,
    /// Type ids for union columns
    #[serde(rename = "TYPE_ID")]
    pub type_id: Option<Vec<i8>>,
    /// Child columns for nested types
    pub children: Option<Vec<ArrowJsonColumn>>,
}
209
210impl ArrowJson {
211 pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> Result<bool> {
213 if !self.schema.equals_schema(&reader.schema()) {
214 return Ok(false);
215 }
216
217 for json_batch in self.get_record_batches()?.into_iter() {
218 let batch = reader.next();
219 match batch {
220 Some(Ok(batch)) => {
221 if json_batch != batch {
222 println!("json: {json_batch:?}");
223 println!("batch: {batch:?}");
224 return Ok(false);
225 }
226 }
227 Some(Err(e)) => return Err(e),
228 None => return Ok(false),
229 }
230 }
231
232 Ok(true)
233 }
234
235 pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
237 let schema = self.schema.to_arrow_schema()?;
238
239 let mut dictionaries = HashMap::new();
240 self.dictionaries.iter().for_each(|dict_batches| {
241 dict_batches.iter().for_each(|d| {
242 dictionaries.insert(d.id, d.clone());
243 });
244 });
245
246 let batches: Result<Vec<_>> = self
247 .batches
248 .iter()
249 .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries)))
250 .collect();
251
252 batches
253 }
254}
255
256impl ArrowJsonSchema {
257 fn equals_schema(&self, schema: &Schema) -> bool {
259 let field_len = self.fields.len();
260 if field_len != schema.fields().len() {
261 return false;
262 }
263 for i in 0..field_len {
264 let json_field = &self.fields[i];
265 let field = schema.field(i);
266 if !json_field.equals_field(field) {
267 return false;
268 }
269 }
270 true
271 }
272
273 fn to_arrow_schema(&self) -> Result<Schema> {
274 let arrow_fields: Result<Vec<_>> = self
275 .fields
276 .iter()
277 .map(|field| field.to_arrow_field())
278 .collect();
279
280 if let Some(metadatas) = &self.metadata {
281 let mut metadata: HashMap<String, String> = HashMap::new();
282
283 metadatas.iter().for_each(|pair| {
284 let key = pair.get("key").unwrap();
285 let value = pair.get("value").unwrap();
286 metadata.insert(key.clone(), value.clone());
287 });
288
289 Ok(Schema::new_with_metadata(arrow_fields?, metadata))
290 } else {
291 Ok(Schema::new(arrow_fields?))
292 }
293 }
294}
295
296impl ArrowJsonField {
297 fn equals_field(&self, field: &Field) -> bool {
299 match self.to_arrow_field() {
301 Ok(self_field) => {
302 assert_eq!(&self_field, field, "Arrow fields not the same");
303 true
304 }
305 Err(e) => {
306 eprintln!("Encountered error while converting JSON field to Arrow field: {e:?}");
307 false
308 }
309 }
310 }
311
312 fn to_arrow_field(&self) -> Result<Field> {
315 let field =
317 serde_json::to_value(self).map_err(|error| ArrowError::JsonError(error.to_string()))?;
318 field_from_json(&field)
319 }
320}
321
322pub fn record_batch_from_json(
324 schema: &Schema,
325 json_batch: ArrowJsonBatch,
326 json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
327) -> Result<RecordBatch> {
328 let mut columns = vec![];
329
330 for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
331 let col = array_from_json(field, json_col, json_dictionaries)?;
332 columns.push(col);
333 }
334
335 RecordBatch::try_new(Arc::new(schema.clone()), columns)
336}
337
/// Construct an [`ArrayRef`] from an Arrow JSON column, using `field` to
/// determine the target data type.
///
/// `dictionaries` maps dictionary ids to their JSON dictionary batches and is
/// consulted only for `DataType::Dictionary` fields.
///
/// # Errors
/// Returns `ArrowError::JsonError` for unsupported data types or missing
/// dictionary information.
///
/// # Panics
/// Panics (via `unwrap`/`panic!`) when the JSON column is malformed for the
/// requested type, e.g. missing VALIDITY/DATA arrays or unparseable values.
pub fn array_from_json(
    field: &Field,
    json_col: ArrowJsonColumn,
    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<ArrayRef> {
    match field.data_type() {
        // Null arrays carry no validity or data; only the count matters.
        DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
        DataType::Boolean => {
            let mut b = BooleanBuilder::with_capacity(json_col.count);
            // Validity flag 1 means the slot is valid; anything else is null.
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_bool().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int8 => {
            let mut b = Int8Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().ok_or_else(|| {
                        ArrowError::JsonError(format!("Unable to get {value:?} as int64"))
                    })? as i8),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int16 => {
            let mut b = Int16Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i16),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        // Date32 and Time32 share the Int32 physical representation:
        // build an Int32 array, then cast to the logical target type.
        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
            let mut b = Int32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i32),
                    _ => b.append_null(),
                };
            }
            let array = Arc::new(b.finish()) as ArrayRef;
            arrow::compute::cast(&array, field.data_type())
        }
        DataType::Interval(IntervalUnit::YearMonth) => {
            let mut b = IntervalYearMonthBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        // 64-bit logical types share the Int64 physical representation.
        // Values may appear as JSON numbers or as strings (the integration
        // format stringifies 64-bit integers to avoid JSON precision loss);
        // build Int64, then cast to the logical target type.
        DataType::Int64
        | DataType::Date64
        | DataType::Time64(_)
        | DataType::Timestamp(_, _)
        | DataType::Duration(_) => {
            let mut b = Int64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(match value {
                        Value::Number(n) => n.as_i64().unwrap(),
                        Value::String(s) => s.parse().expect("Unable to parse string as i64"),
                        _ => panic!("Unable to parse {value:?} as number"),
                    }),
                    _ => b.append_null(),
                };
            }
            let array = Arc::new(b.finish()) as ArrayRef;
            arrow::compute::cast(&array, field.data_type())
        }
        // DayTime intervals arrive as {"days": .., "milliseconds": ..} objects.
        DataType::Interval(IntervalUnit::DayTime) => {
            let mut b = IntervalDayTimeBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(match value {
                        Value::Object(ref map)
                            if map.contains_key("days") && map.contains_key("milliseconds") =>
                        {
                            match field.data_type() {
                                DataType::Interval(IntervalUnit::DayTime) => {
                                    let days = map.get("days").unwrap();
                                    let milliseconds = map.get("milliseconds").unwrap();

                                    match (days, milliseconds) {
                                        (Value::Number(d), Value::Number(m)) => {
                                            let days = d.as_i64().unwrap() as _;
                                            let millis = m.as_i64().unwrap() as _;
                                            IntervalDayTime::new(days, millis)
                                        }
                                        _ => {
                                            panic!("Unable to parse {value:?} as interval daytime")
                                        }
                                    }
                                }
                                _ => panic!("Unable to parse {value:?} as interval daytime"),
                            }
                        }
                        _ => panic!("Unable to parse {value:?} as number"),
                    }),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt8 => {
            let mut b = UInt8Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u8),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt16 => {
            let mut b = UInt16Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u16),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt32 => {
            let mut b = UInt32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        // UInt64 values may be JSON strings (64-bit integers are stringified
        // in the integration format) or plain numbers.
        DataType::UInt64 => {
            let mut b = UInt64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        if value.is_string() {
                            b.append_value(
                                value
                                    .as_str()
                                    .unwrap()
                                    .parse()
                                    .expect("Unable to parse string as u64"),
                            )
                        } else if value.is_number() {
                            b.append_value(value.as_u64().expect("Unable to read number as u64"))
                        } else {
                            panic!("Unable to parse value {value:?} as u64")
                        }
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        // MonthDayNano intervals arrive as
        // {"months": .., "days": .., "nanoseconds": ..} objects.
        DataType::Interval(IntervalUnit::MonthDayNano) => {
            let mut b = IntervalMonthDayNanoBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(match value {
                        Value::Object(v) => {
                            let months = v.get("months").unwrap();
                            let days = v.get("days").unwrap();
                            let nanoseconds = v.get("nanoseconds").unwrap();
                            match (months, days, nanoseconds) {
                                (
                                    Value::Number(months),
                                    Value::Number(days),
                                    Value::Number(nanoseconds),
                                ) => {
                                    let months = months.as_i64().unwrap() as i32;
                                    let days = days.as_i64().unwrap() as i32;
                                    let nanoseconds = nanoseconds.as_i64().unwrap();
                                    IntervalMonthDayNano::new(months, days, nanoseconds)
                                }
                                (_, _, _) => {
                                    panic!("Unable to parse {v:?} as MonthDayNano")
                                }
                            }
                        }
                        _ => panic!("Unable to parse {value:?} as MonthDayNano"),
                    }),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Float32 => {
            let mut b = Float32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_f64().unwrap() as f32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Float64 => {
            let mut b = Float64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_f64().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        // Binary values are hex-encoded strings in the JSON format.
        DataType::Binary => {
            let mut b = BinaryBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let v = decode(value.as_str().unwrap()).unwrap();
                        b.append_value(&v)
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::LargeBinary => {
            let mut b = LargeBinaryBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let v = decode(value.as_str().unwrap()).unwrap();
                        b.append_value(&v)
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Utf8 => {
            let mut b = StringBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::LargeUtf8 => {
            let mut b = LargeStringBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::FixedSizeBinary(len) => {
            let mut b = FixedSizeBinaryBuilder::with_capacity(json_col.count, *len);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let v = hex::decode(value.as_str().unwrap()).unwrap();
                        // append_value is fallible: it errors when the decoded
                        // bytes are not exactly `len` long.
                        b.append_value(&v)?
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        // Lists: recursively build the single child array, then assemble
        // ArrayData from the 32-bit offsets and the null bitmap.
        DataType::List(child_field) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i32> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| v.as_i64().unwrap() as i32)
                .collect();
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .offset(0)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(ListArray::from(list_data)))
        }
        // Same as List but with 64-bit offsets, which may be stringified.
        DataType::LargeList(child_field) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i64> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| match v {
                    Value::Number(n) => n.as_i64().unwrap(),
                    Value::String(s) => s.parse::<i64>().unwrap(),
                    _ => panic!("64-bit offset must be either string or number"),
                })
                .collect();
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .offset(0)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(LargeListArray::from(list_data)))
        }
        // Fixed-size lists need no offsets buffer.
        DataType::FixedSizeList(child_field, _) => {
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let null_buf = create_null_buf(&json_col);
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(FixedSizeListArray::from(list_data)))
        }
        // Structs: one child column per struct field, matched positionally.
        DataType::Struct(fields) => {
            let null_buf = create_null_buf(&json_col);
            let mut array_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .null_bit_buffer(Some(null_buf));

            for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
                let array = array_from_json(field, col, dictionaries)?;
                array_data = array_data.add_child_data(array.into_data());
            }

            let array = StructArray::from(array_data.build().unwrap());
            Ok(Arc::new(array))
        }
        // Dictionary columns: look up the dictionary batch by the field's
        // dict_id and delegate to dictionary_array_from_json.
        DataType::Dictionary(key_type, value_type) => {
            #[allow(deprecated)]
            let dict_id = field.dict_id().ok_or_else(|| {
                ArrowError::JsonError(format!("Unable to find dict_id for field {field}"))
            })?;
            let dictionary = dictionaries
                .ok_or_else(|| {
                    ArrowError::JsonError(format!(
                        "Unable to find any dictionaries for field {field}"
                    ))
                })?
                .get(&dict_id);
            match dictionary {
                Some(dictionary) => dictionary_array_from_json(
                    field,
                    json_col,
                    key_type,
                    value_type,
                    dictionary,
                    dictionaries,
                ),
                None => Err(ArrowError::JsonError(format!(
                    "Unable to find dictionary for field {field}"
                ))),
            }
        }
        // Decimal values are stored as decimal strings in the JSON format.
        DataType::Decimal32(precision, scale) => {
            let mut b = Decimal32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap().parse::<i32>().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        DataType::Decimal64(precision, scale) => {
            let mut b = Decimal64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap().parse::<i64>().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        DataType::Decimal128(precision, scale) => {
            let mut b = Decimal128Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap().parse::<i128>().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        // Decimal256 values exceed native integer widths: parse with BigInt,
        // then sign-extend the little-endian bytes into a 32-byte buffer
        // (0x00 fill for positive, 0xFF fill for negative).
        DataType::Decimal256(precision, scale) => {
            let mut b = Decimal256Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let str = value.as_str().unwrap();
                        let integer = BigInt::parse_bytes(str.as_bytes(), 10).unwrap();
                        let integer_bytes = integer.to_signed_bytes_le();
                        let mut bytes = if integer.is_positive() {
                            [0_u8; 32]
                        } else {
                            [255_u8; 32]
                        };
                        bytes[0..integer_bytes.len()].copy_from_slice(integer_bytes.as_slice());
                        b.append_value(i256::from_le_bytes(bytes));
                    }
                    _ => b.append_null(),
                }
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        // Maps are lists of key/value struct entries: build the single
        // entries child, then assemble with 32-bit offsets.
        DataType::Map(child_field, _) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i32> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| v.as_i64().unwrap() as i32)
                .collect();
            let array_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();

            let array = MapArray::from(array_data);
            Ok(Arc::new(array))
        }
        // Unions: TYPE_ID is mandatory; OFFSET is present only for dense
        // unions. Children are matched positionally to the union fields.
        DataType::Union(fields, _) => {
            let type_ids = if let Some(type_id) = json_col.type_id {
                type_id
            } else {
                return Err(ArrowError::JsonError(
                    "Cannot find expected type_id in json column".to_string(),
                ));
            };

            let offset: Option<ScalarBuffer<i32>> = json_col
                .offset
                .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());

            let mut children = Vec::with_capacity(fields.len());
            for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
                let array = array_from_json(field, col, dictionaries)?;
                children.push(array);
            }

            let array =
                UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
            Ok(Arc::new(array))
        }
        t => Err(ArrowError::JsonError(format!(
            "data type {t} not supported"
        ))),
    }
}
961
/// Construct a dictionary-encoded [`ArrayRef`] from a JSON column.
///
/// `json_col` holds the dictionary *keys*; the dictionary *values* come from
/// the single-column batch inside `dictionary` (already looked up by the
/// caller via the field's dict_id). Only integer key types are supported.
///
/// # Errors
/// Returns `ArrowError::JsonError` when `dict_key` is not an integer type.
pub fn dictionary_array_from_json(
    field: &Field,
    json_col: ArrowJsonColumn,
    dict_key: &DataType,
    dict_value: &DataType,
    dictionary: &ArrowJsonDictionaryBatch,
    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<ArrayRef> {
    match dict_key {
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64 => {
            // Null bitmap for the keys (and thus the dictionary array).
            let null_buf = create_null_buf(&json_col);

            // Build a temporary plain-integer field so the keys can be parsed
            // with array_from_json; dict_id/dict_is_ordered are carried over
            // from the dictionary field.
            #[allow(deprecated)]
            let key_field = Field::new_dict(
                "key",
                dict_key.clone(),
                field.is_nullable(),
                #[allow(deprecated)]
                field
                    .dict_id()
                    .expect("Dictionary fields must have a dict_id value"),
                field
                    .dict_is_ordered()
                    .expect("Dictionary fields must have a dict_is_ordered value"),
            );
            let keys = array_from_json(&key_field, json_col, None)?;
            // Values are nullable per the Arrow spec; parse them recursively
            // (they may themselves be dictionary-encoded).
            let value_field = Field::new("value", dict_value.clone(), true);
            let values = array_from_json(
                &value_field,
                dictionary.data.columns[0].clone(),
                dictionaries,
            )?;

            // Assemble: keys buffer + null bitmap, values as child data.
            let dict_data = ArrayData::builder(field.data_type().clone())
                .len(keys.len())
                .add_buffer(keys.to_data().buffers()[0].clone())
                .null_bit_buffer(Some(null_buf))
                .add_child_data(values.into_data())
                .build()
                .unwrap();

            let array = match dict_key {
                DataType::Int8 => Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef,
                DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)),
                DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)),
                DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)),
                DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)),
                DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)),
                DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)),
                DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)),
                // The outer match already restricted dict_key to these types.
                _ => unreachable!(),
            };
            Ok(array)
        }
        _ => Err(ArrowError::JsonError(format!(
            "Dictionary key type {dict_key:?} not supported"
        ))),
    }
}
1032
1033fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
1035 let num_bytes = bit_util::ceil(json_col.count, 8);
1036 let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
1037 json_col
1038 .validity
1039 .clone()
1040 .unwrap()
1041 .iter()
1042 .enumerate()
1043 .for_each(|(i, v)| {
1044 let null_slice = null_buf.as_slice_mut();
1045 if *v != 0 {
1046 bit_util::set_bit(null_slice, i);
1047 }
1048 });
1049 null_buf.into()
1050}
1051
1052impl ArrowJsonBatch {
1053 pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
1065 let mut json_batch = ArrowJsonBatch {
1066 count: batch.num_rows(),
1067 columns: Vec::with_capacity(batch.num_columns()),
1068 };
1069
1070 for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) {
1071 let json_col = match field.data_type() {
1072 DataType::Int8 => {
1073 let col = col.as_any().downcast_ref::<Int8Array>().unwrap();
1074
1075 let mut validity: Vec<u8> = Vec::with_capacity(col.len());
1076 let mut data: Vec<Value> = Vec::with_capacity(col.len());
1077
1078 for i in 0..col.len() {
1079 if col.is_null(i) {
1080 validity.push(1);
1081 data.push(0i8.into());
1082 } else {
1083 validity.push(0);
1084 data.push(col.value(i).into());
1085 }
1086 }
1087
1088 ArrowJsonColumn {
1089 name: field.name().clone(),
1090 count: col.len(),
1091 validity: Some(validity),
1092 data: Some(data),
1093 offset: None,
1094 type_id: None,
1095 children: None,
1096 }
1097 }
1098 _ => ArrowJsonColumn {
1099 name: field.name().clone(),
1100 count: col.len(),
1101 validity: None,
1102 data: None,
1103 offset: None,
1104 type_id: None,
1105 children: None,
1106 },
1107 };
1108
1109 json_batch.columns.push(json_col);
1110 }
1111
1112 json_batch
1113 }
1114}
1115
1116#[cfg(test)]
1117mod tests {
1118 use super::*;
1119
1120 use std::fs::File;
1121 use std::io::Read;
1122
1123 #[test]
1124 fn test_schema_equality() {
1125 let json = r#"
1126 {
1127 "fields": [
1128 {
1129 "name": "c1",
1130 "type": {"name": "int", "isSigned": true, "bitWidth": 32},
1131 "nullable": true,
1132 "children": []
1133 },
1134 {
1135 "name": "c2",
1136 "type": {"name": "floatingpoint", "precision": "DOUBLE"},
1137 "nullable": true,
1138 "children": []
1139 },
1140 {
1141 "name": "c3",
1142 "type": {"name": "utf8"},
1143 "nullable": true,
1144 "children": []
1145 },
1146 {
1147 "name": "c4",
1148 "type": {
1149 "name": "list"
1150 },
1151 "nullable": true,
1152 "children": [
1153 {
1154 "name": "custom_item",
1155 "type": {
1156 "name": "int",
1157 "isSigned": true,
1158 "bitWidth": 32
1159 },
1160 "nullable": false,
1161 "children": []
1162 }
1163 ]
1164 }
1165 ]
1166 }"#;
1167 let json_schema: ArrowJsonSchema = serde_json::from_str(json).unwrap();
1168 let schema = Schema::new(vec![
1169 Field::new("c1", DataType::Int32, true),
1170 Field::new("c2", DataType::Float64, true),
1171 Field::new("c3", DataType::Utf8, true),
1172 Field::new(
1173 "c4",
1174 DataType::List(Arc::new(Field::new("custom_item", DataType::Int32, false))),
1175 true,
1176 ),
1177 ]);
1178 assert!(json_schema.equals_schema(&schema));
1179 }
1180
1181 #[test]
1182 fn test_arrow_data_equality() {
1183 let secs_tz = Some("Europe/Budapest".into());
1184 let millis_tz = Some("America/New_York".into());
1185 let micros_tz = Some("UTC".into());
1186 let nanos_tz = Some("Africa/Johannesburg".into());
1187
1188 let schema = Schema::new(vec![
1189 Field::new("bools-with-metadata-map", DataType::Boolean, true).with_metadata(
1190 [("k".to_string(), "v".to_string())]
1191 .iter()
1192 .cloned()
1193 .collect(),
1194 ),
1195 Field::new("bools-with-metadata-vec", DataType::Boolean, true).with_metadata(
1196 [("k2".to_string(), "v2".to_string())]
1197 .iter()
1198 .cloned()
1199 .collect(),
1200 ),
1201 Field::new("bools", DataType::Boolean, true),
1202 Field::new("int8s", DataType::Int8, true),
1203 Field::new("int16s", DataType::Int16, true),
1204 Field::new("int32s", DataType::Int32, true),
1205 Field::new("int64s", DataType::Int64, true),
1206 Field::new("uint8s", DataType::UInt8, true),
1207 Field::new("uint16s", DataType::UInt16, true),
1208 Field::new("uint32s", DataType::UInt32, true),
1209 Field::new("uint64s", DataType::UInt64, true),
1210 Field::new("float32s", DataType::Float32, true),
1211 Field::new("float64s", DataType::Float64, true),
1212 Field::new("date_days", DataType::Date32, true),
1213 Field::new("date_millis", DataType::Date64, true),
1214 Field::new("time_secs", DataType::Time32(TimeUnit::Second), true),
1215 Field::new("time_millis", DataType::Time32(TimeUnit::Millisecond), true),
1216 Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true),
1217 Field::new("time_nanos", DataType::Time64(TimeUnit::Nanosecond), true),
1218 Field::new("ts_secs", DataType::Timestamp(TimeUnit::Second, None), true),
1219 Field::new(
1220 "ts_millis",
1221 DataType::Timestamp(TimeUnit::Millisecond, None),
1222 true,
1223 ),
1224 Field::new(
1225 "ts_micros",
1226 DataType::Timestamp(TimeUnit::Microsecond, None),
1227 true,
1228 ),
1229 Field::new(
1230 "ts_nanos",
1231 DataType::Timestamp(TimeUnit::Nanosecond, None),
1232 true,
1233 ),
1234 Field::new(
1235 "ts_secs_tz",
1236 DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
1237 true,
1238 ),
1239 Field::new(
1240 "ts_millis_tz",
1241 DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
1242 true,
1243 ),
1244 Field::new(
1245 "ts_micros_tz",
1246 DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
1247 true,
1248 ),
1249 Field::new(
1250 "ts_nanos_tz",
1251 DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
1252 true,
1253 ),
1254 Field::new("utf8s", DataType::Utf8, true),
1255 Field::new(
1256 "lists",
1257 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1258 true,
1259 ),
1260 Field::new(
1261 "structs",
1262 DataType::Struct(Fields::from(vec![
1263 Field::new("int32s", DataType::Int32, true),
1264 Field::new("utf8s", DataType::Utf8, true),
1265 ])),
1266 true,
1267 ),
1268 ]);
1269
1270 let bools_with_metadata_map = BooleanArray::from(vec![Some(true), None, Some(false)]);
1271 let bools_with_metadata_vec = BooleanArray::from(vec![Some(true), None, Some(false)]);
1272 let bools = BooleanArray::from(vec![Some(true), None, Some(false)]);
1273 let int8s = Int8Array::from(vec![Some(1), None, Some(3)]);
1274 let int16s = Int16Array::from(vec![Some(1), None, Some(3)]);
1275 let int32s = Int32Array::from(vec![Some(1), None, Some(3)]);
1276 let int64s = Int64Array::from(vec![Some(1), None, Some(3)]);
1277 let uint8s = UInt8Array::from(vec![Some(1), None, Some(3)]);
1278 let uint16s = UInt16Array::from(vec![Some(1), None, Some(3)]);
1279 let uint32s = UInt32Array::from(vec![Some(1), None, Some(3)]);
1280 let uint64s = UInt64Array::from(vec![Some(1), None, Some(3)]);
1281 let float32s = Float32Array::from(vec![Some(1.0), None, Some(3.0)]);
1282 let float64s = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
1283 let date_days = Date32Array::from(vec![Some(1196848), None, None]);
1284 let date_millis = Date64Array::from(vec![
1285 Some(167903550396207),
1286 Some(29923997007884),
1287 Some(30612271819236),
1288 ]);
1289 let time_secs = Time32SecondArray::from(vec![Some(27974), Some(78592), Some(43207)]);
1290 let time_millis =
1291 Time32MillisecondArray::from(vec![Some(6613125), Some(74667230), Some(52260079)]);
1292 let time_micros = Time64MicrosecondArray::from(vec![Some(62522958593), None, None]);
1293 let time_nanos =
1294 Time64NanosecondArray::from(vec![Some(73380123595985), None, Some(16584393546415)]);
1295 let ts_secs = TimestampSecondArray::from(vec![None, Some(193438817552), None]);
1296 let ts_millis =
1297 TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)]);
1298 let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
1299 let ts_nanos = TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
1300 let ts_secs_tz = TimestampSecondArray::from(vec![None, Some(193438817552), None])
1301 .with_timezone_opt(secs_tz);
1302 let ts_millis_tz =
1303 TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)])
1304 .with_timezone_opt(millis_tz);
1305 let ts_micros_tz =
1306 TimestampMicrosecondArray::from(vec![None, None, None]).with_timezone_opt(micros_tz);
1307 let ts_nanos_tz =
1308 TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)])
1309 .with_timezone_opt(nanos_tz);
1310 let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]);
1311
1312 let value_data = Int32Array::from(vec![None, Some(2), None, None]);
1313 let value_offsets = Buffer::from_slice_ref([0, 3, 4, 4]);
1314 let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1315 let list_data = ArrayData::builder(list_data_type)
1316 .len(3)
1317 .add_buffer(value_offsets)
1318 .add_child_data(value_data.into_data())
1319 .null_bit_buffer(Some(Buffer::from([0b00000011])))
1320 .build()
1321 .unwrap();
1322 let lists = ListArray::from(list_data);
1323
1324 let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
1325 let structs_utf8s = StringArray::from(vec![None, None, Some("aaaaaa")]);
1326 let struct_data_type = DataType::Struct(Fields::from(vec![
1327 Field::new("int32s", DataType::Int32, true),
1328 Field::new("utf8s", DataType::Utf8, true),
1329 ]));
1330 let struct_data = ArrayData::builder(struct_data_type)
1331 .len(3)
1332 .add_child_data(structs_int32s.into_data())
1333 .add_child_data(structs_utf8s.into_data())
1334 .null_bit_buffer(Some(Buffer::from([0b00000011])))
1335 .build()
1336 .unwrap();
1337 let structs = StructArray::from(struct_data);
1338
1339 let record_batch = RecordBatch::try_new(
1340 Arc::new(schema.clone()),
1341 vec![
1342 Arc::new(bools_with_metadata_map),
1343 Arc::new(bools_with_metadata_vec),
1344 Arc::new(bools),
1345 Arc::new(int8s),
1346 Arc::new(int16s),
1347 Arc::new(int32s),
1348 Arc::new(int64s),
1349 Arc::new(uint8s),
1350 Arc::new(uint16s),
1351 Arc::new(uint32s),
1352 Arc::new(uint64s),
1353 Arc::new(float32s),
1354 Arc::new(float64s),
1355 Arc::new(date_days),
1356 Arc::new(date_millis),
1357 Arc::new(time_secs),
1358 Arc::new(time_millis),
1359 Arc::new(time_micros),
1360 Arc::new(time_nanos),
1361 Arc::new(ts_secs),
1362 Arc::new(ts_millis),
1363 Arc::new(ts_micros),
1364 Arc::new(ts_nanos),
1365 Arc::new(ts_secs_tz),
1366 Arc::new(ts_millis_tz),
1367 Arc::new(ts_micros_tz),
1368 Arc::new(ts_nanos_tz),
1369 Arc::new(utf8s),
1370 Arc::new(lists),
1371 Arc::new(structs),
1372 ],
1373 )
1374 .unwrap();
1375 let mut file = File::open("data/integration.json").unwrap();
1376 let mut json = String::new();
1377 file.read_to_string(&mut json).unwrap();
1378 let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
1379 assert!(arrow_json.schema.equals_schema(&schema));
1381 assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
1383 }
1384}