// arrow-integration-test/src/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Partial support for the [Apache Arrow JSON test data format](https://github.com/apache/arrow/blob/master/docs/source/format/Integration.rst#json-test-data-format)
19//!
20//! These utilities define structs that read the integration JSON format for integration testing purposes.
21//!
22//! This is not a canonical format, but provides a human-readable way of verifying language implementations
23//!
24//! <div class="warning">
25//!
26//! This crate is **only intended for integration testing the
27//! [Arrow project](https://github.com/apache/arrow-rs)**. It is not [intended for usage outside of
28//! this context](https://github.com/apache/arrow-rs/issues/8684#issuecomment-3433193158).
29//!
30//! </div>
31
32#![doc(
33    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
34    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
35)]
36#![cfg_attr(docsrs, feature(doc_cfg))]
37#![warn(missing_docs)]
38use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer};
39use hex::decode;
40use num_bigint::BigInt;
41use num_traits::Signed;
42use serde::{Deserialize, Serialize};
43use serde_json::{Map as SJMap, Value};
44use std::collections::HashMap;
45use std::sync::Arc;
46
47use arrow::array::*;
48use arrow::buffer::{Buffer, MutableBuffer};
49use arrow::datatypes::*;
50use arrow::error::{ArrowError, Result};
51use arrow::util::bit_util;
52
53mod datatype;
54mod field;
55mod schema;
56
57pub use datatype::*;
58pub use field::*;
59pub use schema::*;
60
/// A struct that represents an Arrow file with a schema and record batches
///
/// See <https://github.com/apache/arrow/blob/master/docs/source/format/Integration.rst#json-test-data-format>
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJson {
    /// The Arrow schema for JSON file
    pub schema: ArrowJsonSchema,
    /// The `RecordBatch`es in the JSON file
    pub batches: Vec<ArrowJsonBatch>,
    /// The dictionaries in the JSON file
    ///
    /// `None` when the file contains no dictionary-encoded columns; the
    /// field is omitted from the serialized JSON in that case.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
}
74
/// A struct that partially reads the Arrow JSON schema.
///
/// Fields are left as JSON `Value` as they vary by `DataType`
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonSchema {
    /// An array of JSON fields
    pub fields: Vec<ArrowJsonField>,
    /// An array of metadata key-value pairs
    ///
    /// Each map is expected to carry exactly the keys `"key"` and `"value"`
    /// (the integration format's pair encoding); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Vec<HashMap<String, String>>>,
}
86
/// Fields are left as JSON `Value` as they vary by `DataType`
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonField {
    /// The name of the field
    pub name: String,
    /// The data type of the field,
    /// can be any valid JSON value
    #[serde(rename = "type")]
    pub field_type: Value,
    /// Whether the field is nullable
    pub nullable: bool,
    /// The children fields
    pub children: Vec<ArrowJsonField>,
    /// The dictionary for the field
    ///
    /// Present only for dictionary-encoded fields; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionary: Option<ArrowJsonFieldDictionary>,
    /// The metadata for the field, if any
    ///
    /// Serialized as a JSON array of single-entry `{key: value}` objects
    /// (see the `From<&Field>` impl below); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
107
impl From<&FieldRef> for ArrowJsonField {
    /// Delegates to the `From<&Field>` impl by borrowing through the `Arc`.
    fn from(value: &FieldRef) -> Self {
        Self::from(value.as_ref())
    }
}
113
114impl From<&Field> for ArrowJsonField {
115    fn from(field: &Field) -> Self {
116        let metadata_value = match field.metadata().is_empty() {
117            false => {
118                let mut array = Vec::new();
119                for (k, v) in field.metadata() {
120                    let mut kv_map = SJMap::new();
121                    kv_map.insert(k.clone(), Value::String(v.clone()));
122                    array.push(Value::Object(kv_map));
123                }
124                if !array.is_empty() {
125                    Some(Value::Array(array))
126                } else {
127                    None
128                }
129            }
130            _ => None,
131        };
132
133        Self {
134            name: field.name().to_string(),
135            field_type: data_type_to_json(field.data_type()),
136            nullable: field.is_nullable(),
137            children: vec![],
138            dictionary: None, // TODO: not enough info
139            metadata: metadata_value,
140        }
141    }
142}
143
/// Represents a dictionary-encoded field in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonFieldDictionary {
    /// A unique identifier for the dictionary
    ///
    /// Matches the `id` of an [`ArrowJsonDictionaryBatch`] holding the values.
    pub id: i64,
    /// The type of the dictionary index
    #[serde(rename = "indexType")]
    pub index_type: DictionaryIndexType,
    /// Whether the dictionary is ordered
    #[serde(rename = "isOrdered")]
    pub is_ordered: bool,
}
156
/// Type of an index for a dictionary-encoded field in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug)]
pub struct DictionaryIndexType {
    /// The name of the dictionary index type
    pub name: String,
    /// Whether the dictionary index type is signed
    #[serde(rename = "isSigned")]
    pub is_signed: bool,
    /// The bit width of the dictionary index type
    #[serde(rename = "bitWidth")]
    pub bit_width: i64,
}
169
/// A struct that partially reads the Arrow JSON record batch
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ArrowJsonBatch {
    // Number of rows in the batch; kept private, populated from the JSON
    // `count` key on deserialization.
    count: usize,
    /// The columns in the record batch
    pub columns: Vec<ArrowJsonColumn>,
}
177
178/// A struct that partially reads the Arrow JSON dictionary batch
179#[derive(Deserialize, Serialize, Debug, Clone)]
180#[allow(non_snake_case)]
181pub struct ArrowJsonDictionaryBatch {
182    /// The unique identifier for the dictionary
183    pub id: i64,
184    /// The data for the dictionary
185    pub data: ArrowJsonBatch,
186}
187
/// A struct that partially reads the Arrow JSON column/array
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ArrowJsonColumn {
    // Column name; private, read from the JSON `name` key.
    name: String,
    /// The number of elements in the column
    pub count: usize,
    /// The validity bitmap to determine null values
    ///
    /// One `0`/`1` entry per element (not bit-packed in the JSON format).
    #[serde(rename = "VALIDITY")]
    pub validity: Option<Vec<u8>>,
    /// The data values in the column
    #[serde(rename = "DATA")]
    pub data: Option<Vec<Value>>,
    /// The offsets for variable-sized data types
    #[serde(rename = "OFFSET")]
    pub offset: Option<Vec<Value>>, // leaving as Value as 64-bit offsets are strings
    /// The type id for union types
    #[serde(rename = "TYPE_ID")]
    pub type_id: Option<Vec<i8>>,
    /// The children columns for nested types
    pub children: Option<Vec<ArrowJsonColumn>>,
}
209
210impl ArrowJson {
211    /// Compare the Arrow JSON with a record batch reader
212    pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> Result<bool> {
213        if !self.schema.equals_schema(&reader.schema()) {
214            return Ok(false);
215        }
216
217        for json_batch in self.get_record_batches()?.into_iter() {
218            let batch = reader.next();
219            match batch {
220                Some(Ok(batch)) => {
221                    if json_batch != batch {
222                        println!("json: {json_batch:?}");
223                        println!("batch: {batch:?}");
224                        return Ok(false);
225                    }
226                }
227                Some(Err(e)) => return Err(e),
228                None => return Ok(false),
229            }
230        }
231
232        Ok(true)
233    }
234
235    /// Convert the stored dictionaries to `Vec[RecordBatch]`
236    pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
237        let schema = self.schema.to_arrow_schema()?;
238
239        let mut dictionaries = HashMap::new();
240        self.dictionaries.iter().for_each(|dict_batches| {
241            dict_batches.iter().for_each(|d| {
242                dictionaries.insert(d.id, d.clone());
243            });
244        });
245
246        let batches: Result<Vec<_>> = self
247            .batches
248            .iter()
249            .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries)))
250            .collect();
251
252        batches
253    }
254}
255
256impl ArrowJsonSchema {
257    /// Compare the Arrow JSON schema with the Arrow `Schema`
258    fn equals_schema(&self, schema: &Schema) -> bool {
259        let field_len = self.fields.len();
260        if field_len != schema.fields().len() {
261            return false;
262        }
263        for i in 0..field_len {
264            let json_field = &self.fields[i];
265            let field = schema.field(i);
266            if !json_field.equals_field(field) {
267                return false;
268            }
269        }
270        true
271    }
272
273    fn to_arrow_schema(&self) -> Result<Schema> {
274        let arrow_fields: Result<Vec<_>> = self
275            .fields
276            .iter()
277            .map(|field| field.to_arrow_field())
278            .collect();
279
280        if let Some(metadatas) = &self.metadata {
281            let mut metadata: HashMap<String, String> = HashMap::new();
282
283            metadatas.iter().for_each(|pair| {
284                let key = pair.get("key").unwrap();
285                let value = pair.get("value").unwrap();
286                metadata.insert(key.clone(), value.clone());
287            });
288
289            Ok(Schema::new_with_metadata(arrow_fields?, metadata))
290        } else {
291            Ok(Schema::new(arrow_fields?))
292        }
293    }
294}
295
296impl ArrowJsonField {
297    /// Compare the Arrow JSON field with the Arrow `Field`
298    fn equals_field(&self, field: &Field) -> bool {
299        // convert to a field
300        match self.to_arrow_field() {
301            Ok(self_field) => {
302                assert_eq!(&self_field, field, "Arrow fields not the same");
303                true
304            }
305            Err(e) => {
306                eprintln!("Encountered error while converting JSON field to Arrow field: {e:?}");
307                false
308            }
309        }
310    }
311
312    /// Convert to an Arrow Field
313    /// TODO: convert to use an Into
314    fn to_arrow_field(&self) -> Result<Field> {
315        // a bit regressive, but we have to convert the field to JSON in order to convert it
316        let field =
317            serde_json::to_value(self).map_err(|error| ArrowError::JsonError(error.to_string()))?;
318        field_from_json(&field)
319    }
320}
321
322/// Generates a [`RecordBatch`] from an Arrow JSON batch, given a schema
323pub fn record_batch_from_json(
324    schema: &Schema,
325    json_batch: ArrowJsonBatch,
326    json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
327) -> Result<RecordBatch> {
328    let mut columns = vec![];
329
330    for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
331        let col = array_from_json(field, json_col, json_dictionaries)?;
332        columns.push(col);
333    }
334
335    RecordBatch::try_new(Arc::new(schema.clone()), columns)
336}
337
338/// Construct an Arrow array from a partially typed JSON column
339pub fn array_from_json(
340    field: &Field,
341    json_col: ArrowJsonColumn,
342    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
343) -> Result<ArrayRef> {
344    match field.data_type() {
345        DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
346        DataType::Boolean => {
347            let mut b = BooleanBuilder::with_capacity(json_col.count);
348            for (is_valid, value) in json_col
349                .validity
350                .as_ref()
351                .unwrap()
352                .iter()
353                .zip(json_col.data.unwrap())
354            {
355                match is_valid {
356                    1 => b.append_value(value.as_bool().unwrap()),
357                    _ => b.append_null(),
358                };
359            }
360            Ok(Arc::new(b.finish()))
361        }
362        DataType::Int8 => {
363            let mut b = Int8Builder::with_capacity(json_col.count);
364            for (is_valid, value) in json_col
365                .validity
366                .as_ref()
367                .unwrap()
368                .iter()
369                .zip(json_col.data.unwrap())
370            {
371                match is_valid {
372                    1 => b.append_value(value.as_i64().ok_or_else(|| {
373                        ArrowError::JsonError(format!("Unable to get {value:?} as int64"))
374                    })? as i8),
375                    _ => b.append_null(),
376                };
377            }
378            Ok(Arc::new(b.finish()))
379        }
380        DataType::Int16 => {
381            let mut b = Int16Builder::with_capacity(json_col.count);
382            for (is_valid, value) in json_col
383                .validity
384                .as_ref()
385                .unwrap()
386                .iter()
387                .zip(json_col.data.unwrap())
388            {
389                match is_valid {
390                    1 => b.append_value(value.as_i64().unwrap() as i16),
391                    _ => b.append_null(),
392                };
393            }
394            Ok(Arc::new(b.finish()))
395        }
396        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
397            let mut b = Int32Builder::with_capacity(json_col.count);
398            for (is_valid, value) in json_col
399                .validity
400                .as_ref()
401                .unwrap()
402                .iter()
403                .zip(json_col.data.unwrap())
404            {
405                match is_valid {
406                    1 => b.append_value(value.as_i64().unwrap() as i32),
407                    _ => b.append_null(),
408                };
409            }
410            let array = Arc::new(b.finish()) as ArrayRef;
411            arrow::compute::cast(&array, field.data_type())
412        }
413        DataType::Interval(IntervalUnit::YearMonth) => {
414            let mut b = IntervalYearMonthBuilder::with_capacity(json_col.count);
415            for (is_valid, value) in json_col
416                .validity
417                .as_ref()
418                .unwrap()
419                .iter()
420                .zip(json_col.data.unwrap())
421            {
422                match is_valid {
423                    1 => b.append_value(value.as_i64().unwrap() as i32),
424                    _ => b.append_null(),
425                };
426            }
427            Ok(Arc::new(b.finish()))
428        }
429        DataType::Int64
430        | DataType::Date64
431        | DataType::Time64(_)
432        | DataType::Timestamp(_, _)
433        | DataType::Duration(_) => {
434            let mut b = Int64Builder::with_capacity(json_col.count);
435            for (is_valid, value) in json_col
436                .validity
437                .as_ref()
438                .unwrap()
439                .iter()
440                .zip(json_col.data.unwrap())
441            {
442                match is_valid {
443                    1 => b.append_value(match value {
444                        Value::Number(n) => n.as_i64().unwrap(),
445                        Value::String(s) => s.parse().expect("Unable to parse string as i64"),
446                        _ => panic!("Unable to parse {value:?} as number"),
447                    }),
448                    _ => b.append_null(),
449                };
450            }
451            let array = Arc::new(b.finish()) as ArrayRef;
452            arrow::compute::cast(&array, field.data_type())
453        }
454        DataType::Interval(IntervalUnit::DayTime) => {
455            let mut b = IntervalDayTimeBuilder::with_capacity(json_col.count);
456            for (is_valid, value) in json_col
457                .validity
458                .as_ref()
459                .unwrap()
460                .iter()
461                .zip(json_col.data.unwrap())
462            {
463                match is_valid {
464                    1 => b.append_value(match value {
465                        Value::Object(ref map)
466                            if map.contains_key("days") && map.contains_key("milliseconds") =>
467                        {
468                            match field.data_type() {
469                                DataType::Interval(IntervalUnit::DayTime) => {
470                                    let days = map.get("days").unwrap();
471                                    let milliseconds = map.get("milliseconds").unwrap();
472
473                                    match (days, milliseconds) {
474                                        (Value::Number(d), Value::Number(m)) => {
475                                            let days = d.as_i64().unwrap() as _;
476                                            let millis = m.as_i64().unwrap() as _;
477                                            IntervalDayTime::new(days, millis)
478                                        }
479                                        _ => {
480                                            panic!("Unable to parse {value:?} as interval daytime")
481                                        }
482                                    }
483                                }
484                                _ => panic!("Unable to parse {value:?} as interval daytime"),
485                            }
486                        }
487                        _ => panic!("Unable to parse {value:?} as number"),
488                    }),
489                    _ => b.append_null(),
490                };
491            }
492            Ok(Arc::new(b.finish()))
493        }
494        DataType::UInt8 => {
495            let mut b = UInt8Builder::with_capacity(json_col.count);
496            for (is_valid, value) in json_col
497                .validity
498                .as_ref()
499                .unwrap()
500                .iter()
501                .zip(json_col.data.unwrap())
502            {
503                match is_valid {
504                    1 => b.append_value(value.as_u64().unwrap() as u8),
505                    _ => b.append_null(),
506                };
507            }
508            Ok(Arc::new(b.finish()))
509        }
510        DataType::UInt16 => {
511            let mut b = UInt16Builder::with_capacity(json_col.count);
512            for (is_valid, value) in json_col
513                .validity
514                .as_ref()
515                .unwrap()
516                .iter()
517                .zip(json_col.data.unwrap())
518            {
519                match is_valid {
520                    1 => b.append_value(value.as_u64().unwrap() as u16),
521                    _ => b.append_null(),
522                };
523            }
524            Ok(Arc::new(b.finish()))
525        }
526        DataType::UInt32 => {
527            let mut b = UInt32Builder::with_capacity(json_col.count);
528            for (is_valid, value) in json_col
529                .validity
530                .as_ref()
531                .unwrap()
532                .iter()
533                .zip(json_col.data.unwrap())
534            {
535                match is_valid {
536                    1 => b.append_value(value.as_u64().unwrap() as u32),
537                    _ => b.append_null(),
538                };
539            }
540            Ok(Arc::new(b.finish()))
541        }
542        DataType::UInt64 => {
543            let mut b = UInt64Builder::with_capacity(json_col.count);
544            for (is_valid, value) in json_col
545                .validity
546                .as_ref()
547                .unwrap()
548                .iter()
549                .zip(json_col.data.unwrap())
550            {
551                match is_valid {
552                    1 => {
553                        if value.is_string() {
554                            b.append_value(
555                                value
556                                    .as_str()
557                                    .unwrap()
558                                    .parse()
559                                    .expect("Unable to parse string as u64"),
560                            )
561                        } else if value.is_number() {
562                            b.append_value(value.as_u64().expect("Unable to read number as u64"))
563                        } else {
564                            panic!("Unable to parse value {value:?} as u64")
565                        }
566                    }
567                    _ => b.append_null(),
568                };
569            }
570            Ok(Arc::new(b.finish()))
571        }
572        DataType::Interval(IntervalUnit::MonthDayNano) => {
573            let mut b = IntervalMonthDayNanoBuilder::with_capacity(json_col.count);
574            for (is_valid, value) in json_col
575                .validity
576                .as_ref()
577                .unwrap()
578                .iter()
579                .zip(json_col.data.unwrap())
580            {
581                match is_valid {
582                    1 => b.append_value(match value {
583                        Value::Object(v) => {
584                            let months = v.get("months").unwrap();
585                            let days = v.get("days").unwrap();
586                            let nanoseconds = v.get("nanoseconds").unwrap();
587                            match (months, days, nanoseconds) {
588                                (
589                                    Value::Number(months),
590                                    Value::Number(days),
591                                    Value::Number(nanoseconds),
592                                ) => {
593                                    let months = months.as_i64().unwrap() as i32;
594                                    let days = days.as_i64().unwrap() as i32;
595                                    let nanoseconds = nanoseconds.as_i64().unwrap();
596                                    IntervalMonthDayNano::new(months, days, nanoseconds)
597                                }
598                                (_, _, _) => {
599                                    panic!("Unable to parse {v:?} as MonthDayNano")
600                                }
601                            }
602                        }
603                        _ => panic!("Unable to parse {value:?} as MonthDayNano"),
604                    }),
605                    _ => b.append_null(),
606                };
607            }
608            Ok(Arc::new(b.finish()))
609        }
610        DataType::Float32 => {
611            let mut b = Float32Builder::with_capacity(json_col.count);
612            for (is_valid, value) in json_col
613                .validity
614                .as_ref()
615                .unwrap()
616                .iter()
617                .zip(json_col.data.unwrap())
618            {
619                match is_valid {
620                    1 => b.append_value(value.as_f64().unwrap() as f32),
621                    _ => b.append_null(),
622                };
623            }
624            Ok(Arc::new(b.finish()))
625        }
626        DataType::Float64 => {
627            let mut b = Float64Builder::with_capacity(json_col.count);
628            for (is_valid, value) in json_col
629                .validity
630                .as_ref()
631                .unwrap()
632                .iter()
633                .zip(json_col.data.unwrap())
634            {
635                match is_valid {
636                    1 => b.append_value(value.as_f64().unwrap()),
637                    _ => b.append_null(),
638                };
639            }
640            Ok(Arc::new(b.finish()))
641        }
642        DataType::Binary => {
643            let mut b = BinaryBuilder::with_capacity(json_col.count, 1024);
644            for (is_valid, value) in json_col
645                .validity
646                .as_ref()
647                .unwrap()
648                .iter()
649                .zip(json_col.data.unwrap())
650            {
651                match is_valid {
652                    1 => {
653                        let v = decode(value.as_str().unwrap()).unwrap();
654                        b.append_value(&v)
655                    }
656                    _ => b.append_null(),
657                };
658            }
659            Ok(Arc::new(b.finish()))
660        }
661        DataType::LargeBinary => {
662            let mut b = LargeBinaryBuilder::with_capacity(json_col.count, 1024);
663            for (is_valid, value) in json_col
664                .validity
665                .as_ref()
666                .unwrap()
667                .iter()
668                .zip(json_col.data.unwrap())
669            {
670                match is_valid {
671                    1 => {
672                        let v = decode(value.as_str().unwrap()).unwrap();
673                        b.append_value(&v)
674                    }
675                    _ => b.append_null(),
676                };
677            }
678            Ok(Arc::new(b.finish()))
679        }
680        DataType::Utf8 => {
681            let mut b = StringBuilder::with_capacity(json_col.count, 1024);
682            for (is_valid, value) in json_col
683                .validity
684                .as_ref()
685                .unwrap()
686                .iter()
687                .zip(json_col.data.unwrap())
688            {
689                match is_valid {
690                    1 => b.append_value(value.as_str().unwrap()),
691                    _ => b.append_null(),
692                };
693            }
694            Ok(Arc::new(b.finish()))
695        }
696        DataType::LargeUtf8 => {
697            let mut b = LargeStringBuilder::with_capacity(json_col.count, 1024);
698            for (is_valid, value) in json_col
699                .validity
700                .as_ref()
701                .unwrap()
702                .iter()
703                .zip(json_col.data.unwrap())
704            {
705                match is_valid {
706                    1 => b.append_value(value.as_str().unwrap()),
707                    _ => b.append_null(),
708                };
709            }
710            Ok(Arc::new(b.finish()))
711        }
712        DataType::FixedSizeBinary(len) => {
713            let mut b = FixedSizeBinaryBuilder::with_capacity(json_col.count, *len);
714            for (is_valid, value) in json_col
715                .validity
716                .as_ref()
717                .unwrap()
718                .iter()
719                .zip(json_col.data.unwrap())
720            {
721                match is_valid {
722                    1 => {
723                        let v = hex::decode(value.as_str().unwrap()).unwrap();
724                        b.append_value(&v)?
725                    }
726                    _ => b.append_null(),
727                };
728            }
729            Ok(Arc::new(b.finish()))
730        }
731        DataType::List(child_field) => {
732            let null_buf = create_null_buf(&json_col);
733            let children = json_col.children.clone().unwrap();
734            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
735            let offsets: Vec<i32> = json_col
736                .offset
737                .unwrap()
738                .iter()
739                .map(|v| v.as_i64().unwrap() as i32)
740                .collect();
741            let list_data = ArrayData::builder(field.data_type().clone())
742                .len(json_col.count)
743                .offset(0)
744                .add_buffer(Buffer::from(offsets.to_byte_slice()))
745                .add_child_data(child_array.into_data())
746                .null_bit_buffer(Some(null_buf))
747                .build()
748                .unwrap();
749            Ok(Arc::new(ListArray::from(list_data)))
750        }
751        DataType::LargeList(child_field) => {
752            let null_buf = create_null_buf(&json_col);
753            let children = json_col.children.clone().unwrap();
754            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
755            let offsets: Vec<i64> = json_col
756                .offset
757                .unwrap()
758                .iter()
759                .map(|v| match v {
760                    Value::Number(n) => n.as_i64().unwrap(),
761                    Value::String(s) => s.parse::<i64>().unwrap(),
762                    _ => panic!("64-bit offset must be either string or number"),
763                })
764                .collect();
765            let list_data = ArrayData::builder(field.data_type().clone())
766                .len(json_col.count)
767                .offset(0)
768                .add_buffer(Buffer::from(offsets.to_byte_slice()))
769                .add_child_data(child_array.into_data())
770                .null_bit_buffer(Some(null_buf))
771                .build()
772                .unwrap();
773            Ok(Arc::new(LargeListArray::from(list_data)))
774        }
775        DataType::FixedSizeList(child_field, _) => {
776            let children = json_col.children.clone().unwrap();
777            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
778            let null_buf = create_null_buf(&json_col);
779            let list_data = ArrayData::builder(field.data_type().clone())
780                .len(json_col.count)
781                .add_child_data(child_array.into_data())
782                .null_bit_buffer(Some(null_buf))
783                .build()
784                .unwrap();
785            Ok(Arc::new(FixedSizeListArray::from(list_data)))
786        }
787        DataType::Struct(fields) => {
788            // construct struct with null data
789            let null_buf = create_null_buf(&json_col);
790            let mut array_data = ArrayData::builder(field.data_type().clone())
791                .len(json_col.count)
792                .null_bit_buffer(Some(null_buf));
793
794            for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
795                let array = array_from_json(field, col, dictionaries)?;
796                array_data = array_data.add_child_data(array.into_data());
797            }
798
799            let array = StructArray::from(array_data.build().unwrap());
800            Ok(Arc::new(array))
801        }
802        DataType::Dictionary(key_type, value_type) => {
803            #[allow(deprecated)]
804            let dict_id = field.dict_id().ok_or_else(|| {
805                ArrowError::JsonError(format!("Unable to find dict_id for field {field}"))
806            })?;
807            // find dictionary
808            let dictionary = dictionaries
809                .ok_or_else(|| {
810                    ArrowError::JsonError(format!(
811                        "Unable to find any dictionaries for field {field}"
812                    ))
813                })?
814                .get(&dict_id);
815            match dictionary {
816                Some(dictionary) => dictionary_array_from_json(
817                    field,
818                    json_col,
819                    key_type,
820                    value_type,
821                    dictionary,
822                    dictionaries,
823                ),
824                None => Err(ArrowError::JsonError(format!(
825                    "Unable to find dictionary for field {field}"
826                ))),
827            }
828        }
829        DataType::Decimal32(precision, scale) => {
830            let mut b = Decimal32Builder::with_capacity(json_col.count);
831            for (is_valid, value) in json_col
832                .validity
833                .as_ref()
834                .unwrap()
835                .iter()
836                .zip(json_col.data.unwrap())
837            {
838                match is_valid {
839                    1 => b.append_value(value.as_str().unwrap().parse::<i32>().unwrap()),
840                    _ => b.append_null(),
841                };
842            }
843            Ok(Arc::new(
844                b.finish().with_precision_and_scale(*precision, *scale)?,
845            ))
846        }
847        DataType::Decimal64(precision, scale) => {
848            let mut b = Decimal64Builder::with_capacity(json_col.count);
849            for (is_valid, value) in json_col
850                .validity
851                .as_ref()
852                .unwrap()
853                .iter()
854                .zip(json_col.data.unwrap())
855            {
856                match is_valid {
857                    1 => b.append_value(value.as_str().unwrap().parse::<i64>().unwrap()),
858                    _ => b.append_null(),
859                };
860            }
861            Ok(Arc::new(
862                b.finish().with_precision_and_scale(*precision, *scale)?,
863            ))
864        }
865        DataType::Decimal128(precision, scale) => {
866            let mut b = Decimal128Builder::with_capacity(json_col.count);
867            for (is_valid, value) in json_col
868                .validity
869                .as_ref()
870                .unwrap()
871                .iter()
872                .zip(json_col.data.unwrap())
873            {
874                match is_valid {
875                    1 => b.append_value(value.as_str().unwrap().parse::<i128>().unwrap()),
876                    _ => b.append_null(),
877                };
878            }
879            Ok(Arc::new(
880                b.finish().with_precision_and_scale(*precision, *scale)?,
881            ))
882        }
883        DataType::Decimal256(precision, scale) => {
884            let mut b = Decimal256Builder::with_capacity(json_col.count);
885            for (is_valid, value) in json_col
886                .validity
887                .as_ref()
888                .unwrap()
889                .iter()
890                .zip(json_col.data.unwrap())
891            {
892                match is_valid {
893                    1 => {
894                        let str = value.as_str().unwrap();
895                        let integer = BigInt::parse_bytes(str.as_bytes(), 10).unwrap();
896                        let integer_bytes = integer.to_signed_bytes_le();
897                        let mut bytes = if integer.is_positive() {
898                            [0_u8; 32]
899                        } else {
900                            [255_u8; 32]
901                        };
902                        bytes[0..integer_bytes.len()].copy_from_slice(integer_bytes.as_slice());
903                        b.append_value(i256::from_le_bytes(bytes));
904                    }
905                    _ => b.append_null(),
906                }
907            }
908            Ok(Arc::new(
909                b.finish().with_precision_and_scale(*precision, *scale)?,
910            ))
911        }
912        DataType::Map(child_field, _) => {
913            let null_buf = create_null_buf(&json_col);
914            let children = json_col.children.clone().unwrap();
915            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
916            let offsets: Vec<i32> = json_col
917                .offset
918                .unwrap()
919                .iter()
920                .map(|v| v.as_i64().unwrap() as i32)
921                .collect();
922            let array_data = ArrayData::builder(field.data_type().clone())
923                .len(json_col.count)
924                .add_buffer(Buffer::from(offsets.to_byte_slice()))
925                .add_child_data(child_array.into_data())
926                .null_bit_buffer(Some(null_buf))
927                .build()
928                .unwrap();
929
930            let array = MapArray::from(array_data);
931            Ok(Arc::new(array))
932        }
933        DataType::Union(fields, _) => {
934            let type_ids = if let Some(type_id) = json_col.type_id {
935                type_id
936            } else {
937                return Err(ArrowError::JsonError(
938                    "Cannot find expected type_id in json column".to_string(),
939                ));
940            };
941
942            let offset: Option<ScalarBuffer<i32>> = json_col
943                .offset
944                .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());
945
946            let mut children = Vec::with_capacity(fields.len());
947            for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
948                let array = array_from_json(field, col, dictionaries)?;
949                children.push(array);
950            }
951
952            let array =
953                UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
954            Ok(Arc::new(array))
955        }
956        t => Err(ArrowError::JsonError(format!(
957            "data type {t} not supported"
958        ))),
959    }
960}
961
962/// Construct a [`DictionaryArray`] from a partially typed JSON column
963pub fn dictionary_array_from_json(
964    field: &Field,
965    json_col: ArrowJsonColumn,
966    dict_key: &DataType,
967    dict_value: &DataType,
968    dictionary: &ArrowJsonDictionaryBatch,
969    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
970) -> Result<ArrayRef> {
971    match dict_key {
972        DataType::Int8
973        | DataType::Int16
974        | DataType::Int32
975        | DataType::Int64
976        | DataType::UInt8
977        | DataType::UInt16
978        | DataType::UInt32
979        | DataType::UInt64 => {
980            let null_buf = create_null_buf(&json_col);
981
982            // build the key data into a buffer, then construct values separately
983            #[allow(deprecated)]
984            let key_field = Field::new_dict(
985                "key",
986                dict_key.clone(),
987                field.is_nullable(),
988                #[allow(deprecated)]
989                field
990                    .dict_id()
991                    .expect("Dictionary fields must have a dict_id value"),
992                field
993                    .dict_is_ordered()
994                    .expect("Dictionary fields must have a dict_is_ordered value"),
995            );
996            let keys = array_from_json(&key_field, json_col, None)?;
997            // note: not enough info on nullability of dictionary
998            let value_field = Field::new("value", dict_value.clone(), true);
999            let values = array_from_json(
1000                &value_field,
1001                dictionary.data.columns[0].clone(),
1002                dictionaries,
1003            )?;
1004
1005            // convert key and value to dictionary data
1006            let dict_data = ArrayData::builder(field.data_type().clone())
1007                .len(keys.len())
1008                .add_buffer(keys.to_data().buffers()[0].clone())
1009                .null_bit_buffer(Some(null_buf))
1010                .add_child_data(values.into_data())
1011                .build()
1012                .unwrap();
1013
1014            let array = match dict_key {
1015                DataType::Int8 => Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef,
1016                DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)),
1017                DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)),
1018                DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)),
1019                DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)),
1020                DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)),
1021                DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)),
1022                DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)),
1023                _ => unreachable!(),
1024            };
1025            Ok(array)
1026        }
1027        _ => Err(ArrowError::JsonError(format!(
1028            "Dictionary key type {dict_key:?} not supported"
1029        ))),
1030    }
1031}
1032
1033/// A helper to create a null buffer from a `Vec<bool>`
1034fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
1035    let num_bytes = bit_util::ceil(json_col.count, 8);
1036    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
1037    json_col
1038        .validity
1039        .clone()
1040        .unwrap()
1041        .iter()
1042        .enumerate()
1043        .for_each(|(i, v)| {
1044            let null_slice = null_buf.as_slice_mut();
1045            if *v != 0 {
1046                bit_util::set_bit(null_slice, i);
1047            }
1048        });
1049    null_buf.into()
1050}
1051
1052impl ArrowJsonBatch {
1053    /// Convert a [`RecordBatch`] to an [`ArrowJsonBatch`]
1054    ///
1055    /// <div class="warning">
1056    ///
1057    /// This function is **deliberately incomplete**! As noted in the crate-level documentation,
1058    /// this crate is only intended for use within the Arrow project itself.
1059    ///
1060    /// Right now, this function only supports `DataType::Int8` columns. Other data types will lead
1061    /// to an empty `ArrowJsonColumn`.
1062    ///
1063    /// </div>
1064    pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
1065        let mut json_batch = ArrowJsonBatch {
1066            count: batch.num_rows(),
1067            columns: Vec::with_capacity(batch.num_columns()),
1068        };
1069
1070        for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) {
1071            let json_col = match field.data_type() {
1072                DataType::Int8 => {
1073                    let col = col.as_any().downcast_ref::<Int8Array>().unwrap();
1074
1075                    let mut validity: Vec<u8> = Vec::with_capacity(col.len());
1076                    let mut data: Vec<Value> = Vec::with_capacity(col.len());
1077
1078                    for i in 0..col.len() {
1079                        if col.is_null(i) {
1080                            validity.push(1);
1081                            data.push(0i8.into());
1082                        } else {
1083                            validity.push(0);
1084                            data.push(col.value(i).into());
1085                        }
1086                    }
1087
1088                    ArrowJsonColumn {
1089                        name: field.name().clone(),
1090                        count: col.len(),
1091                        validity: Some(validity),
1092                        data: Some(data),
1093                        offset: None,
1094                        type_id: None,
1095                        children: None,
1096                    }
1097                }
1098                _ => ArrowJsonColumn {
1099                    name: field.name().clone(),
1100                    count: col.len(),
1101                    validity: None,
1102                    data: None,
1103                    offset: None,
1104                    type_id: None,
1105                    children: None,
1106                },
1107            };
1108
1109            json_batch.columns.push(json_col);
1110        }
1111
1112        json_batch
1113    }
1114}
1115
#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Read;

    // Deserializing an integration-format JSON schema must compare equal
    // (via `equals_schema`) to the same schema built programmatically,
    // including a list field whose child item has a custom name.
    #[test]
    fn test_schema_equality() {
        let json = r#"
        {
            "fields": [
                {
                    "name": "c1",
                    "type": {"name": "int", "isSigned": true, "bitWidth": 32},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c2",
                    "type": {"name": "floatingpoint", "precision": "DOUBLE"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c3",
                    "type": {"name": "utf8"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c4",
                    "type": {
                        "name": "list"
                    },
                    "nullable": true,
                    "children": [
                        {
                            "name": "custom_item",
                            "type": {
                                "name": "int",
                                "isSigned": true,
                                "bitWidth": 32
                            },
                            "nullable": false,
                            "children": []
                        }
                    ]
                }
            ]
        }"#;
        let json_schema: ArrowJsonSchema = serde_json::from_str(json).unwrap();
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Utf8, true),
            Field::new(
                "c4",
                DataType::List(Arc::new(Field::new("custom_item", DataType::Int32, false))),
                true,
            ),
        ]);
        assert!(json_schema.equals_schema(&schema));
    }

    // Builds a RecordBatch covering many data types (booleans with field
    // metadata, all integer widths, floats, dates, times, timestamps with
    // and without timezones, utf8, lists, structs), then checks that both
    // the schema and the first record batch parsed from the checked-in
    // `data/integration.json` fixture are equal to it.
    #[test]
    fn test_arrow_data_equality() {
        let secs_tz = Some("Europe/Budapest".into());
        let millis_tz = Some("America/New_York".into());
        let micros_tz = Some("UTC".into());
        let nanos_tz = Some("Africa/Johannesburg".into());

        let schema = Schema::new(vec![
            Field::new("bools-with-metadata-map", DataType::Boolean, true).with_metadata(
                [("k".to_string(), "v".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools-with-metadata-vec", DataType::Boolean, true).with_metadata(
                [("k2".to_string(), "v2".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools", DataType::Boolean, true),
            Field::new("int8s", DataType::Int8, true),
            Field::new("int16s", DataType::Int16, true),
            Field::new("int32s", DataType::Int32, true),
            Field::new("int64s", DataType::Int64, true),
            Field::new("uint8s", DataType::UInt8, true),
            Field::new("uint16s", DataType::UInt16, true),
            Field::new("uint32s", DataType::UInt32, true),
            Field::new("uint64s", DataType::UInt64, true),
            Field::new("float32s", DataType::Float32, true),
            Field::new("float64s", DataType::Float64, true),
            Field::new("date_days", DataType::Date32, true),
            Field::new("date_millis", DataType::Date64, true),
            Field::new("time_secs", DataType::Time32(TimeUnit::Second), true),
            Field::new("time_millis", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nanos", DataType::Time64(TimeUnit::Nanosecond), true),
            Field::new("ts_secs", DataType::Timestamp(TimeUnit::Second, None), true),
            Field::new(
                "ts_millis",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micros",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "ts_nanos",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "ts_secs_tz",
                DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
                true,
            ),
            Field::new(
                "ts_millis_tz",
                DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
                true,
            ),
            Field::new(
                "ts_micros_tz",
                DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
                true,
            ),
            Field::new(
                "ts_nanos_tz",
                DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
                true,
            ),
            Field::new("utf8s", DataType::Utf8, true),
            Field::new(
                "lists",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "structs",
                DataType::Struct(Fields::from(vec![
                    Field::new("int32s", DataType::Int32, true),
                    Field::new("utf8s", DataType::Utf8, true),
                ])),
                true,
            ),
        ]);

        let bools_with_metadata_map = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools_with_metadata_vec = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let int8s = Int8Array::from(vec![Some(1), None, Some(3)]);
        let int16s = Int16Array::from(vec![Some(1), None, Some(3)]);
        let int32s = Int32Array::from(vec![Some(1), None, Some(3)]);
        let int64s = Int64Array::from(vec![Some(1), None, Some(3)]);
        let uint8s = UInt8Array::from(vec![Some(1), None, Some(3)]);
        let uint16s = UInt16Array::from(vec![Some(1), None, Some(3)]);
        let uint32s = UInt32Array::from(vec![Some(1), None, Some(3)]);
        let uint64s = UInt64Array::from(vec![Some(1), None, Some(3)]);
        let float32s = Float32Array::from(vec![Some(1.0), None, Some(3.0)]);
        let float64s = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
        let date_days = Date32Array::from(vec![Some(1196848), None, None]);
        let date_millis = Date64Array::from(vec![
            Some(167903550396207),
            Some(29923997007884),
            Some(30612271819236),
        ]);
        let time_secs = Time32SecondArray::from(vec![Some(27974), Some(78592), Some(43207)]);
        let time_millis =
            Time32MillisecondArray::from(vec![Some(6613125), Some(74667230), Some(52260079)]);
        let time_micros = Time64MicrosecondArray::from(vec![Some(62522958593), None, None]);
        let time_nanos =
            Time64NanosecondArray::from(vec![Some(73380123595985), None, Some(16584393546415)]);
        let ts_secs = TimestampSecondArray::from(vec![None, Some(193438817552), None]);
        let ts_millis =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)]);
        let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
        let ts_nanos = TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
        let ts_secs_tz = TimestampSecondArray::from(vec![None, Some(193438817552), None])
            .with_timezone_opt(secs_tz);
        let ts_millis_tz =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)])
                .with_timezone_opt(millis_tz);
        let ts_micros_tz =
            TimestampMicrosecondArray::from(vec![None, None, None]).with_timezone_opt(micros_tz);
        let ts_nanos_tz =
            TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)])
                .with_timezone_opt(nanos_tz);
        let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]);

        // List column built from raw ArrayData: offsets [0,3,4,4] and null
        // bitmap 0b00000011 (lists 0 and 1 valid, list 2 null).
        let value_data = Int32Array::from(vec![None, Some(2), None, None]);
        let value_offsets = Buffer::from_slice_ref([0, 3, 4, 4]);
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_buffer(value_offsets)
            .add_child_data(value_data.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let lists = ListArray::from(list_data);

        // Struct column with two children and its own validity bitmap
        // (rows 0 and 1 valid, row 2 null).
        let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
        let structs_utf8s = StringArray::from(vec![None, None, Some("aaaaaa")]);
        let struct_data_type = DataType::Struct(Fields::from(vec![
            Field::new("int32s", DataType::Int32, true),
            Field::new("utf8s", DataType::Utf8, true),
        ]));
        let struct_data = ArrayData::builder(struct_data_type)
            .len(3)
            .add_child_data(structs_int32s.into_data())
            .add_child_data(structs_utf8s.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let structs = StructArray::from(struct_data);

        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bools_with_metadata_map),
                Arc::new(bools_with_metadata_vec),
                Arc::new(bools),
                Arc::new(int8s),
                Arc::new(int16s),
                Arc::new(int32s),
                Arc::new(int64s),
                Arc::new(uint8s),
                Arc::new(uint16s),
                Arc::new(uint32s),
                Arc::new(uint64s),
                Arc::new(float32s),
                Arc::new(float64s),
                Arc::new(date_days),
                Arc::new(date_millis),
                Arc::new(time_secs),
                Arc::new(time_millis),
                Arc::new(time_micros),
                Arc::new(time_nanos),
                Arc::new(ts_secs),
                Arc::new(ts_millis),
                Arc::new(ts_micros),
                Arc::new(ts_nanos),
                Arc::new(ts_secs_tz),
                Arc::new(ts_millis_tz),
                Arc::new(ts_micros_tz),
                Arc::new(ts_nanos_tz),
                Arc::new(utf8s),
                Arc::new(lists),
                Arc::new(structs),
            ],
        )
        .unwrap();
        // Parse the fixture file and compare both ways.
        let mut file = File::open("data/integration.json").unwrap();
        let mut json = String::new();
        file.read_to_string(&mut json).unwrap();
        let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
        // test schemas
        assert!(arrow_json.schema.equals_schema(&schema));
        // test record batch
        assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
    }
}