1#![doc(
33 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
34 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
35)]
36#![cfg_attr(docsrs, feature(doc_cfg))]
37#![warn(missing_docs)]
38use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer};
39use hex::decode;
40use num_bigint::BigInt;
41use num_traits::Signed;
42use serde::{Deserialize, Serialize};
43use serde_json::{Map as SJMap, Value};
44use std::collections::HashMap;
45use std::sync::Arc;
46
47use arrow::array::*;
48use arrow::buffer::{Buffer, MutableBuffer};
49use arrow::datatypes::*;
50use arrow::error::{ArrowError, Result};
51use arrow::util::bit_util;
52
53mod datatype;
54mod field;
55mod schema;
56
57pub use datatype::*;
58pub use field::*;
59pub use schema::*;
60
/// The JSON representation of Arrow data: a schema, the record batches that use it, and
/// any dictionary batches referenced by dictionary-encoded fields.
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJson {
    /// The schema shared by every batch.
    pub schema: ArrowJsonSchema,
    /// The record batches, in order.
    pub batches: Vec<ArrowJsonBatch>,
    /// Dictionary batches, looked up by id when decoding dictionary-encoded columns;
    /// omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
}
74
/// The JSON representation of an Arrow schema.
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonSchema {
    /// The schema's fields, in column order.
    pub fields: Vec<ArrowJsonField>,
    /// Optional schema-level metadata. Each entry is expected to contain a `key` and a
    /// `value` string; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Vec<HashMap<String, String>>>,
}
86
/// The JSON representation of an Arrow field.
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonField {
    /// The field name.
    pub name: String,
    /// The field's data type as raw JSON, (de)serialized under the key `type`.
    #[serde(rename = "type")]
    pub field_type: Value,
    /// Whether the field may contain nulls.
    pub nullable: bool,
    /// Child fields of nested types (empty for non-nested types).
    pub children: Vec<ArrowJsonField>,
    /// Dictionary-encoding information, present when the field is dictionary encoded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionary: Option<ArrowJsonFieldDictionary>,
    /// Optional field metadata, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
107
108impl From<&FieldRef> for ArrowJsonField {
109 fn from(value: &FieldRef) -> Self {
110 Self::from(value.as_ref())
111 }
112}
113
114impl From<&Field> for ArrowJsonField {
115 fn from(field: &Field) -> Self {
116 let metadata_value = match field.metadata().is_empty() {
117 false => {
118 let mut array = Vec::new();
119 for (k, v) in field.metadata() {
120 let mut kv_map = SJMap::new();
121 kv_map.insert(k.clone(), Value::String(v.clone()));
122 array.push(Value::Object(kv_map));
123 }
124 if !array.is_empty() {
125 Some(Value::Array(array))
126 } else {
127 None
128 }
129 }
130 _ => None,
131 };
132
133 Self {
134 name: field.name().to_string(),
135 field_type: data_type_to_json(field.data_type()),
136 nullable: field.is_nullable(),
137 children: vec![],
138 dictionary: None, metadata: metadata_value,
140 }
141 }
142}
143
/// Dictionary-encoding information attached to an [`ArrowJsonField`].
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonFieldDictionary {
    /// The dictionary id.
    pub id: i64,
    /// The integer type of the dictionary indices, (de)serialized as `indexType`.
    #[serde(rename = "indexType")]
    pub index_type: DictionaryIndexType,
    /// Whether the dictionary is ordered, (de)serialized as `isOrdered`.
    #[serde(rename = "isOrdered")]
    pub is_ordered: bool,
}
156
/// The type of the indices of a dictionary-encoded field.
#[derive(Deserialize, Serialize, Debug)]
pub struct DictionaryIndexType {
    /// The type name.
    pub name: String,
    /// Whether the index type is signed, (de)serialized as `isSigned`.
    #[serde(rename = "isSigned")]
    pub is_signed: bool,
    /// The width of the index type in bits, (de)serialized as `bitWidth`.
    #[serde(rename = "bitWidth")]
    pub bit_width: i64,
}
169
/// The JSON representation of a record batch.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ArrowJsonBatch {
    // Number of rows; `from_batch` fills it from `RecordBatch::num_rows`.
    count: usize,
    /// The batch's columns, in schema field order.
    pub columns: Vec<ArrowJsonColumn>,
}
177
/// A dictionary batch: the values of one dictionary, identified by `id`.
#[derive(Deserialize, Serialize, Debug, Clone)]
// NOTE(review): no field below is non-snake-case, so this allow looks like a leftover;
// confirm before removing.
#[allow(non_snake_case)]
pub struct ArrowJsonDictionaryBatch {
    /// The id that dictionary-encoded fields use to refer to this dictionary.
    pub id: i64,
    /// The dictionary contents; its first column is decoded as the dictionary values.
    pub data: ArrowJsonBatch,
}
187
/// The JSON representation of a single column (array) and its buffers.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ArrowJsonColumn {
    // Column name, as written by `from_batch` from the schema field name.
    name: String,
    /// Number of slots in the column.
    pub count: usize,
    /// Per-slot validity flags (`VALIDITY`): `1` marks a valid slot, `0` a null slot.
    #[serde(rename = "VALIDITY")]
    pub validity: Option<Vec<u8>>,
    /// Per-slot values (`DATA`) for non-nested types.
    #[serde(rename = "DATA")]
    pub data: Option<Vec<Value>>,
    /// Offsets (`OFFSET`) used by list, list-view, map and dense union columns.
    #[serde(rename = "OFFSET")]
    pub offset: Option<Vec<Value>>,
    /// Union type ids (`TYPE_ID`).
    #[serde(rename = "TYPE_ID")]
    pub type_id: Option<Vec<i8>>,
    /// List-view sizes (`SIZE`).
    #[serde(rename = "SIZE")]
    pub size: Option<Vec<Value>>,
    /// View entries (`VIEWS`) for `Utf8View` / `BinaryView` columns; each is an object
    /// with `SIZE` and either `INLINED` or `BUFFER_INDEX` plus `OFFSET`.
    #[serde(rename = "VIEWS")]
    pub views: Option<Vec<Value>>,
    /// Hex-encoded data buffers (`VARIADIC_DATA_BUFFERS`) referenced by view entries.
    #[serde(rename = "VARIADIC_DATA_BUFFERS")]
    pub variadic_data_buffers: Option<Vec<String>>,
    /// Child columns of nested types.
    pub children: Option<Vec<ArrowJsonColumn>>,
}
218
219impl ArrowJson {
220 pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> Result<bool> {
222 if !self.schema.equals_schema(&reader.schema()) {
223 return Ok(false);
224 }
225
226 for json_batch in self.get_record_batches()?.into_iter() {
227 let batch = reader.next();
228 match batch {
229 Some(Ok(batch)) => {
230 if json_batch != batch {
231 println!("json: {json_batch:?}");
232 println!("batch: {batch:?}");
233 return Ok(false);
234 }
235 }
236 Some(Err(e)) => return Err(e),
237 None => return Ok(false),
238 }
239 }
240
241 Ok(true)
242 }
243
244 pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
246 let schema = self.schema.to_arrow_schema()?;
247
248 let mut dictionaries = HashMap::new();
249 self.dictionaries.iter().for_each(|dict_batches| {
250 dict_batches.iter().for_each(|d| {
251 dictionaries.insert(d.id, d.clone());
252 });
253 });
254
255 let batches: Result<Vec<_>> = self
256 .batches
257 .iter()
258 .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries)))
259 .collect();
260
261 batches
262 }
263}
264
265impl ArrowJsonSchema {
266 fn equals_schema(&self, schema: &Schema) -> bool {
268 let field_len = self.fields.len();
269 if field_len != schema.fields().len() {
270 return false;
271 }
272 for i in 0..field_len {
273 let json_field = &self.fields[i];
274 let field = schema.field(i);
275 if !json_field.equals_field(field) {
276 return false;
277 }
278 }
279 true
280 }
281
282 fn to_arrow_schema(&self) -> Result<Schema> {
283 let arrow_fields: Result<Vec<_>> = self
284 .fields
285 .iter()
286 .map(|field| field.to_arrow_field())
287 .collect();
288
289 if let Some(metadatas) = &self.metadata {
290 let mut metadata: HashMap<String, String> = HashMap::new();
291
292 metadatas.iter().for_each(|pair| {
293 let key = pair.get("key").unwrap();
294 let value = pair.get("value").unwrap();
295 metadata.insert(key.clone(), value.clone());
296 });
297
298 Ok(Schema::new_with_metadata(arrow_fields?, metadata))
299 } else {
300 Ok(Schema::new(arrow_fields?))
301 }
302 }
303}
304
305impl ArrowJsonField {
306 fn equals_field(&self, field: &Field) -> bool {
308 match self.to_arrow_field() {
310 Ok(self_field) => {
311 assert_eq!(&self_field, field, "Arrow fields not the same");
312 true
313 }
314 Err(e) => {
315 eprintln!("Encountered error while converting JSON field to Arrow field: {e:?}");
316 false
317 }
318 }
319 }
320
321 fn to_arrow_field(&self) -> Result<Field> {
324 let field =
326 serde_json::to_value(self).map_err(|error| ArrowError::JsonError(error.to_string()))?;
327 field_from_json(&field)
328 }
329}
330
331pub fn record_batch_from_json(
333 schema: &Schema,
334 json_batch: ArrowJsonBatch,
335 json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
336) -> Result<RecordBatch> {
337 let mut columns = vec![];
338
339 for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
340 let col = array_from_json(field, json_col, json_dictionaries)?;
341 columns.push(col);
342 }
343
344 RecordBatch::try_new(Arc::new(schema.clone()), columns)
345}
346
347pub fn array_from_json(
349 field: &Field,
350 json_col: ArrowJsonColumn,
351 dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
352) -> Result<ArrayRef> {
353 match field.data_type() {
354 DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
355 DataType::Boolean => {
356 let mut b = BooleanBuilder::with_capacity(json_col.count);
357 for (is_valid, value) in json_col
358 .validity
359 .as_ref()
360 .unwrap()
361 .iter()
362 .zip(json_col.data.unwrap())
363 {
364 match is_valid {
365 1 => b.append_value(value.as_bool().unwrap()),
366 _ => b.append_null(),
367 };
368 }
369 Ok(Arc::new(b.finish()))
370 }
371 DataType::Int8 => {
372 let mut b = Int8Builder::with_capacity(json_col.count);
373 for (is_valid, value) in json_col
374 .validity
375 .as_ref()
376 .unwrap()
377 .iter()
378 .zip(json_col.data.unwrap())
379 {
380 match is_valid {
381 1 => b.append_value(value.as_i64().ok_or_else(|| {
382 ArrowError::JsonError(format!("Unable to get {value:?} as int64"))
383 })? as i8),
384 _ => b.append_null(),
385 };
386 }
387 Ok(Arc::new(b.finish()))
388 }
389 DataType::Int16 => {
390 let mut b = Int16Builder::with_capacity(json_col.count);
391 for (is_valid, value) in json_col
392 .validity
393 .as_ref()
394 .unwrap()
395 .iter()
396 .zip(json_col.data.unwrap())
397 {
398 match is_valid {
399 1 => b.append_value(value.as_i64().unwrap() as i16),
400 _ => b.append_null(),
401 };
402 }
403 Ok(Arc::new(b.finish()))
404 }
405 DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
406 let mut b = Int32Builder::with_capacity(json_col.count);
407 for (is_valid, value) in json_col
408 .validity
409 .as_ref()
410 .unwrap()
411 .iter()
412 .zip(json_col.data.unwrap())
413 {
414 match is_valid {
415 1 => b.append_value(value.as_i64().unwrap() as i32),
416 _ => b.append_null(),
417 };
418 }
419 let array = Arc::new(b.finish()) as ArrayRef;
420 arrow::compute::cast(&array, field.data_type())
421 }
422 DataType::Interval(IntervalUnit::YearMonth) => {
423 let mut b = IntervalYearMonthBuilder::with_capacity(json_col.count);
424 for (is_valid, value) in json_col
425 .validity
426 .as_ref()
427 .unwrap()
428 .iter()
429 .zip(json_col.data.unwrap())
430 {
431 match is_valid {
432 1 => b.append_value(value.as_i64().unwrap() as i32),
433 _ => b.append_null(),
434 };
435 }
436 Ok(Arc::new(b.finish()))
437 }
438 DataType::Int64
439 | DataType::Date64
440 | DataType::Time64(_)
441 | DataType::Timestamp(_, _)
442 | DataType::Duration(_) => {
443 let mut b = Int64Builder::with_capacity(json_col.count);
444 for (is_valid, value) in json_col
445 .validity
446 .as_ref()
447 .unwrap()
448 .iter()
449 .zip(json_col.data.unwrap())
450 {
451 match is_valid {
452 1 => b.append_value(match value {
453 Value::Number(n) => n.as_i64().unwrap(),
454 Value::String(s) => s.parse().expect("Unable to parse string as i64"),
455 _ => panic!("Unable to parse {value:?} as number"),
456 }),
457 _ => b.append_null(),
458 };
459 }
460 let array = Arc::new(b.finish()) as ArrayRef;
461 arrow::compute::cast(&array, field.data_type())
462 }
463 DataType::Interval(IntervalUnit::DayTime) => {
464 let mut b = IntervalDayTimeBuilder::with_capacity(json_col.count);
465 for (is_valid, value) in json_col
466 .validity
467 .as_ref()
468 .unwrap()
469 .iter()
470 .zip(json_col.data.unwrap())
471 {
472 match is_valid {
473 1 => b.append_value(match value {
474 Value::Object(ref map)
475 if map.contains_key("days") && map.contains_key("milliseconds") =>
476 {
477 match field.data_type() {
478 DataType::Interval(IntervalUnit::DayTime) => {
479 let days = map.get("days").unwrap();
480 let milliseconds = map.get("milliseconds").unwrap();
481
482 match (days, milliseconds) {
483 (Value::Number(d), Value::Number(m)) => {
484 let days = d.as_i64().unwrap() as _;
485 let millis = m.as_i64().unwrap() as _;
486 IntervalDayTime::new(days, millis)
487 }
488 _ => {
489 panic!("Unable to parse {value:?} as interval daytime")
490 }
491 }
492 }
493 _ => panic!("Unable to parse {value:?} as interval daytime"),
494 }
495 }
496 _ => panic!("Unable to parse {value:?} as number"),
497 }),
498 _ => b.append_null(),
499 };
500 }
501 Ok(Arc::new(b.finish()))
502 }
503 DataType::UInt8 => {
504 let mut b = UInt8Builder::with_capacity(json_col.count);
505 for (is_valid, value) in json_col
506 .validity
507 .as_ref()
508 .unwrap()
509 .iter()
510 .zip(json_col.data.unwrap())
511 {
512 match is_valid {
513 1 => b.append_value(value.as_u64().unwrap() as u8),
514 _ => b.append_null(),
515 };
516 }
517 Ok(Arc::new(b.finish()))
518 }
519 DataType::UInt16 => {
520 let mut b = UInt16Builder::with_capacity(json_col.count);
521 for (is_valid, value) in json_col
522 .validity
523 .as_ref()
524 .unwrap()
525 .iter()
526 .zip(json_col.data.unwrap())
527 {
528 match is_valid {
529 1 => b.append_value(value.as_u64().unwrap() as u16),
530 _ => b.append_null(),
531 };
532 }
533 Ok(Arc::new(b.finish()))
534 }
535 DataType::UInt32 => {
536 let mut b = UInt32Builder::with_capacity(json_col.count);
537 for (is_valid, value) in json_col
538 .validity
539 .as_ref()
540 .unwrap()
541 .iter()
542 .zip(json_col.data.unwrap())
543 {
544 match is_valid {
545 1 => b.append_value(value.as_u64().unwrap() as u32),
546 _ => b.append_null(),
547 };
548 }
549 Ok(Arc::new(b.finish()))
550 }
551 DataType::UInt64 => {
552 let mut b = UInt64Builder::with_capacity(json_col.count);
553 for (is_valid, value) in json_col
554 .validity
555 .as_ref()
556 .unwrap()
557 .iter()
558 .zip(json_col.data.unwrap())
559 {
560 match is_valid {
561 1 => {
562 if value.is_string() {
563 b.append_value(
564 value
565 .as_str()
566 .unwrap()
567 .parse()
568 .expect("Unable to parse string as u64"),
569 )
570 } else if value.is_number() {
571 b.append_value(value.as_u64().expect("Unable to read number as u64"))
572 } else {
573 panic!("Unable to parse value {value:?} as u64")
574 }
575 }
576 _ => b.append_null(),
577 };
578 }
579 Ok(Arc::new(b.finish()))
580 }
581 DataType::Interval(IntervalUnit::MonthDayNano) => {
582 let mut b = IntervalMonthDayNanoBuilder::with_capacity(json_col.count);
583 for (is_valid, value) in json_col
584 .validity
585 .as_ref()
586 .unwrap()
587 .iter()
588 .zip(json_col.data.unwrap())
589 {
590 match is_valid {
591 1 => b.append_value(match value {
592 Value::Object(v) => {
593 let months = v.get("months").unwrap();
594 let days = v.get("days").unwrap();
595 let nanoseconds = v.get("nanoseconds").unwrap();
596 match (months, days, nanoseconds) {
597 (
598 Value::Number(months),
599 Value::Number(days),
600 Value::Number(nanoseconds),
601 ) => {
602 let months = months.as_i64().unwrap() as i32;
603 let days = days.as_i64().unwrap() as i32;
604 let nanoseconds = nanoseconds.as_i64().unwrap();
605 IntervalMonthDayNano::new(months, days, nanoseconds)
606 }
607 (_, _, _) => {
608 panic!("Unable to parse {v:?} as MonthDayNano")
609 }
610 }
611 }
612 _ => panic!("Unable to parse {value:?} as MonthDayNano"),
613 }),
614 _ => b.append_null(),
615 };
616 }
617 Ok(Arc::new(b.finish()))
618 }
619 DataType::Float32 => {
620 let mut b = Float32Builder::with_capacity(json_col.count);
621 for (is_valid, value) in json_col
622 .validity
623 .as_ref()
624 .unwrap()
625 .iter()
626 .zip(json_col.data.unwrap())
627 {
628 match is_valid {
629 1 => b.append_value(value.as_f64().unwrap() as f32),
630 _ => b.append_null(),
631 };
632 }
633 Ok(Arc::new(b.finish()))
634 }
635 DataType::Float64 => {
636 let mut b = Float64Builder::with_capacity(json_col.count);
637 for (is_valid, value) in json_col
638 .validity
639 .as_ref()
640 .unwrap()
641 .iter()
642 .zip(json_col.data.unwrap())
643 {
644 match is_valid {
645 1 => b.append_value(value.as_f64().unwrap()),
646 _ => b.append_null(),
647 };
648 }
649 Ok(Arc::new(b.finish()))
650 }
651 DataType::Binary => {
652 let mut b = BinaryBuilder::with_capacity(json_col.count, 1024);
653 for (is_valid, value) in json_col
654 .validity
655 .as_ref()
656 .unwrap()
657 .iter()
658 .zip(json_col.data.unwrap())
659 {
660 match is_valid {
661 1 => {
662 let v = decode(value.as_str().unwrap()).unwrap();
663 b.append_value(&v)
664 }
665 _ => b.append_null(),
666 };
667 }
668 Ok(Arc::new(b.finish()))
669 }
670 DataType::LargeBinary => {
671 let mut b = LargeBinaryBuilder::with_capacity(json_col.count, 1024);
672 for (is_valid, value) in json_col
673 .validity
674 .as_ref()
675 .unwrap()
676 .iter()
677 .zip(json_col.data.unwrap())
678 {
679 match is_valid {
680 1 => {
681 let v = decode(value.as_str().unwrap()).unwrap();
682 b.append_value(&v)
683 }
684 _ => b.append_null(),
685 };
686 }
687 Ok(Arc::new(b.finish()))
688 }
689 DataType::Utf8 => {
690 let mut b = StringBuilder::with_capacity(json_col.count, 1024);
691 for (is_valid, value) in json_col
692 .validity
693 .as_ref()
694 .unwrap()
695 .iter()
696 .zip(json_col.data.unwrap())
697 {
698 match is_valid {
699 1 => b.append_value(value.as_str().unwrap()),
700 _ => b.append_null(),
701 };
702 }
703 Ok(Arc::new(b.finish()))
704 }
705 DataType::LargeUtf8 => {
706 let mut b = LargeStringBuilder::with_capacity(json_col.count, 1024);
707 for (is_valid, value) in json_col
708 .validity
709 .as_ref()
710 .unwrap()
711 .iter()
712 .zip(json_col.data.unwrap())
713 {
714 match is_valid {
715 1 => b.append_value(value.as_str().unwrap()),
716 _ => b.append_null(),
717 };
718 }
719 Ok(Arc::new(b.finish()))
720 }
721 DataType::FixedSizeBinary(len) => {
722 let mut b = FixedSizeBinaryBuilder::with_capacity(json_col.count, *len);
723 for (is_valid, value) in json_col
724 .validity
725 .as_ref()
726 .unwrap()
727 .iter()
728 .zip(json_col.data.unwrap())
729 {
730 match is_valid {
731 1 => {
732 let v = hex::decode(value.as_str().unwrap()).unwrap();
733 b.append_value(&v)?
734 }
735 _ => b.append_null(),
736 };
737 }
738 Ok(Arc::new(b.finish()))
739 }
740 DataType::List(child_field) => {
741 let null_buf = create_null_buf(&json_col);
742 let children = json_col.children.clone().unwrap();
743 let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
744 let offsets: Vec<i32> = json_col
745 .offset
746 .unwrap()
747 .iter()
748 .map(|v| v.as_i64().unwrap() as i32)
749 .collect();
750 let list_data = ArrayData::builder(field.data_type().clone())
751 .len(json_col.count)
752 .offset(0)
753 .add_buffer(Buffer::from(offsets.to_byte_slice()))
754 .add_child_data(child_array.into_data())
755 .null_bit_buffer(Some(null_buf))
756 .build()
757 .unwrap();
758 Ok(Arc::new(ListArray::from(list_data)))
759 }
760 DataType::LargeList(child_field) => {
761 let null_buf = create_null_buf(&json_col);
762 let children = json_col.children.clone().unwrap();
763 let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
764 let offsets: Vec<i64> = json_col
765 .offset
766 .unwrap()
767 .iter()
768 .map(|v| match v {
769 Value::Number(n) => n.as_i64().unwrap(),
770 Value::String(s) => s.parse::<i64>().unwrap(),
771 _ => panic!("64-bit offset must be either string or number"),
772 })
773 .collect();
774 let list_data = ArrayData::builder(field.data_type().clone())
775 .len(json_col.count)
776 .offset(0)
777 .add_buffer(Buffer::from(offsets.to_byte_slice()))
778 .add_child_data(child_array.into_data())
779 .null_bit_buffer(Some(null_buf))
780 .build()
781 .unwrap();
782 Ok(Arc::new(LargeListArray::from(list_data)))
783 }
784 DataType::ListView(child_field) => {
785 let null_buf = create_null_buf(&json_col);
786 let children = json_col.children.clone().unwrap();
787 let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
788 let offsets: Vec<i32> = json_col
789 .offset
790 .unwrap()
791 .iter()
792 .map(|v| v.as_i64().unwrap() as i32)
793 .collect();
794 let sizes: Vec<i32> = json_col
795 .size
796 .unwrap()
797 .iter()
798 .map(|v| v.as_i64().unwrap() as i32)
799 .collect();
800 let list_data = ArrayData::builder(field.data_type().clone())
801 .len(json_col.count)
802 .add_buffer(Buffer::from(offsets.to_byte_slice()))
803 .add_buffer(Buffer::from(sizes.to_byte_slice()))
804 .add_child_data(child_array.into_data())
805 .null_bit_buffer(Some(null_buf))
806 .build()
807 .unwrap();
808 Ok(Arc::new(ListViewArray::from(list_data)))
809 }
810 DataType::LargeListView(child_field) => {
811 let null_buf = create_null_buf(&json_col);
812 let children = json_col.children.clone().unwrap();
813 let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
814 let offsets: Vec<i64> = json_col
815 .offset
816 .unwrap()
817 .iter()
818 .map(|v| match v {
819 Value::Number(n) => n.as_i64().unwrap(),
820 Value::String(s) => s.parse::<i64>().unwrap(),
821 _ => panic!("64-bit offset must be either string or number"),
822 })
823 .collect();
824 let sizes: Vec<i64> = json_col
825 .size
826 .unwrap()
827 .iter()
828 .map(|v| match v {
829 Value::Number(n) => n.as_i64().unwrap(),
830 Value::String(s) => s.parse::<i64>().unwrap(),
831 _ => panic!("64-bit size must be either string or number"),
832 })
833 .collect();
834 let list_data = ArrayData::builder(field.data_type().clone())
835 .len(json_col.count)
836 .add_buffer(Buffer::from(offsets.to_byte_slice()))
837 .add_buffer(Buffer::from(sizes.to_byte_slice()))
838 .add_child_data(child_array.into_data())
839 .null_bit_buffer(Some(null_buf))
840 .build()
841 .unwrap();
842 Ok(Arc::new(LargeListViewArray::from(list_data)))
843 }
844 DataType::FixedSizeList(child_field, _) => {
845 let children = json_col.children.clone().unwrap();
846 let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
847 let null_buf = create_null_buf(&json_col);
848 let list_data = ArrayData::builder(field.data_type().clone())
849 .len(json_col.count)
850 .add_child_data(child_array.into_data())
851 .null_bit_buffer(Some(null_buf))
852 .build()
853 .unwrap();
854 Ok(Arc::new(FixedSizeListArray::from(list_data)))
855 }
856 DataType::Struct(fields) => {
857 let null_buf = create_null_buf(&json_col);
859 let mut array_data = ArrayData::builder(field.data_type().clone())
860 .len(json_col.count)
861 .null_bit_buffer(Some(null_buf));
862
863 for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
864 let array = array_from_json(field, col, dictionaries)?;
865 array_data = array_data.add_child_data(array.into_data());
866 }
867
868 let array = StructArray::from(array_data.build().unwrap());
869 Ok(Arc::new(array))
870 }
871 DataType::Dictionary(key_type, value_type) => {
872 #[allow(deprecated)]
873 let dict_id = field.dict_id().ok_or_else(|| {
874 ArrowError::JsonError(format!("Unable to find dict_id for field {field}"))
875 })?;
876 let dictionary = dictionaries
878 .ok_or_else(|| {
879 ArrowError::JsonError(format!(
880 "Unable to find any dictionaries for field {field}"
881 ))
882 })?
883 .get(&dict_id);
884 match dictionary {
885 Some(dictionary) => dictionary_array_from_json(
886 field,
887 json_col,
888 key_type,
889 value_type,
890 dictionary,
891 dictionaries,
892 ),
893 None => Err(ArrowError::JsonError(format!(
894 "Unable to find dictionary for field {field}"
895 ))),
896 }
897 }
898 DataType::Decimal32(precision, scale) => {
899 let mut b = Decimal32Builder::with_capacity(json_col.count);
900 for (is_valid, value) in json_col
901 .validity
902 .as_ref()
903 .unwrap()
904 .iter()
905 .zip(json_col.data.unwrap())
906 {
907 match is_valid {
908 1 => b.append_value(value.as_str().unwrap().parse::<i32>().unwrap()),
909 _ => b.append_null(),
910 };
911 }
912 Ok(Arc::new(
913 b.finish().with_precision_and_scale(*precision, *scale)?,
914 ))
915 }
916 DataType::Decimal64(precision, scale) => {
917 let mut b = Decimal64Builder::with_capacity(json_col.count);
918 for (is_valid, value) in json_col
919 .validity
920 .as_ref()
921 .unwrap()
922 .iter()
923 .zip(json_col.data.unwrap())
924 {
925 match is_valid {
926 1 => b.append_value(value.as_str().unwrap().parse::<i64>().unwrap()),
927 _ => b.append_null(),
928 };
929 }
930 Ok(Arc::new(
931 b.finish().with_precision_and_scale(*precision, *scale)?,
932 ))
933 }
934 DataType::Decimal128(precision, scale) => {
935 let mut b = Decimal128Builder::with_capacity(json_col.count);
936 for (is_valid, value) in json_col
937 .validity
938 .as_ref()
939 .unwrap()
940 .iter()
941 .zip(json_col.data.unwrap())
942 {
943 match is_valid {
944 1 => b.append_value(value.as_str().unwrap().parse::<i128>().unwrap()),
945 _ => b.append_null(),
946 };
947 }
948 Ok(Arc::new(
949 b.finish().with_precision_and_scale(*precision, *scale)?,
950 ))
951 }
952 DataType::Decimal256(precision, scale) => {
953 let mut b = Decimal256Builder::with_capacity(json_col.count);
954 for (is_valid, value) in json_col
955 .validity
956 .as_ref()
957 .unwrap()
958 .iter()
959 .zip(json_col.data.unwrap())
960 {
961 match is_valid {
962 1 => {
963 let str = value.as_str().unwrap();
964 let integer = BigInt::parse_bytes(str.as_bytes(), 10).unwrap();
965 let integer_bytes = integer.to_signed_bytes_le();
966 let mut bytes = if integer.is_positive() {
967 [0_u8; 32]
968 } else {
969 [255_u8; 32]
970 };
971 bytes[0..integer_bytes.len()].copy_from_slice(integer_bytes.as_slice());
972 b.append_value(i256::from_le_bytes(bytes));
973 }
974 _ => b.append_null(),
975 }
976 }
977 Ok(Arc::new(
978 b.finish().with_precision_and_scale(*precision, *scale)?,
979 ))
980 }
981 DataType::Map(child_field, _) => {
982 let null_buf = create_null_buf(&json_col);
983 let children = json_col.children.clone().unwrap();
984 let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
985 let offsets: Vec<i32> = json_col
986 .offset
987 .unwrap()
988 .iter()
989 .map(|v| v.as_i64().unwrap() as i32)
990 .collect();
991 let array_data = ArrayData::builder(field.data_type().clone())
992 .len(json_col.count)
993 .add_buffer(Buffer::from(offsets.to_byte_slice()))
994 .add_child_data(child_array.into_data())
995 .null_bit_buffer(Some(null_buf))
996 .build()
997 .unwrap();
998
999 let array = MapArray::from(array_data);
1000 Ok(Arc::new(array))
1001 }
1002 DataType::Union(fields, _) => {
1003 let type_ids = if let Some(type_id) = json_col.type_id {
1004 type_id
1005 } else {
1006 return Err(ArrowError::JsonError(
1007 "Cannot find expected type_id in json column".to_string(),
1008 ));
1009 };
1010
1011 let offset: Option<ScalarBuffer<i32>> = json_col
1012 .offset
1013 .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());
1014
1015 let mut children = Vec::with_capacity(fields.len());
1016 for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
1017 let array = array_from_json(field, col, dictionaries)?;
1018 children.push(array);
1019 }
1020
1021 let array =
1022 UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
1023 Ok(Arc::new(array))
1024 }
1025 DataType::Utf8View => {
1026 let views = json_col.views.ok_or_else(|| {
1027 ArrowError::JsonError("Utf8View requires VIEWS field".to_string())
1028 })?;
1029 let variadic_buffers = json_col.variadic_data_buffers.unwrap_or_default();
1030 let validity = json_col.validity.as_ref();
1031
1032 let mut builder = StringViewBuilder::new();
1033 for (i, view) in views.iter().enumerate() {
1034 let is_valid = validity.map_or(1, |v| v[i]);
1035 if is_valid == 0 {
1036 builder.append_null();
1037 } else {
1038 let view_obj = view.as_object().unwrap();
1039 let size = view_obj["SIZE"].as_u64().unwrap() as usize;
1040 if let Some(inlined) = view_obj.get("INLINED") {
1042 builder.append_value(inlined.as_str().unwrap());
1043 } else {
1044 let buffer_index = view_obj["BUFFER_INDEX"].as_u64().unwrap() as usize;
1046 let offset = view_obj["OFFSET"].as_u64().unwrap() as usize;
1047 let buffer_data = hex::decode(&variadic_buffers[buffer_index]).unwrap();
1048 let s = std::str::from_utf8(&buffer_data[offset..offset + size]).unwrap();
1049 builder.append_value(s);
1050 }
1051 }
1052 }
1053 Ok(Arc::new(builder.finish()))
1054 }
1055 DataType::BinaryView => {
1056 let views = json_col.views.ok_or_else(|| {
1057 ArrowError::JsonError("BinaryView requires VIEWS field".to_string())
1058 })?;
1059 let variadic_buffers = json_col.variadic_data_buffers.unwrap_or_default();
1060 let validity = json_col.validity.as_ref();
1061
1062 let mut builder = BinaryViewBuilder::new();
1063 for (i, view) in views.iter().enumerate() {
1064 let is_valid = validity.map_or(1, |v| v[i]);
1065 if is_valid == 0 {
1066 builder.append_null();
1067 } else {
1068 let view_obj = view.as_object().unwrap();
1069 let size = view_obj["SIZE"].as_u64().unwrap() as usize;
1070 if let Some(inlined) = view_obj.get("INLINED") {
1072 let data = hex::decode(inlined.as_str().unwrap()).unwrap();
1073 builder.append_value(&data);
1074 } else {
1075 let buffer_index = view_obj["BUFFER_INDEX"].as_u64().unwrap() as usize;
1077 let offset = view_obj["OFFSET"].as_u64().unwrap() as usize;
1078 let buffer_data = hex::decode(&variadic_buffers[buffer_index]).unwrap();
1079 builder.append_value(&buffer_data[offset..offset + size]);
1080 }
1081 }
1082 }
1083 Ok(Arc::new(builder.finish()))
1084 }
1085 DataType::RunEndEncoded(run_ends_field, values_field) => {
1086 let children = json_col.children.clone().unwrap();
1087 if children.len() != 2 {
1088 return Err(ArrowError::JsonError(
1089 "RunEndEncoded requires exactly 2 children".to_string(),
1090 ));
1091 }
1092 let run_ends_array =
1093 array_from_json(run_ends_field, children[0].clone(), dictionaries)?;
1094 let values_array = array_from_json(values_field, children[1].clone(), dictionaries)?;
1095
1096 let run_array_data = ArrayData::builder(field.data_type().clone())
1097 .len(json_col.count)
1098 .add_child_data(run_ends_array.into_data())
1099 .add_child_data(values_array.into_data())
1100 .build()
1101 .unwrap();
1102
1103 Ok(make_array(run_array_data))
1104 }
1105 t => Err(ArrowError::JsonError(format!(
1106 "data type {t} not supported"
1107 ))),
1108 }
1109}
1110
1111pub fn dictionary_array_from_json(
1113 field: &Field,
1114 json_col: ArrowJsonColumn,
1115 dict_key: &DataType,
1116 dict_value: &DataType,
1117 dictionary: &ArrowJsonDictionaryBatch,
1118 dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
1119) -> Result<ArrayRef> {
1120 match dict_key {
1121 DataType::Int8
1122 | DataType::Int16
1123 | DataType::Int32
1124 | DataType::Int64
1125 | DataType::UInt8
1126 | DataType::UInt16
1127 | DataType::UInt32
1128 | DataType::UInt64 => {
1129 let null_buf = create_null_buf(&json_col);
1130
1131 #[allow(deprecated)]
1133 let key_field = Field::new_dict(
1134 "key",
1135 dict_key.clone(),
1136 field.is_nullable(),
1137 #[allow(deprecated)]
1138 field
1139 .dict_id()
1140 .expect("Dictionary fields must have a dict_id value"),
1141 field
1142 .dict_is_ordered()
1143 .expect("Dictionary fields must have a dict_is_ordered value"),
1144 );
1145 let keys = array_from_json(&key_field, json_col, None)?;
1146 let value_field = Field::new("value", dict_value.clone(), true);
1148 let values = array_from_json(
1149 &value_field,
1150 dictionary.data.columns[0].clone(),
1151 dictionaries,
1152 )?;
1153
1154 let dict_data = ArrayData::builder(field.data_type().clone())
1156 .len(keys.len())
1157 .add_buffer(keys.to_data().buffers()[0].clone())
1158 .null_bit_buffer(Some(null_buf))
1159 .add_child_data(values.into_data())
1160 .build()
1161 .unwrap();
1162
1163 let array = match dict_key {
1164 DataType::Int8 => Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef,
1165 DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)),
1166 DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)),
1167 DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)),
1168 DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)),
1169 DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)),
1170 DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)),
1171 DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)),
1172 _ => unreachable!(),
1173 };
1174 Ok(array)
1175 }
1176 _ => Err(ArrowError::JsonError(format!(
1177 "Dictionary key type {dict_key:?} not supported"
1178 ))),
1179 }
1180}
1181
1182fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
1184 let num_bytes = bit_util::ceil(json_col.count, 8);
1185 let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
1186 json_col
1187 .validity
1188 .clone()
1189 .unwrap()
1190 .iter()
1191 .enumerate()
1192 .for_each(|(i, v)| {
1193 let null_slice = null_buf.as_slice_mut();
1194 if *v != 0 {
1195 bit_util::set_bit(null_slice, i);
1196 }
1197 });
1198 null_buf.into()
1199}
1200
1201impl ArrowJsonBatch {
1202 pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
1214 let mut json_batch = ArrowJsonBatch {
1215 count: batch.num_rows(),
1216 columns: Vec::with_capacity(batch.num_columns()),
1217 };
1218
1219 for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) {
1220 let json_col = match field.data_type() {
1221 DataType::Int8 => {
1222 let col = col.as_any().downcast_ref::<Int8Array>().unwrap();
1223
1224 let mut validity: Vec<u8> = Vec::with_capacity(col.len());
1225 let mut data: Vec<Value> = Vec::with_capacity(col.len());
1226
1227 for i in 0..col.len() {
1228 if col.is_null(i) {
1229 validity.push(1);
1230 data.push(0i8.into());
1231 } else {
1232 validity.push(0);
1233 data.push(col.value(i).into());
1234 }
1235 }
1236
1237 ArrowJsonColumn {
1238 name: field.name().clone(),
1239 count: col.len(),
1240 validity: Some(validity),
1241 data: Some(data),
1242 offset: None,
1243 type_id: None,
1244 size: None,
1245 views: None,
1246 variadic_data_buffers: None,
1247 children: None,
1248 }
1249 }
1250 _ => ArrowJsonColumn {
1251 name: field.name().clone(),
1252 count: col.len(),
1253 validity: None,
1254 data: None,
1255 offset: None,
1256 type_id: None,
1257 size: None,
1258 views: None,
1259 variadic_data_buffers: None,
1260 children: None,
1261 },
1262 };
1263
1264 json_batch.columns.push(json_col);
1265 }
1266
1267 json_batch
1268 }
1269}
1270
#[cfg(test)]
mod tests {
    use super::*;

    /// A JSON schema must compare equal via `equals_schema` to the equivalent
    /// `Schema`. This includes a list field whose child has a custom name.
    #[test]
    fn test_schema_equality() {
        let json = r#"
        {
            "fields": [
                {
                    "name": "c1",
                    "type": {"name": "int", "isSigned": true, "bitWidth": 32},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c2",
                    "type": {"name": "floatingpoint", "precision": "DOUBLE"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c3",
                    "type": {"name": "utf8"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c4",
                    "type": {
                        "name": "list"
                    },
                    "nullable": true,
                    "children": [
                        {
                            "name": "custom_item",
                            "type": {
                                "name": "int",
                                "isSigned": true,
                                "bitWidth": 32
                            },
                            "nullable": false,
                            "children": []
                        }
                    ]
                }
            ]
        }"#;
        let json_schema: ArrowJsonSchema = serde_json::from_str(json).unwrap();
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Utf8, true),
            Field::new(
                "c4",
                DataType::List(Arc::new(Field::new("custom_item", DataType::Int32, false))),
                true,
            ),
        ]);
        assert!(json_schema.equals_schema(&schema));
    }

    /// Builds a `RecordBatch` covering many data types by hand. Checks that
    /// `data/integration.json` parses to an equal schema, and that its first
    /// record batch equals the hand-built one.
    #[test]
    fn test_arrow_data_equality() {
        let secs_tz = Some("Europe/Budapest".into());
        let millis_tz = Some("America/New_York".into());
        let micros_tz = Some("UTC".into());
        let nanos_tz = Some("Africa/Johannesburg".into());

        let schema = Schema::new(vec![
            Field::new("bools-with-metadata-map", DataType::Boolean, true)
                .with_metadata(HashMap::from([("k".to_string(), "v".to_string())])),
            Field::new("bools-with-metadata-vec", DataType::Boolean, true)
                .with_metadata(HashMap::from([("k2".to_string(), "v2".to_string())])),
            Field::new("bools", DataType::Boolean, true),
            Field::new("int8s", DataType::Int8, true),
            Field::new("int16s", DataType::Int16, true),
            Field::new("int32s", DataType::Int32, true),
            Field::new("int64s", DataType::Int64, true),
            Field::new("uint8s", DataType::UInt8, true),
            Field::new("uint16s", DataType::UInt16, true),
            Field::new("uint32s", DataType::UInt32, true),
            Field::new("uint64s", DataType::UInt64, true),
            Field::new("float32s", DataType::Float32, true),
            Field::new("float64s", DataType::Float64, true),
            Field::new("date_days", DataType::Date32, true),
            Field::new("date_millis", DataType::Date64, true),
            Field::new("time_secs", DataType::Time32(TimeUnit::Second), true),
            Field::new("time_millis", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nanos", DataType::Time64(TimeUnit::Nanosecond), true),
            Field::new("ts_secs", DataType::Timestamp(TimeUnit::Second, None), true),
            Field::new(
                "ts_millis",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micros",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "ts_nanos",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "ts_secs_tz",
                DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
                true,
            ),
            Field::new(
                "ts_millis_tz",
                DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
                true,
            ),
            Field::new(
                "ts_micros_tz",
                DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
                true,
            ),
            Field::new(
                "ts_nanos_tz",
                DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
                true,
            ),
            Field::new("utf8s", DataType::Utf8, true),
            Field::new(
                "lists",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "structs",
                DataType::Struct(Fields::from(vec![
                    Field::new("int32s", DataType::Int32, true),
                    Field::new("utf8s", DataType::Utf8, true),
                ])),
                true,
            ),
            Field::new("utf8views", DataType::Utf8View, true),
            Field::new("binaryviews", DataType::BinaryView, true),
            Field::new(
                "listviews",
                DataType::ListView(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "largelistviews",
                DataType::LargeListView(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "runendencoded",
                DataType::RunEndEncoded(
                    Arc::new(Field::new("run_ends", DataType::Int16, false)),
                    Arc::new(Field::new("values", DataType::Int32, true)),
                ),
                true,
            ),
        ]);

        let bools_with_metadata_map = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools_with_metadata_vec = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let int8s = Int8Array::from(vec![Some(1), None, Some(3)]);
        let int16s = Int16Array::from(vec![Some(1), None, Some(3)]);
        let int32s = Int32Array::from(vec![Some(1), None, Some(3)]);
        let int64s = Int64Array::from(vec![Some(1), None, Some(3)]);
        let uint8s = UInt8Array::from(vec![Some(1), None, Some(3)]);
        let uint16s = UInt16Array::from(vec![Some(1), None, Some(3)]);
        let uint32s = UInt32Array::from(vec![Some(1), None, Some(3)]);
        let uint64s = UInt64Array::from(vec![Some(1), None, Some(3)]);
        let float32s = Float32Array::from(vec![Some(1.0), None, Some(3.0)]);
        let float64s = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
        let date_days = Date32Array::from(vec![Some(1196848), None, None]);
        let date_millis = Date64Array::from(vec![
            Some(167903550396207),
            Some(29923997007884),
            Some(30612271819236),
        ]);
        let time_secs = Time32SecondArray::from(vec![Some(27974), Some(78592), Some(43207)]);
        let time_millis =
            Time32MillisecondArray::from(vec![Some(6613125), Some(74667230), Some(52260079)]);
        let time_micros = Time64MicrosecondArray::from(vec![Some(62522958593), None, None]);
        let time_nanos =
            Time64NanosecondArray::from(vec![Some(73380123595985), None, Some(16584393546415)]);
        let ts_secs = TimestampSecondArray::from(vec![None, Some(193438817552), None]);
        let ts_millis =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)]);
        let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
        let ts_nanos = TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
        let ts_secs_tz = TimestampSecondArray::from(vec![None, Some(193438817552), None])
            .with_timezone_opt(secs_tz);
        let ts_millis_tz =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)])
                .with_timezone_opt(millis_tz);
        let ts_micros_tz =
            TimestampMicrosecondArray::from(vec![None, None, None]).with_timezone_opt(micros_tz);
        let ts_nanos_tz =
            TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)])
                .with_timezone_opt(nanos_tz);
        let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]);

        // Offsets [0, 3, 4, 4]: row 0 = values[0..3], row 1 = values[3..4], row 2 empty.
        // Null bitmap 0b011 marks rows 0 and 1 valid and row 2 null.
        let value_data = Int32Array::from(vec![None, Some(2), None, None]);
        let value_offsets = Buffer::from_slice_ref([0, 3, 4, 4]);
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_buffer(value_offsets)
            .add_child_data(value_data.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let lists = ListArray::from(list_data);

        // Null bitmap 0b011: the struct in row 2 is null.
        let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
        let structs_utf8s = StringArray::from(vec![None, None, Some("aaaaaa")]);
        let struct_data_type = DataType::Struct(Fields::from(vec![
            Field::new("int32s", DataType::Int32, true),
            Field::new("utf8s", DataType::Utf8, true),
        ]));
        let struct_data = ArrayData::builder(struct_data_type)
            .len(3)
            .add_child_data(structs_int32s.into_data())
            .add_child_data(structs_utf8s.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let structs = StructArray::from(struct_data);

        let utf8views =
            StringViewArray::from(vec![Some("hello"), None, Some("this is not inlined")]);
        let binaryviews = BinaryViewArray::from_iter(vec![
            Some(b"\xf3\x4d".as_slice()),
            Some(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f".as_slice()),
            None,
        ]);

        // Offsets [0, 2, 2] with sizes [2, 0, 3]: row 0 = [1, 2], row 2 = [3, null, 5].
        // Null bitmap 0b101 marks row 1 as null.
        let listview_value_data = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let listview_offsets = Buffer::from_slice_ref([0i32, 2, 2]);
        let listview_sizes = Buffer::from_slice_ref([2i32, 0, 3]);
        let listview_data_type =
            DataType::ListView(Arc::new(Field::new_list_field(DataType::Int32, true)));
        let listview_data = ArrayData::builder(listview_data_type)
            .len(3)
            .add_buffer(listview_offsets)
            .add_buffer(listview_sizes)
            .add_child_data(listview_value_data.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000101])))
            .build()
            .unwrap();
        let listviews = ListViewArray::from(listview_data);

        // Offsets [0, 2, 3] with sizes [2, 1, 0]: row 0 = [10, null], row 1 = [30].
        // Null bitmap 0b011 marks row 2 as null.
        let largelistview_value_data = Int32Array::from(vec![Some(10), None, Some(30)]);
        let largelistview_offsets = Buffer::from_slice_ref([0i64, 2, 3]);
        let largelistview_sizes = Buffer::from_slice_ref([2i64, 1, 0]);
        let largelistview_data_type =
            DataType::LargeListView(Arc::new(Field::new_list_field(DataType::Int32, true)));
        let largelistview_data = ArrayData::builder(largelistview_data_type)
            .len(3)
            .add_buffer(largelistview_offsets)
            .add_buffer(largelistview_sizes)
            .add_child_data(largelistview_value_data.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let largelistviews = LargeListViewArray::from(largelistview_data);

        // Run ends [2, 3]: rows 0..2 take value 100, row 2 takes the null value.
        let ree_run_ends = Int16Array::from(vec![2, 3]);
        let ree_values = Int32Array::from(vec![Some(100), None]);
        let ree_data_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );
        let ree_data = ArrayData::builder(ree_data_type)
            .len(3)
            .add_child_data(ree_run_ends.into_data())
            .add_child_data(ree_values.into_data())
            .build()
            .unwrap();
        let runendencoded = RunArray::<Int16Type>::from(ree_data);

        // Column order must match the schema's field order above.
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bools_with_metadata_map),
                Arc::new(bools_with_metadata_vec),
                Arc::new(bools),
                Arc::new(int8s),
                Arc::new(int16s),
                Arc::new(int32s),
                Arc::new(int64s),
                Arc::new(uint8s),
                Arc::new(uint16s),
                Arc::new(uint32s),
                Arc::new(uint64s),
                Arc::new(float32s),
                Arc::new(float64s),
                Arc::new(date_days),
                Arc::new(date_millis),
                Arc::new(time_secs),
                Arc::new(time_millis),
                Arc::new(time_micros),
                Arc::new(time_nanos),
                Arc::new(ts_secs),
                Arc::new(ts_millis),
                Arc::new(ts_micros),
                Arc::new(ts_nanos),
                Arc::new(ts_secs_tz),
                Arc::new(ts_millis_tz),
                Arc::new(ts_micros_tz),
                Arc::new(ts_nanos_tz),
                Arc::new(utf8s),
                Arc::new(lists),
                Arc::new(structs),
                Arc::new(utf8views),
                Arc::new(binaryviews),
                Arc::new(listviews),
                Arc::new(largelistviews),
                Arc::new(runendencoded),
            ],
        )
        .unwrap();

        // The path is relative to the working directory (the package root under `cargo test`).
        let json = std::fs::read_to_string("data/integration.json")
            .expect("failed to read data/integration.json");
        let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
        assert!(arrow_json.schema.equals_schema(&schema));
        assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
    }
}