//! Support for the Apache Arrow JSON test data format

#![doc(
    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![warn(missing_docs)]
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer};
use hex::decode;
use num::BigInt;
use num::Signed;
use serde::{Deserialize, Serialize};
use serde_json::{Map as SJMap, Value};
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::*;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::*;
use arrow::error::{ArrowError, Result};
use arrow::util::bit_util;

mod datatype;
mod field;
mod schema;

pub use datatype::*;
pub use field::*;
pub use schema::*;

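/// A struct that represents an Arrow file with a schema and record batches
///
/// The JSON it (de)serializes has roughly the following shape (a hand-written,
/// abbreviated sketch; see the Arrow integration-testing specification for the
/// authoritative layout):
///
/// ```json
/// {
///   "schema": { "fields": [ ... ] },
///   "batches": [ { "count": 3, "columns": [ ... ] } ],
///   "dictionaries": [ { "id": 0, "data": { "count": 2, "columns": [ ... ] } } ]
/// }
/// ```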
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJson {
    /// The Arrow schema for the JSON file
    pub schema: ArrowJsonSchema,
    /// The record batches in the JSON file
    pub batches: Vec<ArrowJsonBatch>,
    /// The optional dictionary batches in the JSON file
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
}

/// A struct that partially reads the Arrow JSON schema
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonSchema {
    /// An array of JSON fields
    pub fields: Vec<ArrowJsonField>,
    /// An optional array of metadata key-value pairs
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Vec<HashMap<String, String>>>,
}

/// A JSON representation of an Arrow field; the type is left as a JSON
/// `Value` as it varies by `DataType`
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonField {
    /// The name of the field
    pub name: String,
    /// The data type of the field, as a JSON value
    #[serde(rename = "type")]
    pub field_type: Value,
    /// Whether the field is nullable
    pub nullable: bool,
    /// The child fields
    pub children: Vec<ArrowJsonField>,
    /// The dictionary encoding of the field, if any
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionary: Option<ArrowJsonFieldDictionary>,
    /// The metadata of the field, if any
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}

impl From<&FieldRef> for ArrowJsonField {
    fn from(value: &FieldRef) -> Self {
        Self::from(value.as_ref())
    }
}

impl From<&Field> for ArrowJsonField {
    fn from(field: &Field) -> Self {
        let metadata_value = match field.metadata().is_empty() {
            false => {
                let mut array = Vec::new();
                for (k, v) in field.metadata() {
                    let mut kv_map = SJMap::new();
                    kv_map.insert(k.clone(), Value::String(v.clone()));
                    array.push(Value::Object(kv_map));
                }
                if !array.is_empty() {
                    Some(Value::Array(array))
                } else {
                    None
                }
            }
            _ => None,
        };

        Self {
            name: field.name().to_string(),
            field_type: data_type_to_json(field.data_type()),
            nullable: field.is_nullable(),
            children: vec![],
            dictionary: None,
            metadata: metadata_value,
        }
    }
}

/// Represents a dictionary-encoded field in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonFieldDictionary {
    /// A unique id for the dictionary
    pub id: i64,
    /// The type of the dictionary index
    #[serde(rename = "indexType")]
    pub index_type: DictionaryIndexType,
    /// Whether the dictionary is ordered
    #[serde(rename = "isOrdered")]
    pub is_ordered: bool,
}

/// The type of a dictionary index
#[derive(Deserialize, Serialize, Debug)]
pub struct DictionaryIndexType {
    /// The name of the index type
    pub name: String,
    /// Whether the index type is signed
    #[serde(rename = "isSigned")]
    pub is_signed: bool,
    /// The bit width of the index type
    #[serde(rename = "bitWidth")]
    pub bit_width: i64,
}

/// A struct that represents an Arrow JSON batch
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ArrowJsonBatch {
    count: usize,
    /// The columns in the batch
    pub columns: Vec<ArrowJsonColumn>,
}

/// A struct that represents a dictionary batch in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug, Clone)]
#[allow(non_snake_case)]
pub struct ArrowJsonDictionaryBatch {
    /// The id of the dictionary
    pub id: i64,
    /// The data for the dictionary
    pub data: ArrowJsonBatch,
}

/// A struct that represents an Arrow JSON column/array
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ArrowJsonColumn {
    name: String,
    /// The number of elements in the column
    pub count: usize,
    /// The validity bitmap: `1` marks a valid slot, `0` a null
    #[serde(rename = "VALIDITY")]
    pub validity: Option<Vec<u8>>,
    /// The data values in the column
    #[serde(rename = "DATA")]
    pub data: Option<Vec<Value>>,
    /// The offsets for variable-length and list types
    #[serde(rename = "OFFSET")]
    pub offset: Option<Vec<Value>>,
    /// The type ids for a union column
    #[serde(rename = "TYPE_ID")]
    pub type_id: Option<Vec<i8>>,
    /// The child columns, for nested types
    pub children: Option<Vec<ArrowJsonColumn>>,
}

impl ArrowJson {
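    /// Compare the Arrow JSON with the contents of a record batch reader,
    /// returning `Ok(false)` on the first schema or batch mismatch
    ///
    /// A minimal usage sketch (assuming an already-deserialized `ArrowJson`
    /// and an Arrow IPC file; the variable names are hypothetical):
    ///
    /// ```ignore
    /// let json: ArrowJson = serde_json::from_str(&json_contents).unwrap();
    /// let mut reader = arrow::ipc::reader::FileReader::try_new(file, None).unwrap();
    /// assert!(json.equals_reader(&mut reader).unwrap());
    /// ```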
    pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> Result<bool> {
        if !self.schema.equals_schema(&reader.schema()) {
            return Ok(false);
        }

        for json_batch in self.get_record_batches()?.into_iter() {
            let batch = reader.next();
            match batch {
                Some(Ok(batch)) => {
                    if json_batch != batch {
                        println!("json: {json_batch:?}");
                        println!("batch: {batch:?}");
                        return Ok(false);
                    }
                }
                Some(Err(e)) => return Err(e),
                None => return Ok(false),
            }
        }

        Ok(true)
    }

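    /// Convert the stored batches into Arrow [`RecordBatch`]es, resolving
    /// dictionary-encoded columns against the stored dictionary batches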
    pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
        let schema = self.schema.to_arrow_schema()?;

        let mut dictionaries = HashMap::new();
        self.dictionaries.iter().for_each(|dict_batches| {
            dict_batches.iter().for_each(|d| {
                dictionaries.insert(d.id, d.clone());
            });
        });

        let batches: Result<Vec<_>> = self
            .batches
            .iter()
            .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries)))
            .collect();

        batches
    }
}

impl ArrowJsonSchema {
    /// Compare the Arrow JSON schema with an Arrow `Schema`
    fn equals_schema(&self, schema: &Schema) -> bool {
        let field_len = self.fields.len();
        if field_len != schema.fields().len() {
            return false;
        }
        for i in 0..field_len {
            let json_field = &self.fields[i];
            let field = schema.field(i);
            if !json_field.equals_field(field) {
                return false;
            }
        }
        true
    }

    fn to_arrow_schema(&self) -> Result<Schema> {
        let arrow_fields: Result<Vec<_>> = self
            .fields
            .iter()
            .map(|field| field.to_arrow_field())
            .collect();

        if let Some(metadatas) = &self.metadata {
            let mut metadata: HashMap<String, String> = HashMap::new();

            metadatas.iter().for_each(|pair| {
                let key = pair.get("key").unwrap();
                let value = pair.get("value").unwrap();
                metadata.insert(key.clone(), value.clone());
            });

            Ok(Schema::new_with_metadata(arrow_fields?, metadata))
        } else {
            Ok(Schema::new(arrow_fields?))
        }
    }
}

impl ArrowJsonField {
    /// Compare the Arrow JSON field with an Arrow `Field`
    fn equals_field(&self, field: &Field) -> bool {
        // convert to an Arrow field and compare
        match self.to_arrow_field() {
            Ok(self_field) => {
                assert_eq!(&self_field, field, "Arrow fields not the same");
                true
            }
            Err(e) => {
                eprintln!("Encountered error while converting JSON field to Arrow field: {e:?}");
                false
            }
        }
    }

    /// Convert the Arrow JSON field to an Arrow `Field`, by round-tripping
    /// through its serde JSON representation
    fn to_arrow_field(&self) -> Result<Field> {
        let field =
            serde_json::to_value(self).map_err(|error| ArrowError::JsonError(error.to_string()))?;
        field_from_json(&field)
    }
}

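/// Generate a [`RecordBatch`] from an Arrow JSON batch, given a schema
///
/// A minimal sketch of how this can be driven (hand-written example; the
/// `json` variable is assumed to be an already-deserialized [`ArrowJson`]
/// whose first batch matches the schema constructed here):
///
/// ```ignore
/// let schema = Schema::new(vec![Field::new("c1", DataType::Int32, true)]);
/// let batch = record_batch_from_json(&schema, json.batches[0].clone(), None)?;
/// assert_eq!(batch.num_columns(), 1);
/// ```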
pub fn record_batch_from_json(
    schema: &Schema,
    json_batch: ArrowJsonBatch,
    json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<RecordBatch> {
    let mut columns = vec![];

    for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
        let col = array_from_json(field, json_col, json_dictionaries)?;
        columns.push(col);
    }

    RecordBatch::try_new(Arc::new(schema.clone()), columns)
}

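/// Construct an [`ArrayRef`] from an Arrow JSON column, given the target field
///
/// A minimal sketch (the column literal below is hand-written in the
/// integration JSON shape, where `1` in `VALIDITY` marks a valid slot):
///
/// ```ignore
/// let field = Field::new("c1", DataType::Int32, true);
/// let json_col: ArrowJsonColumn = serde_json::from_value(serde_json::json!({
///     "name": "c1",
///     "count": 3,
///     "VALIDITY": [1, 0, 1],
///     "DATA": [1, 2, 3]
/// }))
/// .unwrap();
/// let array = array_from_json(&field, json_col, None).unwrap();
/// assert_eq!(array.len(), 3);
/// ```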
pub fn array_from_json(
    field: &Field,
    json_col: ArrowJsonColumn,
    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<ArrayRef> {
    match field.data_type() {
        DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
        DataType::Boolean => {
            let mut b = BooleanBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_bool().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int8 => {
            let mut b = Int8Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().ok_or_else(|| {
                        ArrowError::JsonError(format!("Unable to get {value:?} as int64"))
                    })? as i8),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int16 => {
            let mut b = Int16Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i16),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
            let mut b = Int32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i32),
                    _ => b.append_null(),
                };
            }
            // build as Int32, then cast to the target date/time type
            let array = Arc::new(b.finish()) as ArrayRef;
            arrow::compute::cast(&array, field.data_type())
        }
        DataType::Interval(IntervalUnit::YearMonth) => {
            let mut b = IntervalYearMonthBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_i64().unwrap() as i32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Int64
        | DataType::Date64
        | DataType::Time64(_)
        | DataType::Timestamp(_, _)
        | DataType::Duration(_) => {
            let mut b = Int64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    // 64-bit integers may be serialized as strings, as they can
                    // exceed the precision of a JSON number
                    1 => b.append_value(match value {
                        Value::Number(n) => n.as_i64().unwrap(),
                        Value::String(s) => s.parse().expect("Unable to parse string as i64"),
                        _ => panic!("Unable to parse {value:?} as number"),
                    }),
                    _ => b.append_null(),
                };
            }
            // build as Int64, then cast to the target temporal type
            let array = Arc::new(b.finish()) as ArrayRef;
            arrow::compute::cast(&array, field.data_type())
        }
        DataType::Interval(IntervalUnit::DayTime) => {
            let mut b = IntervalDayTimeBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(match value {
                        Value::Object(ref map)
                            if map.contains_key("days") && map.contains_key("milliseconds") =>
                        {
                            match field.data_type() {
                                DataType::Interval(IntervalUnit::DayTime) => {
                                    let days = map.get("days").unwrap();
                                    let milliseconds = map.get("milliseconds").unwrap();

                                    match (days, milliseconds) {
                                        (Value::Number(d), Value::Number(m)) => {
                                            let days = d.as_i64().unwrap() as _;
                                            let millis = m.as_i64().unwrap() as _;
                                            IntervalDayTime::new(days, millis)
                                        }
                                        _ => {
                                            panic!("Unable to parse {value:?} as interval daytime")
                                        }
                                    }
                                }
                                _ => panic!("Unable to parse {value:?} as interval daytime"),
                            }
                        }
                        _ => panic!("Unable to parse {value:?} as number"),
                    }),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt8 => {
            let mut b = UInt8Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u8),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt16 => {
            let mut b = UInt16Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u16),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt32 => {
            let mut b = UInt32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_u64().unwrap() as u32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::UInt64 => {
            let mut b = UInt64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        // u64 values may arrive as strings, since they can
                        // exceed the precision of a JSON number
                        if value.is_string() {
                            b.append_value(
                                value
                                    .as_str()
                                    .unwrap()
                                    .parse()
                                    .expect("Unable to parse string as u64"),
                            )
                        } else if value.is_number() {
                            b.append_value(value.as_u64().expect("Unable to read number as u64"))
                        } else {
                            panic!("Unable to parse value {value:?} as u64")
                        }
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Interval(IntervalUnit::MonthDayNano) => {
            let mut b = IntervalMonthDayNanoBuilder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(match value {
                        Value::Object(v) => {
                            let months = v.get("months").unwrap();
                            let days = v.get("days").unwrap();
                            let nanoseconds = v.get("nanoseconds").unwrap();
                            match (months, days, nanoseconds) {
                                (
                                    Value::Number(months),
                                    Value::Number(days),
                                    Value::Number(nanoseconds),
                                ) => {
                                    let months = months.as_i64().unwrap() as i32;
                                    let days = days.as_i64().unwrap() as i32;
                                    let nanoseconds = nanoseconds.as_i64().unwrap();
                                    IntervalMonthDayNano::new(months, days, nanoseconds)
                                }
                                (_, _, _) => {
                                    panic!("Unable to parse {v:?} as MonthDayNano")
                                }
                            }
                        }
                        _ => panic!("Unable to parse {value:?} as MonthDayNano"),
                    }),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Float32 => {
            let mut b = Float32Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_f64().unwrap() as f32),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Float64 => {
            let mut b = Float64Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_f64().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Binary => {
            let mut b = BinaryBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        // binary values are hex-encoded strings in the JSON
                        let v = decode(value.as_str().unwrap()).unwrap();
                        b.append_value(&v)
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::LargeBinary => {
            let mut b = LargeBinaryBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let v = decode(value.as_str().unwrap()).unwrap();
                        b.append_value(&v)
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::Utf8 => {
            let mut b = StringBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::LargeUtf8 => {
            let mut b = LargeStringBuilder::with_capacity(json_col.count, 1024);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::FixedSizeBinary(len) => {
            let mut b = FixedSizeBinaryBuilder::with_capacity(json_col.count, *len);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let v = hex::decode(value.as_str().unwrap()).unwrap();
                        b.append_value(&v)?
                    }
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(b.finish()))
        }
        DataType::List(child_field) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i32> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| v.as_i64().unwrap() as i32)
                .collect();
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .offset(0)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(ListArray::from(list_data)))
        }
        DataType::LargeList(child_field) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i64> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| match v {
                    Value::Number(n) => n.as_i64().unwrap(),
                    Value::String(s) => s.parse::<i64>().unwrap(),
                    _ => panic!("64-bit offset must be either string or number"),
                })
                .collect();
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .offset(0)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(LargeListArray::from(list_data)))
        }
        DataType::FixedSizeList(child_field, _) => {
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let null_buf = create_null_buf(&json_col);
            let list_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();
            Ok(Arc::new(FixedSizeListArray::from(list_data)))
        }
        DataType::Struct(fields) => {
            // construct the struct's validity, then add each child column
            let null_buf = create_null_buf(&json_col);
            let mut array_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .null_bit_buffer(Some(null_buf));

            for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
                let array = array_from_json(field, col, dictionaries)?;
                array_data = array_data.add_child_data(array.into_data());
            }

            let array = StructArray::from(array_data.build().unwrap());
            Ok(Arc::new(array))
        }
        DataType::Dictionary(key_type, value_type) => {
            #[allow(deprecated)]
            let dict_id = field.dict_id().ok_or_else(|| {
                ArrowError::JsonError(format!("Unable to find dict_id for field {field:?}"))
            })?;
            // find the dictionary batch for this field
            let dictionary = dictionaries
                .ok_or_else(|| {
                    ArrowError::JsonError(format!(
                        "Unable to find any dictionaries for field {field:?}"
                    ))
                })?
                .get(&dict_id);
            match dictionary {
                Some(dictionary) => dictionary_array_from_json(
                    field,
                    json_col,
                    key_type,
                    value_type,
                    dictionary,
                    dictionaries,
                ),
                None => Err(ArrowError::JsonError(format!(
                    "Unable to find dictionary for field {field:?}"
                ))),
            }
        }
        DataType::Decimal128(precision, scale) => {
            let mut b = Decimal128Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => b.append_value(value.as_str().unwrap().parse::<i128>().unwrap()),
                    _ => b.append_null(),
                };
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        DataType::Decimal256(precision, scale) => {
            let mut b = Decimal256Builder::with_capacity(json_col.count);
            for (is_valid, value) in json_col
                .validity
                .as_ref()
                .unwrap()
                .iter()
                .zip(json_col.data.unwrap())
            {
                match is_valid {
                    1 => {
                        let str = value.as_str().unwrap();
                        let integer = BigInt::parse_bytes(str.as_bytes(), 10).unwrap();
                        let integer_bytes = integer.to_signed_bytes_le();
                        // sign-extend the little-endian bytes out to 32 bytes
                        let mut bytes = if integer.is_positive() {
                            [0_u8; 32]
                        } else {
                            [255_u8; 32]
                        };
                        bytes[0..integer_bytes.len()].copy_from_slice(integer_bytes.as_slice());
                        b.append_value(i256::from_le_bytes(bytes));
                    }
                    _ => b.append_null(),
                }
            }
            Ok(Arc::new(
                b.finish().with_precision_and_scale(*precision, *scale)?,
            ))
        }
        DataType::Map(child_field, _) => {
            let null_buf = create_null_buf(&json_col);
            let children = json_col.children.clone().unwrap();
            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
            let offsets: Vec<i32> = json_col
                .offset
                .unwrap()
                .iter()
                .map(|v| v.as_i64().unwrap() as i32)
                .collect();
            let array_data = ArrayData::builder(field.data_type().clone())
                .len(json_col.count)
                .add_buffer(Buffer::from(offsets.to_byte_slice()))
                .add_child_data(child_array.into_data())
                .null_bit_buffer(Some(null_buf))
                .build()
                .unwrap();

            let array = MapArray::from(array_data);
            Ok(Arc::new(array))
        }
        DataType::Union(fields, _) => {
            let type_ids = if let Some(type_id) = json_col.type_id {
                type_id
            } else {
                return Err(ArrowError::JsonError(
                    "Cannot find expected type_id in json column".to_string(),
                ));
            };

            let offset: Option<ScalarBuffer<i32>> = json_col
                .offset
                .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());

            let mut children = Vec::with_capacity(fields.len());
            for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
                let array = array_from_json(field, col, dictionaries)?;
                children.push(array);
            }

            let array =
                UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
            Ok(Arc::new(array))
        }
        t => Err(ArrowError::JsonError(format!(
            "data type {t:?} not supported"
        ))),
    }
}

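/// Construct a [`DictionaryArray`] from a partially typed JSON column and the
/// JSON dictionary batch that holds its values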
pub fn dictionary_array_from_json(
    field: &Field,
    json_col: ArrowJsonColumn,
    dict_key: &DataType,
    dict_value: &DataType,
    dictionary: &ArrowJsonDictionaryBatch,
    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<ArrayRef> {
    match dict_key {
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64 => {
            let null_buf = create_null_buf(&json_col);

            #[allow(deprecated)]
            let key_field = Field::new_dict(
                "key",
                dict_key.clone(),
                field.is_nullable(),
                #[allow(deprecated)]
                field
                    .dict_id()
                    .expect("Dictionary fields must have a dict_id value"),
                field
                    .dict_is_ordered()
                    .expect("Dictionary fields must have a dict_is_ordered value"),
            );
            let keys = array_from_json(&key_field, json_col, None)?;
            let value_field = Field::new("value", dict_value.clone(), true);
            let values = array_from_json(
                &value_field,
                dictionary.data.columns[0].clone(),
                dictionaries,
            )?;

            let dict_data = ArrayData::builder(field.data_type().clone())
                .len(keys.len())
                .add_buffer(keys.to_data().buffers()[0].clone())
                .null_bit_buffer(Some(null_buf))
                .add_child_data(values.into_data())
                .build()
                .unwrap();

            let array = match dict_key {
                DataType::Int8 => Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef,
                DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)),
                DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)),
                DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)),
                DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)),
                DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)),
                DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)),
                DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)),
                _ => unreachable!(),
            };
            Ok(array)
        }
        _ => Err(ArrowError::JsonError(format!(
            "Dictionary key type {dict_key:?} not supported"
        ))),
    }
}

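/// Construct a null (validity) buffer from the JSON column's VALIDITY array,
/// setting bit `i` whenever `validity[i]` is non-zero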
fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
    let num_bytes = bit_util::ceil(json_col.count, 8);
    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
    json_col
        .validity
        .clone()
        .unwrap()
        .iter()
        .enumerate()
        .for_each(|(i, v)| {
            let null_slice = null_buf.as_slice_mut();
            if *v != 0 {
                bit_util::set_bit(null_slice, i);
            }
        });
    null_buf.into()
}

impl ArrowJsonBatch {
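    /// Convert a [`RecordBatch`] into an [`ArrowJsonBatch`]
    ///
    /// Note that only `Int8` columns currently get their VALIDITY and DATA
    /// populated; every other type is emitted as an empty column stub.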
    pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
        let mut json_batch = ArrowJsonBatch {
            count: batch.num_rows(),
            columns: Vec::with_capacity(batch.num_columns()),
        };

        for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) {
            let json_col = match field.data_type() {
                DataType::Int8 => {
                    let col = col.as_any().downcast_ref::<Int8Array>().unwrap();

                    let mut validity: Vec<u8> = Vec::with_capacity(col.len());
                    let mut data: Vec<Value> = Vec::with_capacity(col.len());

                    for i in 0..col.len() {
                        if col.is_null(i) {
                            // null slots get validity 0 and a placeholder value
                            validity.push(0);
                            data.push(0i8.into());
                        } else {
                            validity.push(1);
                            data.push(col.value(i).into());
                        }
                    }

                    ArrowJsonColumn {
                        name: field.name().clone(),
                        count: col.len(),
                        validity: Some(validity),
                        data: Some(data),
                        offset: None,
                        type_id: None,
                        children: None,
                    }
                }
                _ => ArrowJsonColumn {
                    name: field.name().clone(),
                    count: col.len(),
                    validity: None,
                    data: None,
                    offset: None,
                    type_id: None,
                    children: None,
                },
            };

            json_batch.columns.push(json_col);
        }

        json_batch
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Read;

    #[test]
    fn test_schema_equality() {
        let json = r#"
        {
            "fields": [
                {
                    "name": "c1",
                    "type": {"name": "int", "isSigned": true, "bitWidth": 32},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c2",
                    "type": {"name": "floatingpoint", "precision": "DOUBLE"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c3",
                    "type": {"name": "utf8"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c4",
                    "type": {
                        "name": "list"
                    },
                    "nullable": true,
                    "children": [
                        {
                            "name": "custom_item",
                            "type": {
                                "name": "int",
                                "isSigned": true,
                                "bitWidth": 32
                            },
                            "nullable": false,
                            "children": []
                        }
                    ]
                }
            ]
        }"#;
        let json_schema: ArrowJsonSchema = serde_json::from_str(json).unwrap();
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Utf8, true),
            Field::new(
                "c4",
                DataType::List(Arc::new(Field::new("custom_item", DataType::Int32, false))),
                true,
            ),
        ]);
        assert!(json_schema.equals_schema(&schema));
    }

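    // A small added sanity check (a sketch): converting an Arrow `Field` into
    // an `ArrowJsonField` should round-trip through `equals_field` for a
    // simple primitive field, exercising the same path as `equals_schema`.
    #[test]
    fn test_field_roundtrip() {
        let field = Field::new("c1", DataType::Int32, true);
        let json_field = ArrowJsonField::from(&field);
        assert!(json_field.equals_field(&field));
    }
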
    #[test]
    fn test_arrow_data_equality() {
        let secs_tz = Some("Europe/Budapest".into());
        let millis_tz = Some("America/New_York".into());
        let micros_tz = Some("UTC".into());
        let nanos_tz = Some("Africa/Johannesburg".into());

        let schema = Schema::new(vec![
            Field::new("bools-with-metadata-map", DataType::Boolean, true).with_metadata(
                [("k".to_string(), "v".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools-with-metadata-vec", DataType::Boolean, true).with_metadata(
                [("k2".to_string(), "v2".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools", DataType::Boolean, true),
            Field::new("int8s", DataType::Int8, true),
            Field::new("int16s", DataType::Int16, true),
            Field::new("int32s", DataType::Int32, true),
            Field::new("int64s", DataType::Int64, true),
            Field::new("uint8s", DataType::UInt8, true),
            Field::new("uint16s", DataType::UInt16, true),
            Field::new("uint32s", DataType::UInt32, true),
            Field::new("uint64s", DataType::UInt64, true),
            Field::new("float32s", DataType::Float32, true),
            Field::new("float64s", DataType::Float64, true),
            Field::new("date_days", DataType::Date32, true),
            Field::new("date_millis", DataType::Date64, true),
            Field::new("time_secs", DataType::Time32(TimeUnit::Second), true),
            Field::new("time_millis", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nanos", DataType::Time64(TimeUnit::Nanosecond), true),
            Field::new("ts_secs", DataType::Timestamp(TimeUnit::Second, None), true),
            Field::new(
                "ts_millis",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micros",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "ts_nanos",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "ts_secs_tz",
                DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
                true,
            ),
            Field::new(
                "ts_millis_tz",
                DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
                true,
            ),
            Field::new(
                "ts_micros_tz",
                DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
                true,
            ),
            Field::new(
                "ts_nanos_tz",
                DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
                true,
            ),
            Field::new("utf8s", DataType::Utf8, true),
            Field::new(
                "lists",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "structs",
                DataType::Struct(Fields::from(vec![
                    Field::new("int32s", DataType::Int32, true),
                    Field::new("utf8s", DataType::Utf8, true),
                ])),
                true,
            ),
        ]);

        let bools_with_metadata_map = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools_with_metadata_vec = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let int8s = Int8Array::from(vec![Some(1), None, Some(3)]);
        let int16s = Int16Array::from(vec![Some(1), None, Some(3)]);
        let int32s = Int32Array::from(vec![Some(1), None, Some(3)]);
        let int64s = Int64Array::from(vec![Some(1), None, Some(3)]);
        let uint8s = UInt8Array::from(vec![Some(1), None, Some(3)]);
        let uint16s = UInt16Array::from(vec![Some(1), None, Some(3)]);
        let uint32s = UInt32Array::from(vec![Some(1), None, Some(3)]);
        let uint64s = UInt64Array::from(vec![Some(1), None, Some(3)]);
        let float32s = Float32Array::from(vec![Some(1.0), None, Some(3.0)]);
        let float64s = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
        let date_days = Date32Array::from(vec![Some(1196848), None, None]);
        let date_millis = Date64Array::from(vec![
            Some(167903550396207),
            Some(29923997007884),
            Some(30612271819236),
        ]);
        let time_secs = Time32SecondArray::from(vec![Some(27974), Some(78592), Some(43207)]);
        let time_millis =
            Time32MillisecondArray::from(vec![Some(6613125), Some(74667230), Some(52260079)]);
        let time_micros = Time64MicrosecondArray::from(vec![Some(62522958593), None, None]);
        let time_nanos =
            Time64NanosecondArray::from(vec![Some(73380123595985), None, Some(16584393546415)]);
        let ts_secs = TimestampSecondArray::from(vec![None, Some(193438817552), None]);
        let ts_millis =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)]);
        let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
        let ts_nanos = TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
        let ts_secs_tz = TimestampSecondArray::from(vec![None, Some(193438817552), None])
            .with_timezone_opt(secs_tz);
        let ts_millis_tz =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)])
                .with_timezone_opt(millis_tz);
        let ts_micros_tz =
            TimestampMicrosecondArray::from(vec![None, None, None]).with_timezone_opt(micros_tz);
        let ts_nanos_tz =
            TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)])
                .with_timezone_opt(nanos_tz);
        let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]);

        let value_data = Int32Array::from(vec![None, Some(2), None, None]);
        let value_offsets = Buffer::from_slice_ref([0, 3, 4, 4]);
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_buffer(value_offsets)
            .add_child_data(value_data.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let lists = ListArray::from(list_data);

        let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
        let structs_utf8s = StringArray::from(vec![None, None, Some("aaaaaa")]);
        let struct_data_type = DataType::Struct(Fields::from(vec![
            Field::new("int32s", DataType::Int32, true),
            Field::new("utf8s", DataType::Utf8, true),
        ]));
        let struct_data = ArrayData::builder(struct_data_type)
            .len(3)
            .add_child_data(structs_int32s.into_data())
            .add_child_data(structs_utf8s.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let structs = StructArray::from(struct_data);

        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bools_with_metadata_map),
                Arc::new(bools_with_metadata_vec),
                Arc::new(bools),
                Arc::new(int8s),
                Arc::new(int16s),
                Arc::new(int32s),
                Arc::new(int64s),
                Arc::new(uint8s),
                Arc::new(uint16s),
                Arc::new(uint32s),
                Arc::new(uint64s),
                Arc::new(float32s),
                Arc::new(float64s),
                Arc::new(date_days),
                Arc::new(date_millis),
                Arc::new(time_secs),
                Arc::new(time_millis),
                Arc::new(time_micros),
                Arc::new(time_nanos),
                Arc::new(ts_secs),
                Arc::new(ts_millis),
                Arc::new(ts_micros),
                Arc::new(ts_nanos),
                Arc::new(ts_secs_tz),
                Arc::new(ts_millis_tz),
                Arc::new(ts_micros_tz),
                Arc::new(ts_nanos_tz),
                Arc::new(utf8s),
                Arc::new(lists),
                Arc::new(structs),
            ],
        )
        .unwrap();
        let mut file = File::open("data/integration.json").unwrap();
        let mut json = String::new();
        file.read_to_string(&mut json).unwrap();
        let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
        // test schemas
        assert!(arrow_json.schema.equals_schema(&schema));
        // test record batches
        assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
    }
}