//! Support for the [Apache Arrow JSON test data format], used by the
//! cross-implementation integration tests.
//!
//! [Apache Arrow JSON test data format]: https://arrow.apache.org/docs/format/Integration.html
#![doc(
    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![warn(missing_docs)]
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer};
use hex::decode;
use num::BigInt;
use num::Signed;
use serde::{Deserialize, Serialize};
use serde_json::{Map as SJMap, Value};
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::*;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::*;
use arrow::error::{ArrowError, Result};
use arrow::util::bit_util;

mod datatype;
mod field;
mod schema;

pub use datatype::*;
pub use field::*;
pub use schema::*;

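/// A struct that represents an Arrow file with a schema and record batches.
///
/// A minimal sketch of the intended use; the path is hypothetical and the
/// `use` assumes this crate's name:
///
/// ```ignore
/// use arrow_integration_test::ArrowJson;
///
/// let json = std::fs::read_to_string("data/integration.json").unwrap();
/// let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
/// for batch in arrow_json.get_record_batches().unwrap() {
///     println!("read a batch with {} rows", batch.num_rows());
/// }
/// ```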
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJson {
    /// The Arrow schema for the JSON file
    pub schema: ArrowJsonSchema,
    /// The batches that make up the JSON file
    pub batches: Vec<ArrowJsonBatch>,
    /// The dictionaries that make up the JSON file, if any
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
}

/// A struct that partially reads the Arrow JSON schema.
///
/// Fields are kept as JSON `Value`s since they vary by `DataType`.
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonSchema {
    /// An array of JSON fields
    pub fields: Vec<ArrowJsonField>,
    /// Optional schema metadata, stored as a list of maps, each with
    /// `"key"` and `"value"` entries
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Vec<HashMap<String, String>>>,
}

/// A field in the Arrow JSON schema
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonField {
    /// The name of the field
    pub name: String,
    /// The data type of the field, kept as JSON since it varies by `DataType`
    #[serde(rename = "type")]
    pub field_type: Value,
    /// Whether the field is nullable
    pub nullable: bool,
    /// The child fields, for nested types
    pub children: Vec<ArrowJsonField>,
    /// The dictionary encoding of the field, if dictionary-encoded
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionary: Option<ArrowJsonFieldDictionary>,
    /// Optional metadata for the field
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}

impl From<&FieldRef> for ArrowJsonField {
    fn from(value: &FieldRef) -> Self {
        Self::from(value.as_ref())
    }
}

impl From<&Field> for ArrowJsonField {
    fn from(field: &Field) -> Self {
        // Represent metadata in the integration format: an array of
        // `{"key": ..., "value": ...}` objects, or `None` when there is none.
        let metadata_value = match field.metadata().is_empty() {
            false => {
                let mut array = Vec::new();
                for (k, v) in field.metadata() {
                    let mut kv_map = SJMap::new();
                    kv_map.insert("key".to_string(), Value::String(k.clone()));
                    kv_map.insert("value".to_string(), Value::String(v.clone()));
                    array.push(Value::Object(kv_map));
                }
                if !array.is_empty() {
                    Some(Value::Array(array))
                } else {
                    None
                }
            }
            _ => None,
        };

        Self {
            name: field.name().to_string(),
            field_type: data_type_to_json(field.data_type()),
            nullable: field.is_nullable(),
            children: vec![],
            dictionary: None, // not enough information to reconstruct dictionary encoding
            metadata: metadata_value,
        }
    }
}

/// Represents the dictionary encoding of a field in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonFieldDictionary {
    /// The dictionary id
    pub id: i64,
    /// The index type of the dictionary keys
    #[serde(rename = "indexType")]
    pub index_type: DictionaryIndexType,
    /// Whether the dictionary is ordered
    #[serde(rename = "isOrdered")]
    pub is_ordered: bool,
}

/// The integer type used for the keys of a dictionary
#[derive(Deserialize, Serialize, Debug)]
pub struct DictionaryIndexType {
    /// The name of the type
    pub name: String,
    /// Whether the type is signed
    #[serde(rename = "isSigned")]
    pub is_signed: bool,
    /// The bit width of the type
    #[serde(rename = "bitWidth")]
    pub bit_width: i64,
}

/// A struct that partially reads the Arrow JSON record batch
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ArrowJsonBatch {
    count: usize,
    /// The columns of the record batch
    pub columns: Vec<ArrowJsonColumn>,
}

/// A struct that partially reads the Arrow JSON dictionary batch
#[derive(Deserialize, Serialize, Debug, Clone)]
#[allow(non_snake_case)]
pub struct ArrowJsonDictionaryBatch {
    /// The id of the dictionary this batch provides values for
    pub id: i64,
    /// The dictionary values, as a record batch
    pub data: ArrowJsonBatch,
}

/// A struct that partially reads the Arrow JSON column/array
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ArrowJsonColumn {
    name: String,
    /// The number of elements in the column
    pub count: usize,
    /// The validity of each element: 1 for valid, 0 for null
    #[serde(rename = "VALIDITY")]
    pub validity: Option<Vec<u8>>,
    /// The values of the column
    #[serde(rename = "DATA")]
    pub data: Option<Vec<Value>>,
    /// The offsets, for variable-sized and list types
    #[serde(rename = "OFFSET")]
    pub offset: Option<Vec<Value>>,
    /// The type ids, for union types
    #[serde(rename = "TYPE_ID")]
    pub type_id: Option<Vec<i8>>,
    /// The child columns, for nested types
    pub children: Option<Vec<ArrowJsonColumn>>,
}

impl ArrowJson {
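    /// Compare the Arrow JSON with the contents of a [`RecordBatchReader`],
    /// returning `Ok(false)` (and printing both sides) on the first mismatch.
    ///
    /// A sketch of the intended use; the IPC path is hypothetical, and the
    /// sketch assumes [`arrow::ipc::reader::FileReader`] (which implements
    /// [`RecordBatchReader`]) as the reader:
    ///
    /// ```ignore
    /// let file = std::fs::File::open("data/integration.arrow_file").unwrap();
    /// let mut reader = arrow::ipc::reader::FileReader::try_new(file, None).unwrap();
    /// assert!(arrow_json.equals_reader(&mut reader).unwrap());
    /// ```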
    pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> Result<bool> {
        if !self.schema.equals_schema(&reader.schema()) {
            return Ok(false);
        }

        for json_batch in self.get_record_batches()?.into_iter() {
            let batch = reader.next();
            match batch {
                Some(Ok(batch)) => {
                    if json_batch != batch {
                        println!("json: {json_batch:?}");
                        println!("batch: {batch:?}");
                        return Ok(false);
                    }
                }
                Some(Err(e)) => return Err(e),
                None => return Ok(false),
            }
        }

        Ok(true)
    }

    /// Convert the stored batches into [`RecordBatch`]es, resolving any
    /// dictionary-encoded columns against the stored dictionaries.
    pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
        let schema = self.schema.to_arrow_schema()?;

        let mut dictionaries = HashMap::new();
        self.dictionaries.iter().for_each(|dict_batches| {
            dict_batches.iter().for_each(|d| {
                dictionaries.insert(d.id, d.clone());
            });
        });

        let batches: Result<Vec<_>> = self
            .batches
            .iter()
            .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries)))
            .collect();

        batches
    }
}

impl ArrowJsonSchema {
    fn equals_schema(&self, schema: &Schema) -> bool {
        let field_len = self.fields.len();
        if field_len != schema.fields().len() {
            return false;
        }
        for i in 0..field_len {
            let json_field = &self.fields[i];
            let field = schema.field(i);
            if !json_field.equals_field(field) {
                return false;
            }
        }
        true
    }

    fn to_arrow_schema(&self) -> Result<Schema> {
        let arrow_fields: Result<Vec<_>> = self
            .fields
            .iter()
            .map(|field| field.to_arrow_field())
            .collect();

        if let Some(metadatas) = &self.metadata {
            let mut metadata: HashMap<String, String> = HashMap::new();

            metadatas.iter().for_each(|pair| {
                let key = pair.get("key").unwrap();
                let value = pair.get("value").unwrap();
                metadata.insert(key.clone(), value.clone());
            });

            Ok(Schema::new_with_metadata(arrow_fields?, metadata))
        } else {
            Ok(Schema::new(arrow_fields?))
        }
    }
}

impl ArrowJsonField {
    fn equals_field(&self, field: &Field) -> bool {
        // convert to an Arrow field and compare; the assert panics rather than
        // returning false so a failing integration test shows both fields
        match self.to_arrow_field() {
            Ok(self_field) => {
                assert_eq!(&self_field, field, "Arrow fields not the same");
                true
            }
            Err(e) => {
                eprintln!("Encountered error while converting JSON field to Arrow field: {e:?}");
                false
            }
        }
    }

    fn to_arrow_field(&self) -> Result<Field> {
        // a bit regressive, but we have to convert the field to JSON in order to convert it
        let field =
            serde_json::to_value(self).map_err(|error| ArrowError::JsonError(error.to_string()))?;
        field_from_json(&field)
    }
}

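/// Generate a [`RecordBatch`] from an Arrow JSON batch, given a schema.
///
/// A minimal sketch, assuming `json_batch` was deserialized from an
/// integration JSON file and `schema` matches it (both names are
/// placeholders):
///
/// ```ignore
/// let batch = record_batch_from_json(&schema, json_batch, None).unwrap();
/// assert_eq!(batch.schema().fields().len(), schema.fields().len());
/// ```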
pub fn record_batch_from_json(
    schema: &Schema,
    json_batch: ArrowJsonBatch,
    json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<RecordBatch> {
    let mut columns = vec![];

    for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
        let col = array_from_json(field, json_col, json_dictionaries)?;
        columns.push(col);
    }

    RecordBatch::try_new(Arc::new(schema.clone()), columns)
}

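/// Construct an [`ArrayRef`] from an Arrow JSON column, given its field.
///
/// A small sketch of a single `Int32` column; the JSON literal mirrors the
/// integration format (`VALIDITY` uses 1 for valid, 0 for null) and is
/// `ignore`d here as a hedge rather than a guaranteed doc-test:
///
/// ```ignore
/// let field = Field::new("c1", DataType::Int32, true);
/// let json_col: ArrowJsonColumn = serde_json::from_value(serde_json::json!({
///     "name": "c1",
///     "count": 3,
///     "VALIDITY": [1, 0, 1],
///     "DATA": [1, 0, 3]
/// })).unwrap();
/// let array = array_from_json(&field, json_col, None).unwrap();
/// assert_eq!(array.len(), 3);
/// ```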
pub fn array_from_json(
    field: &Field,
    json_col: ArrowJsonColumn,
    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<ArrayRef> {
    match field.data_type() {
        DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
        DataType::Boolean => {
            let mut b = BooleanBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_bool().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int8 => {
            let mut b = Int8Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().ok_or_else(|| {
                        ArrowError::JsonError(format!("Unable to get {value:?} as int64"))
                    })? as i8),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int16 => {
            let mut b = Int16Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i16),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
            let mut b = Int32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i32),
                    _ => b.append_null(),
                };
            }
            let array = Arc::new(b.finish()) as ArrayRef;
            arrow::compute::cast(&array, field.data_type())
        }
        DataType::Interval(IntervalUnit::YearMonth) => {
            let mut b = IntervalYearMonthBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int64
        | DataType::Date64
        | DataType::Time64(_)
        | DataType::Timestamp(_, _)
        | DataType::Duration(_) => {
            let mut b = Int64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(match value {
                        Value::Number(n) => n.as_i64().unwrap(),
                        Value::String(s) => s.parse().expect("Unable to parse string as i64"),
                        _ => panic!("Unable to parse {value:?} as number"),
                    }),
                    _ => b.append_null(),
                };
            }
            let array = Arc::new(b.finish()) as ArrayRef;
            arrow::compute::cast(&array, field.data_type())
        }
        DataType::Interval(IntervalUnit::DayTime) => {
            let mut b = IntervalDayTimeBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(match value {
                        Value::Object(ref map)
                            if map.contains_key("days") && map.contains_key("milliseconds") =>
                        {
                            match field.data_type() {
                                DataType::Interval(IntervalUnit::DayTime) => {
                                    let days = map.get("days").unwrap();
                                    let milliseconds = map.get("milliseconds").unwrap();

                                    match (days, milliseconds) {
                                        (Value::Number(d), Value::Number(m)) => {
                                            let days = d.as_i64().unwrap() as _;
                                            let millis = m.as_i64().unwrap() as _;
                                            IntervalDayTime::new(days, millis)
                                        }
                                        _ => {
                                            panic!("Unable to parse {value:?} as interval daytime")
                                        }
                                    }
                                }
                                _ => panic!("Unable to parse {value:?} as interval daytime"),
                            }
                        }
                        _ => panic!("Unable to parse {value:?} as number"),
                    }),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt8 => {
            let mut b = UInt8Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u8),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt16 => {
            let mut b = UInt16Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u16),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt32 => {
            let mut b = UInt32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt64 => {
            let mut b = UInt64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        if value.is_string() {
                            b.append_value(
                                value
                                    .as_str()
                                    .unwrap()
                                    .parse()
                                    .expect("Unable to parse string as u64"),
                            )
                        } else if value.is_number() {
                            b.append_value(value.as_u64().expect("Unable to read number as u64"))
                        } else {
                            panic!("Unable to parse value {value:?} as u64")
                        }
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Interval(IntervalUnit::MonthDayNano) => {
            let mut b = IntervalMonthDayNanoBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(match value {
                        Value::Object(v) => {
                            let months = v.get("months").unwrap();
                            let days = v.get("days").unwrap();
                            let nanoseconds = v.get("nanoseconds").unwrap();
                            match (months, days, nanoseconds) {
                                (
                                    Value::Number(months),
                                    Value::Number(days),
                                    Value::Number(nanoseconds),
                                ) => {
                                    let months = months.as_i64().unwrap() as i32;
                                    let days = days.as_i64().unwrap() as i32;
                                    let nanoseconds = nanoseconds.as_i64().unwrap();
                                    IntervalMonthDayNano::new(months, days, nanoseconds)
                                }
                                (_, _, _) => {
                                    panic!("Unable to parse {v:?} as MonthDayNano")
                                }
                            }
                        }
                        _ => panic!("Unable to parse {value:?} as MonthDayNano"),
                    }),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Float32 => {
            let mut b = Float32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_f64().unwrap() as f32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Float64 => {
            let mut b = Float64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_f64().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Binary => {
            let mut b = BinaryBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let v = decode(value.as_str().unwrap()).unwrap();
                        b.append_value(&v)
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::LargeBinary => {
            let mut b = LargeBinaryBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let v = decode(value.as_str().unwrap()).unwrap();
                        b.append_value(&v)
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Utf8 => {
            let mut b = StringBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::LargeUtf8 => {
            let mut b = LargeStringBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::FixedSizeBinary(len) => {
            let mut b = FixedSizeBinaryBuilder::with_capacity(json_col.count, *len);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let v = hex::decode(value.as_str().unwrap()).unwrap();
                        b.append_value(&v)?
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::List(child_field) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i32> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| v.as_i64().unwrap() as i32)
                .collect();
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .offset(0)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(ListArray::from(list_data)))
        }
        DataType::LargeList(child_field) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i64> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| match v {
                    Value::Number(n) => n.as_i64().unwrap(),
                    Value::String(s) => s.parse::<i64>().unwrap(),
                    _ => panic!("64-bit offset must be either string or number"),
                })
                .collect();
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .offset(0)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(LargeListArray::from(list_data)))
        }
        DataType::FixedSizeList(child_field, _) => {
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let null_buf = create_null_buf(&json_col);
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(FixedSizeListArray::from(list_data)))
        }
        DataType::Struct(fields) => {
            let null_buf = create_null_buf(&json_col);
            let mut array_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .null_bit_buffer(Some(null_buf));

            for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
                let array = array_from_json(field, col, dictionaries)?;
                array_data = array_data.add_child_data(array.into_data());
            }

            let array = StructArray::from(array_data.build().unwrap());
            Ok(Arc::new(array))
        }
        DataType::Dictionary(key_type, value_type) => {
            #[allow(deprecated)]
            let dict_id = field.dict_id().ok_or_else(|| {
                ArrowError::JsonError(format!("Unable to find dict_id for field {field:?}"))
            })?;
            let dictionary = dictionaries
                .ok_or_else(|| {
                    ArrowError::JsonError(format!(
                        "Unable to find any dictionaries for field {field:?}"
                    ))
                })?
                .get(&dict_id);
            match dictionary {
                Some(dictionary) => dictionary_array_from_json(
                    field,
                    json_col,
                    key_type,
                    value_type,
                    dictionary,
                    dictionaries,
                ),
                None => Err(ArrowError::JsonError(format!(
                    "Unable to find dictionary for field {field:?}"
                ))),
            }
        }
        DataType::Decimal32(precision, scale) => {
            let mut b = Decimal32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap().parse::<i32>().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        DataType::Decimal64(precision, scale) => {
            let mut b = Decimal64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap().parse::<i64>().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        DataType::Decimal128(precision, scale) => {
            let mut b = Decimal128Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap().parse::<i128>().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        DataType::Decimal256(precision, scale) => {
            let mut b = Decimal256Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let str = value.as_str().unwrap();
                        let integer = BigInt::parse_bytes(str.as_bytes(), 10).unwrap();
                        let integer_bytes = integer.to_signed_bytes_le();
                        let mut bytes = if integer.is_positive() {
                            [0_u8; 32]
                        } else {
                            [255_u8; 32]
                        };
                        bytes[0..integer_bytes.len()].copy_from_slice(integer_bytes.as_slice());
                        b.append_value(i256::from_le_bytes(bytes));
                    }
                    _ => b.append_null(),
                }
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        DataType::Map(child_field, _) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i32> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| v.as_i64().unwrap() as i32)
                .collect();
            let array_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();

            let array = MapArray::from(array_data);
            Ok(Arc::new(array))
        }
        DataType::Union(fields, _) => {
            let type_ids = if let Some(type_id) = json_col.type_id {
                type_id
            } else {
                return Err(ArrowError::JsonError(
                    "Cannot find expected type_id in json column".to_string(),
                ));
            };

            let offset: Option<ScalarBuffer<i32>> = json_col
                .offset
                .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());

            let mut children = Vec::with_capacity(fields.len());
            for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
                let array = array_from_json(field, col, dictionaries)?;
                children.push(array);
            }

            let array =
                UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
            Ok(Arc::new(array))
        }
        t => Err(ArrowError::JsonError(format!(
            "data type {t:?} not supported"
        ))),
    }
}

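/// Construct a dictionary-encoded array from an Arrow JSON column, its key and
/// value types, and the dictionary batch holding its values.
///
/// A hedged sketch; every binding is a placeholder resolved by the caller, as
/// in `array_from_json`'s `Dictionary` arm above:
///
/// ```ignore
/// let array = dictionary_array_from_json(
///     &field,           // Field with DataType::Dictionary(Int32, Utf8)
///     json_col,         // keys, as read from the JSON column
///     &DataType::Int32, // key type
///     &DataType::Utf8,  // value type
///     &dict_batch,      // ArrowJsonDictionaryBatch with the matching id
///     None,
/// ).unwrap();
/// ```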
pub fn dictionary_array_from_json(
    field: &Field,
    json_col: ArrowJsonColumn,
    dict_key: &DataType,
    dict_value: &DataType,
    dictionary: &ArrowJsonDictionaryBatch,
    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<ArrayRef> {
    match dict_key {
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64 => {
            let null_buf = create_null_buf(&json_col);

            // build the key array from the JSON column, then assemble it with
            // the values decoded from the dictionary batch
            #[allow(deprecated)]
            let key_field = Field::new_dict(
                "key",
                dict_key.clone(),
                field.is_nullable(),
                #[allow(deprecated)]
                field
                    .dict_id()
                    .expect("Dictionary fields must have a dict_id value"),
                field
                    .dict_is_ordered()
                    .expect("Dictionary fields must have a dict_is_ordered value"),
            );
            let keys = array_from_json(&key_field, json_col, None)?;
            let value_field = Field::new("value", dict_value.clone(), true);
            let values = array_from_json(
                &value_field,
                dictionary.data.columns[0].clone(),
                dictionaries,
            )?;

            let dict_data = ArrayData::builder(field.data_type().clone())
                .len(keys.len())
                .add_buffer(keys.to_data().buffers()[0].clone())
                .null_bit_buffer(Some(null_buf))
                .add_child_data(values.into_data())
                .build()
                .unwrap();

            let array = match dict_key {
                DataType::Int8 => Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef,
                DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)),
                DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)),
                DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)),
                DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)),
                DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)),
                DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)),
                DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)),
                _ => unreachable!(),
            };
            Ok(array)
        }
        _ => Err(ArrowError::JsonError(format!(
            "Dictionary key type {dict_key:?} not supported"
        ))),
    }
}

/// Construct a null bitmap buffer from a JSON column's validity array,
/// setting a bit for every non-zero validity entry.
fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
    let num_bytes = bit_util::ceil(json_col.count, 8);
    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
    json_col
        .validity
        .clone()
        .unwrap()
        .iter()
        .enumerate()
        .for_each(|(i, v)| {
            let null_slice = null_buf.as_slice_mut();
            if *v != 0 {
                bit_util::set_bit(null_slice, i);
            }
        });
    null_buf.into()
}

impl ArrowJsonBatch {
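    /// Convert a [`RecordBatch`] into an [`ArrowJsonBatch`].
    ///
    /// Note that only `Int8` columns are serialized with their values here;
    /// every other type is emitted as an empty column stub.
    ///
    /// A round-trip sketch under that assumption:
    ///
    /// ```ignore
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int8, true)]);
    /// let col = Int8Array::from(vec![Some(1), None]);
    /// let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(col)]).unwrap();
    /// let json_batch = ArrowJsonBatch::from_batch(&batch);
    /// ```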
    pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
        let mut json_batch = ArrowJsonBatch {
            count: batch.num_rows(),
            columns: Vec::with_capacity(batch.num_columns()),
        };

        for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) {
            let json_col = match field.data_type() {
                DataType::Int8 => {
                    let col = col.as_any().downcast_ref::<Int8Array>().unwrap();

                    let mut validity: Vec<u8> = Vec::with_capacity(col.len());
                    let mut data: Vec<Value> = Vec::with_capacity(col.len());

                    for i in 0..col.len() {
                        if col.is_null(i) {
                            // nulls get a 0 validity entry and a placeholder value,
                            // matching the 1-is-valid convention in `array_from_json`
                            validity.push(0);
                            data.push(0i8.into());
                        } else {
                            validity.push(1);
                            data.push(col.value(i).into());
                        }
                    }

                    ArrowJsonColumn {
                        name: field.name().clone(),
                        count: col.len(),
                        validity: Some(validity),
                        data: Some(data),
                        offset: None,
                        type_id: None,
                        children: None,
                    }
                }
                _ => ArrowJsonColumn {
                    name: field.name().clone(),
                    count: col.len(),
                    validity: None,
                    data: None,
                    offset: None,
                    type_id: None,
                    children: None,
                },
            };

            json_batch.columns.push(json_col);
        }

        json_batch
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Read;

    #[test]
    fn test_schema_equality() {
        let json = r#"
        {
            "fields": [
                {
                    "name": "c1",
                    "type": {"name": "int", "isSigned": true, "bitWidth": 32},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c2",
                    "type": {"name": "floatingpoint", "precision": "DOUBLE"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c3",
                    "type": {"name": "utf8"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c4",
                    "type": {
                        "name": "list"
                    },
                    "nullable": true,
                    "children": [
                        {
                            "name": "custom_item",
                            "type": {
                                "name": "int",
                                "isSigned": true,
                                "bitWidth": 32
                            },
                            "nullable": false,
                            "children": []
                        }
                    ]
                }
            ]
        }"#;
        let json_schema: ArrowJsonSchema = serde_json::from_str(json).unwrap();
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Utf8, true),
            Field::new(
                "c4",
                DataType::List(Arc::new(Field::new("custom_item", DataType::Int32, false))),
                true,
            ),
        ]);
        assert!(json_schema.equals_schema(&schema));
    }

    #[test]
    fn test_arrow_data_equality() {
        let secs_tz = Some("Europe/Budapest".into());
        let millis_tz = Some("America/New_York".into());
        let micros_tz = Some("UTC".into());
        let nanos_tz = Some("Africa/Johannesburg".into());

        let schema = Schema::new(vec![
            Field::new("bools-with-metadata-map", DataType::Boolean, true).with_metadata(
                [("k".to_string(), "v".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools-with-metadata-vec", DataType::Boolean, true).with_metadata(
                [("k2".to_string(), "v2".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools", DataType::Boolean, true),
            Field::new("int8s", DataType::Int8, true),
            Field::new("int16s", DataType::Int16, true),
            Field::new("int32s", DataType::Int32, true),
            Field::new("int64s", DataType::Int64, true),
            Field::new("uint8s", DataType::UInt8, true),
            Field::new("uint16s", DataType::UInt16, true),
            Field::new("uint32s", DataType::UInt32, true),
            Field::new("uint64s", DataType::UInt64, true),
            Field::new("float32s", DataType::Float32, true),
            Field::new("float64s", DataType::Float64, true),
            Field::new("date_days", DataType::Date32, true),
            Field::new("date_millis", DataType::Date64, true),
            Field::new("time_secs", DataType::Time32(TimeUnit::Second), true),
            Field::new("time_millis", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nanos", DataType::Time64(TimeUnit::Nanosecond), true),
            Field::new("ts_secs", DataType::Timestamp(TimeUnit::Second, None), true),
            Field::new(
                "ts_millis",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micros",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "ts_nanos",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "ts_secs_tz",
                DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
                true,
            ),
            Field::new(
                "ts_millis_tz",
                DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
                true,
            ),
            Field::new(
                "ts_micros_tz",
                DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
                true,
            ),
            Field::new(
                "ts_nanos_tz",
                DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
                true,
            ),
            Field::new("utf8s", DataType::Utf8, true),
            Field::new(
                "lists",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "structs",
                DataType::Struct(Fields::from(vec![
                    Field::new("int32s", DataType::Int32, true),
                    Field::new("utf8s", DataType::Utf8, true),
                ])),
                true,
            ),
        ]);

        let bools_with_metadata_map = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools_with_metadata_vec = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let int8s = Int8Array::from(vec![Some(1), None, Some(3)]);
        let int16s = Int16Array::from(vec![Some(1), None, Some(3)]);
        let int32s = Int32Array::from(vec![Some(1), None, Some(3)]);
        let int64s = Int64Array::from(vec![Some(1), None, Some(3)]);
        let uint8s = UInt8Array::from(vec![Some(1), None, Some(3)]);
        let uint16s = UInt16Array::from(vec![Some(1), None, Some(3)]);
        let uint32s = UInt32Array::from(vec![Some(1), None, Some(3)]);
        let uint64s = UInt64Array::from(vec![Some(1), None, Some(3)]);
        let float32s = Float32Array::from(vec![Some(1.0), None, Some(3.0)]);
        let float64s = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
        let date_days = Date32Array::from(vec![Some(1196848), None, None]);
        let date_millis = Date64Array::from(vec![
            Some(167903550396207),
            Some(29923997007884),
            Some(30612271819236),
        ]);
        let time_secs = Time32SecondArray::from(vec![Some(27974), Some(78592), Some(43207)]);
        let time_millis =
            Time32MillisecondArray::from(vec![Some(6613125), Some(74667230), Some(52260079)]);
        let time_micros = Time64MicrosecondArray::from(vec![Some(62522958593), None, None]);
        let time_nanos =
            Time64NanosecondArray::from(vec![Some(73380123595985), None, Some(16584393546415)]);
        let ts_secs = TimestampSecondArray::from(vec![None, Some(193438817552), None]);
        let ts_millis =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)]);
        let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
        let ts_nanos = TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
        let ts_secs_tz = TimestampSecondArray::from(vec![None, Some(193438817552), None])
            .with_timezone_opt(secs_tz);
        let ts_millis_tz =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)])
                .with_timezone_opt(millis_tz);
        let ts_micros_tz =
            TimestampMicrosecondArray::from(vec![None, None, None]).with_timezone_opt(micros_tz);
        let ts_nanos_tz =
            TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)])
                .with_timezone_opt(nanos_tz);
        let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]);

        let value_data = Int32Array::from(vec![None, Some(2), None, None]);
        let value_offsets = Buffer::from_slice_ref([0, 3, 4, 4]);
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_buffer(value_offsets)
            .add_child_data(value_data.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let lists = ListArray::from(list_data);

        let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
        let structs_utf8s = StringArray::from(vec![None, None, Some("aaaaaa")]);
        let struct_data_type = DataType::Struct(Fields::from(vec![
            Field::new("int32s", DataType::Int32, true),
            Field::new("utf8s", DataType::Utf8, true),
        ]));
        let struct_data = ArrayData::builder(struct_data_type)
            .len(3)
            .add_child_data(structs_int32s.into_data())
            .add_child_data(structs_utf8s.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let structs = StructArray::from(struct_data);

        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bools_with_metadata_map),
                Arc::new(bools_with_metadata_vec),
                Arc::new(bools),
                Arc::new(int8s),
                Arc::new(int16s),
                Arc::new(int32s),
                Arc::new(int64s),
                Arc::new(uint8s),
                Arc::new(uint16s),
                Arc::new(uint32s),
                Arc::new(uint64s),
                Arc::new(float32s),
                Arc::new(float64s),
                Arc::new(date_days),
                Arc::new(date_millis),
                Arc::new(time_secs),
                Arc::new(time_millis),
                Arc::new(time_micros),
                Arc::new(time_nanos),
                Arc::new(ts_secs),
                Arc::new(ts_millis),
                Arc::new(ts_micros),
                Arc::new(ts_nanos),
                Arc::new(ts_secs_tz),
                Arc::new(ts_millis_tz),
                Arc::new(ts_micros_tz),
                Arc::new(ts_nanos_tz),
                Arc::new(utf8s),
                Arc::new(lists),
                Arc::new(structs),
            ],
        )
        .unwrap();
        let mut file = File::open("data/integration.json").unwrap();
        let mut json = String::new();
        file.read_to_string(&mut json).unwrap();
        let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
        assert!(arrow_json.schema.equals_schema(&schema));
        assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
    }
}