Skip to main content

arrow_integration_test/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Partial support for the [Apache Arrow JSON test data format](https://github.com/apache/arrow/blob/master/docs/source/format/Integration.rst#json-test-data-format)
19//!
20//! These utilities define structs that read the integration JSON format for integration testing purposes.
21//!
22//! This is not a canonical format, but provides a human-readable way of verifying language implementations
23//!
24//! <div class="warning">
25//!
26//! This crate is **only intended for integration testing the
27//! [Arrow project](https://github.com/apache/arrow-rs)**. It is not [intended for usage outside of
28//! this context](https://github.com/apache/arrow-rs/issues/8684#issuecomment-3433193158).
29//!
30//! </div>
31
32#![doc(
33    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
34    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
35)]
36#![cfg_attr(docsrs, feature(doc_cfg))]
37#![warn(missing_docs)]
38use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer};
39use hex::decode;
40use num_bigint::BigInt;
41use num_traits::Signed;
42use serde::{Deserialize, Serialize};
43use serde_json::{Map as SJMap, Value};
44use std::collections::HashMap;
45use std::sync::Arc;
46
47use arrow::array::*;
48use arrow::buffer::{Buffer, MutableBuffer};
49use arrow::datatypes::*;
50use arrow::error::{ArrowError, Result};
51use arrow::util::bit_util;
52
53mod datatype;
54mod field;
55mod schema;
56
57pub use datatype::*;
58pub use field::*;
59pub use schema::*;
60
61/// A struct that represents an Arrow file with a schema and record batches
62///
63/// See <https://github.com/apache/arrow/blob/master/docs/source/format/Integration.rst#json-test-data-format>
64#[derive(Deserialize, Serialize, Debug)]
65pub struct ArrowJson {
66    /// The Arrow schema for JSON file
67    pub schema: ArrowJsonSchema,
68    /// The `RecordBatch`es in the JSON file
69    pub batches: Vec<ArrowJsonBatch>,
70    /// The dictionaries in the JSON file
71    #[serde(skip_serializing_if = "Option::is_none")]
72    pub dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
73}
74
75/// A struct that partially reads the Arrow JSON schema.
76///
77/// Fields are left as JSON `Value` as they vary by `DataType`
78#[derive(Deserialize, Serialize, Debug)]
79pub struct ArrowJsonSchema {
80    /// An array of JSON fields
81    pub fields: Vec<ArrowJsonField>,
82    /// An array of metadata key-value pairs
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub metadata: Option<Vec<HashMap<String, String>>>,
85}
86
87/// Fields are left as JSON `Value` as they vary by `DataType`
88#[derive(Deserialize, Serialize, Debug)]
89pub struct ArrowJsonField {
90    /// The name of the field
91    pub name: String,
92    /// The data type of the field,
93    /// can be any valid JSON value
94    #[serde(rename = "type")]
95    pub field_type: Value,
96    /// Whether the field is nullable
97    pub nullable: bool,
98    /// The children fields
99    pub children: Vec<ArrowJsonField>,
100    /// The dictionary for the field
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub dictionary: Option<ArrowJsonFieldDictionary>,
103    /// The metadata for the field, if any
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub metadata: Option<Value>,
106}
107
108impl From<&FieldRef> for ArrowJsonField {
109    fn from(value: &FieldRef) -> Self {
110        Self::from(value.as_ref())
111    }
112}
113
114impl From<&Field> for ArrowJsonField {
115    fn from(field: &Field) -> Self {
116        let metadata_value = match field.metadata().is_empty() {
117            false => {
118                let mut array = Vec::new();
119                for (k, v) in field.metadata() {
120                    let mut kv_map = SJMap::new();
121                    kv_map.insert(k.clone(), Value::String(v.clone()));
122                    array.push(Value::Object(kv_map));
123                }
124                if !array.is_empty() {
125                    Some(Value::Array(array))
126                } else {
127                    None
128                }
129            }
130            _ => None,
131        };
132
133        Self {
134            name: field.name().to_string(),
135            field_type: data_type_to_json(field.data_type()),
136            nullable: field.is_nullable(),
137            children: vec![],
138            dictionary: None, // TODO: not enough info
139            metadata: metadata_value,
140        }
141    }
142}
143
144/// Represents a dictionary-encoded field in the Arrow JSON format
145#[derive(Deserialize, Serialize, Debug)]
146pub struct ArrowJsonFieldDictionary {
147    /// A unique identifier for the dictionary
148    pub id: i64,
149    /// The type of the dictionary index
150    #[serde(rename = "indexType")]
151    pub index_type: DictionaryIndexType,
152    /// Whether the dictionary is ordered
153    #[serde(rename = "isOrdered")]
154    pub is_ordered: bool,
155}
156
157/// Type of an index for a dictionary-encoded field in the Arrow JSON format
158#[derive(Deserialize, Serialize, Debug)]
159pub struct DictionaryIndexType {
160    /// The name of the dictionary index type
161    pub name: String,
162    /// Whether the dictionary index type is signed
163    #[serde(rename = "isSigned")]
164    pub is_signed: bool,
165    /// The bit width of the dictionary index type
166    #[serde(rename = "bitWidth")]
167    pub bit_width: i64,
168}
169
170/// A struct that partially reads the Arrow JSON record batch
171#[derive(Deserialize, Serialize, Debug, Clone)]
172pub struct ArrowJsonBatch {
173    count: usize,
174    /// The columns in the record batch
175    pub columns: Vec<ArrowJsonColumn>,
176}
177
178/// A struct that partially reads the Arrow JSON dictionary batch
179#[derive(Deserialize, Serialize, Debug, Clone)]
180#[allow(non_snake_case)]
181pub struct ArrowJsonDictionaryBatch {
182    /// The unique identifier for the dictionary
183    pub id: i64,
184    /// The data for the dictionary
185    pub data: ArrowJsonBatch,
186}
187
188/// A struct that partially reads the Arrow JSON column/array
189#[derive(Deserialize, Serialize, Clone, Debug)]
190pub struct ArrowJsonColumn {
191    name: String,
192    /// The number of elements in the column
193    pub count: usize,
194    /// The validity bitmap to determine null values
195    #[serde(rename = "VALIDITY")]
196    pub validity: Option<Vec<u8>>,
197    /// The data values in the column
198    #[serde(rename = "DATA")]
199    pub data: Option<Vec<Value>>,
200    /// The offsets for variable-sized data types
201    #[serde(rename = "OFFSET")]
202    pub offset: Option<Vec<Value>>, // leaving as Value as 64-bit offsets are strings
203    /// The type id for union types
204    #[serde(rename = "TYPE_ID")]
205    pub type_id: Option<Vec<i8>>,
206    /// The sizes for ListView/LargeListView types
207    #[serde(rename = "SIZE")]
208    pub size: Option<Vec<Value>>,
209    /// The views for BinaryView/Utf8View types
210    #[serde(rename = "VIEWS")]
211    pub views: Option<Vec<Value>>,
212    /// The variadic data buffers for BinaryView/Utf8View types
213    #[serde(rename = "VARIADIC_DATA_BUFFERS")]
214    pub variadic_data_buffers: Option<Vec<String>>,
215    /// The children columns for nested types
216    pub children: Option<Vec<ArrowJsonColumn>>,
217}
218
219impl ArrowJson {
220    /// Compare the Arrow JSON with a record batch reader
221    pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> Result<bool> {
222        if !self.schema.equals_schema(&reader.schema()) {
223            return Ok(false);
224        }
225
226        for json_batch in self.get_record_batches()?.into_iter() {
227            let batch = reader.next();
228            match batch {
229                Some(Ok(batch)) => {
230                    if json_batch != batch {
231                        println!("json: {json_batch:?}");
232                        println!("batch: {batch:?}");
233                        return Ok(false);
234                    }
235                }
236                Some(Err(e)) => return Err(e),
237                None => return Ok(false),
238            }
239        }
240
241        Ok(true)
242    }
243
244    /// Convert the stored dictionaries to `Vec[RecordBatch]`
245    pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
246        let schema = self.schema.to_arrow_schema()?;
247
248        let mut dictionaries = HashMap::new();
249        self.dictionaries.iter().for_each(|dict_batches| {
250            dict_batches.iter().for_each(|d| {
251                dictionaries.insert(d.id, d.clone());
252            });
253        });
254
255        let batches: Result<Vec<_>> = self
256            .batches
257            .iter()
258            .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries)))
259            .collect();
260
261        batches
262    }
263}
264
265impl ArrowJsonSchema {
266    /// Compare the Arrow JSON schema with the Arrow `Schema`
267    fn equals_schema(&self, schema: &Schema) -> bool {
268        let field_len = self.fields.len();
269        if field_len != schema.fields().len() {
270            return false;
271        }
272        for i in 0..field_len {
273            let json_field = &self.fields[i];
274            let field = schema.field(i);
275            if !json_field.equals_field(field) {
276                return false;
277            }
278        }
279        true
280    }
281
282    fn to_arrow_schema(&self) -> Result<Schema> {
283        let arrow_fields: Result<Vec<_>> = self
284            .fields
285            .iter()
286            .map(|field| field.to_arrow_field())
287            .collect();
288
289        if let Some(metadatas) = &self.metadata {
290            let mut metadata: HashMap<String, String> = HashMap::new();
291
292            metadatas.iter().for_each(|pair| {
293                let key = pair.get("key").unwrap();
294                let value = pair.get("value").unwrap();
295                metadata.insert(key.clone(), value.clone());
296            });
297
298            Ok(Schema::new_with_metadata(arrow_fields?, metadata))
299        } else {
300            Ok(Schema::new(arrow_fields?))
301        }
302    }
303}
304
305impl ArrowJsonField {
306    /// Compare the Arrow JSON field with the Arrow `Field`
307    fn equals_field(&self, field: &Field) -> bool {
308        // convert to a field
309        match self.to_arrow_field() {
310            Ok(self_field) => {
311                assert_eq!(&self_field, field, "Arrow fields not the same");
312                true
313            }
314            Err(e) => {
315                eprintln!("Encountered error while converting JSON field to Arrow field: {e:?}");
316                false
317            }
318        }
319    }
320
321    /// Convert to an Arrow Field
322    /// TODO: convert to use an Into
323    fn to_arrow_field(&self) -> Result<Field> {
324        // a bit regressive, but we have to convert the field to JSON in order to convert it
325        let field =
326            serde_json::to_value(self).map_err(|error| ArrowError::JsonError(error.to_string()))?;
327        field_from_json(&field)
328    }
329}
330
331/// Generates a [`RecordBatch`] from an Arrow JSON batch, given a schema
332pub fn record_batch_from_json(
333    schema: &Schema,
334    json_batch: ArrowJsonBatch,
335    json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
336) -> Result<RecordBatch> {
337    let mut columns = vec![];
338
339    for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
340        let col = array_from_json(field, json_col, json_dictionaries)?;
341        columns.push(col);
342    }
343
344    RecordBatch::try_new(Arc::new(schema.clone()), columns)
345}
346
347/// Construct an Arrow array from a partially typed JSON column
348pub fn array_from_json(
349    field: &Field,
350    json_col: ArrowJsonColumn,
351    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
352) -> Result<ArrayRef> {
353    match field.data_type() {
354        DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
355        DataType::Boolean => {
356            let mut b = BooleanBuilder::with_capacity(json_col.count);
357            for (is_valid, value) in json_col
358                .validity
359                .as_ref()
360                .unwrap()
361                .iter()
362                .zip(json_col.data.unwrap())
363            {
364                match is_valid {
365                    1 => b.append_value(value.as_bool().unwrap()),
366                    _ => b.append_null(),
367                };
368            }
369            Ok(Arc::new(b.finish()))
370        }
371        DataType::Int8 => {
372            let mut b = Int8Builder::with_capacity(json_col.count);
373            for (is_valid, value) in json_col
374                .validity
375                .as_ref()
376                .unwrap()
377                .iter()
378                .zip(json_col.data.unwrap())
379            {
380                match is_valid {
381                    1 => b.append_value(value.as_i64().ok_or_else(|| {
382                        ArrowError::JsonError(format!("Unable to get {value:?} as int64"))
383                    })? as i8),
384                    _ => b.append_null(),
385                };
386            }
387            Ok(Arc::new(b.finish()))
388        }
389        DataType::Int16 => {
390            let mut b = Int16Builder::with_capacity(json_col.count);
391            for (is_valid, value) in json_col
392                .validity
393                .as_ref()
394                .unwrap()
395                .iter()
396                .zip(json_col.data.unwrap())
397            {
398                match is_valid {
399                    1 => b.append_value(value.as_i64().unwrap() as i16),
400                    _ => b.append_null(),
401                };
402            }
403            Ok(Arc::new(b.finish()))
404        }
405        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
406            let mut b = Int32Builder::with_capacity(json_col.count);
407            for (is_valid, value) in json_col
408                .validity
409                .as_ref()
410                .unwrap()
411                .iter()
412                .zip(json_col.data.unwrap())
413            {
414                match is_valid {
415                    1 => b.append_value(value.as_i64().unwrap() as i32),
416                    _ => b.append_null(),
417                };
418            }
419            let array = Arc::new(b.finish()) as ArrayRef;
420            arrow::compute::cast(&array, field.data_type())
421        }
422        DataType::Interval(IntervalUnit::YearMonth) => {
423            let mut b = IntervalYearMonthBuilder::with_capacity(json_col.count);
424            for (is_valid, value) in json_col
425                .validity
426                .as_ref()
427                .unwrap()
428                .iter()
429                .zip(json_col.data.unwrap())
430            {
431                match is_valid {
432                    1 => b.append_value(value.as_i64().unwrap() as i32),
433                    _ => b.append_null(),
434                };
435            }
436            Ok(Arc::new(b.finish()))
437        }
438        DataType::Int64
439        | DataType::Date64
440        | DataType::Time64(_)
441        | DataType::Timestamp(_, _)
442        | DataType::Duration(_) => {
443            let mut b = Int64Builder::with_capacity(json_col.count);
444            for (is_valid, value) in json_col
445                .validity
446                .as_ref()
447                .unwrap()
448                .iter()
449                .zip(json_col.data.unwrap())
450            {
451                match is_valid {
452                    1 => b.append_value(match value {
453                        Value::Number(n) => n.as_i64().unwrap(),
454                        Value::String(s) => s.parse().expect("Unable to parse string as i64"),
455                        _ => panic!("Unable to parse {value:?} as number"),
456                    }),
457                    _ => b.append_null(),
458                };
459            }
460            let array = Arc::new(b.finish()) as ArrayRef;
461            arrow::compute::cast(&array, field.data_type())
462        }
463        DataType::Interval(IntervalUnit::DayTime) => {
464            let mut b = IntervalDayTimeBuilder::with_capacity(json_col.count);
465            for (is_valid, value) in json_col
466                .validity
467                .as_ref()
468                .unwrap()
469                .iter()
470                .zip(json_col.data.unwrap())
471            {
472                match is_valid {
473                    1 => b.append_value(match value {
474                        Value::Object(ref map)
475                            if map.contains_key("days") && map.contains_key("milliseconds") =>
476                        {
477                            match field.data_type() {
478                                DataType::Interval(IntervalUnit::DayTime) => {
479                                    let days = map.get("days").unwrap();
480                                    let milliseconds = map.get("milliseconds").unwrap();
481
482                                    match (days, milliseconds) {
483                                        (Value::Number(d), Value::Number(m)) => {
484                                            let days = d.as_i64().unwrap() as _;
485                                            let millis = m.as_i64().unwrap() as _;
486                                            IntervalDayTime::new(days, millis)
487                                        }
488                                        _ => {
489                                            panic!("Unable to parse {value:?} as interval daytime")
490                                        }
491                                    }
492                                }
493                                _ => panic!("Unable to parse {value:?} as interval daytime"),
494                            }
495                        }
496                        _ => panic!("Unable to parse {value:?} as number"),
497                    }),
498                    _ => b.append_null(),
499                };
500            }
501            Ok(Arc::new(b.finish()))
502        }
503        DataType::UInt8 => {
504            let mut b = UInt8Builder::with_capacity(json_col.count);
505            for (is_valid, value) in json_col
506                .validity
507                .as_ref()
508                .unwrap()
509                .iter()
510                .zip(json_col.data.unwrap())
511            {
512                match is_valid {
513                    1 => b.append_value(value.as_u64().unwrap() as u8),
514                    _ => b.append_null(),
515                };
516            }
517            Ok(Arc::new(b.finish()))
518        }
519        DataType::UInt16 => {
520            let mut b = UInt16Builder::with_capacity(json_col.count);
521            for (is_valid, value) in json_col
522                .validity
523                .as_ref()
524                .unwrap()
525                .iter()
526                .zip(json_col.data.unwrap())
527            {
528                match is_valid {
529                    1 => b.append_value(value.as_u64().unwrap() as u16),
530                    _ => b.append_null(),
531                };
532            }
533            Ok(Arc::new(b.finish()))
534        }
535        DataType::UInt32 => {
536            let mut b = UInt32Builder::with_capacity(json_col.count);
537            for (is_valid, value) in json_col
538                .validity
539                .as_ref()
540                .unwrap()
541                .iter()
542                .zip(json_col.data.unwrap())
543            {
544                match is_valid {
545                    1 => b.append_value(value.as_u64().unwrap() as u32),
546                    _ => b.append_null(),
547                };
548            }
549            Ok(Arc::new(b.finish()))
550        }
551        DataType::UInt64 => {
552            let mut b = UInt64Builder::with_capacity(json_col.count);
553            for (is_valid, value) in json_col
554                .validity
555                .as_ref()
556                .unwrap()
557                .iter()
558                .zip(json_col.data.unwrap())
559            {
560                match is_valid {
561                    1 => {
562                        if value.is_string() {
563                            b.append_value(
564                                value
565                                    .as_str()
566                                    .unwrap()
567                                    .parse()
568                                    .expect("Unable to parse string as u64"),
569                            )
570                        } else if value.is_number() {
571                            b.append_value(value.as_u64().expect("Unable to read number as u64"))
572                        } else {
573                            panic!("Unable to parse value {value:?} as u64")
574                        }
575                    }
576                    _ => b.append_null(),
577                };
578            }
579            Ok(Arc::new(b.finish()))
580        }
581        DataType::Interval(IntervalUnit::MonthDayNano) => {
582            let mut b = IntervalMonthDayNanoBuilder::with_capacity(json_col.count);
583            for (is_valid, value) in json_col
584                .validity
585                .as_ref()
586                .unwrap()
587                .iter()
588                .zip(json_col.data.unwrap())
589            {
590                match is_valid {
591                    1 => b.append_value(match value {
592                        Value::Object(v) => {
593                            let months = v.get("months").unwrap();
594                            let days = v.get("days").unwrap();
595                            let nanoseconds = v.get("nanoseconds").unwrap();
596                            match (months, days, nanoseconds) {
597                                (
598                                    Value::Number(months),
599                                    Value::Number(days),
600                                    Value::Number(nanoseconds),
601                                ) => {
602                                    let months = months.as_i64().unwrap() as i32;
603                                    let days = days.as_i64().unwrap() as i32;
604                                    let nanoseconds = nanoseconds.as_i64().unwrap();
605                                    IntervalMonthDayNano::new(months, days, nanoseconds)
606                                }
607                                (_, _, _) => {
608                                    panic!("Unable to parse {v:?} as MonthDayNano")
609                                }
610                            }
611                        }
612                        _ => panic!("Unable to parse {value:?} as MonthDayNano"),
613                    }),
614                    _ => b.append_null(),
615                };
616            }
617            Ok(Arc::new(b.finish()))
618        }
619        DataType::Float32 => {
620            let mut b = Float32Builder::with_capacity(json_col.count);
621            for (is_valid, value) in json_col
622                .validity
623                .as_ref()
624                .unwrap()
625                .iter()
626                .zip(json_col.data.unwrap())
627            {
628                match is_valid {
629                    1 => b.append_value(value.as_f64().unwrap() as f32),
630                    _ => b.append_null(),
631                };
632            }
633            Ok(Arc::new(b.finish()))
634        }
635        DataType::Float64 => {
636            let mut b = Float64Builder::with_capacity(json_col.count);
637            for (is_valid, value) in json_col
638                .validity
639                .as_ref()
640                .unwrap()
641                .iter()
642                .zip(json_col.data.unwrap())
643            {
644                match is_valid {
645                    1 => b.append_value(value.as_f64().unwrap()),
646                    _ => b.append_null(),
647                };
648            }
649            Ok(Arc::new(b.finish()))
650        }
651        DataType::Binary => {
652            let mut b = BinaryBuilder::with_capacity(json_col.count, 1024);
653            for (is_valid, value) in json_col
654                .validity
655                .as_ref()
656                .unwrap()
657                .iter()
658                .zip(json_col.data.unwrap())
659            {
660                match is_valid {
661                    1 => {
662                        let v = decode(value.as_str().unwrap()).unwrap();
663                        b.append_value(&v)
664                    }
665                    _ => b.append_null(),
666                };
667            }
668            Ok(Arc::new(b.finish()))
669        }
670        DataType::LargeBinary => {
671            let mut b = LargeBinaryBuilder::with_capacity(json_col.count, 1024);
672            for (is_valid, value) in json_col
673                .validity
674                .as_ref()
675                .unwrap()
676                .iter()
677                .zip(json_col.data.unwrap())
678            {
679                match is_valid {
680                    1 => {
681                        let v = decode(value.as_str().unwrap()).unwrap();
682                        b.append_value(&v)
683                    }
684                    _ => b.append_null(),
685                };
686            }
687            Ok(Arc::new(b.finish()))
688        }
689        DataType::Utf8 => {
690            let mut b = StringBuilder::with_capacity(json_col.count, 1024);
691            for (is_valid, value) in json_col
692                .validity
693                .as_ref()
694                .unwrap()
695                .iter()
696                .zip(json_col.data.unwrap())
697            {
698                match is_valid {
699                    1 => b.append_value(value.as_str().unwrap()),
700                    _ => b.append_null(),
701                };
702            }
703            Ok(Arc::new(b.finish()))
704        }
705        DataType::LargeUtf8 => {
706            let mut b = LargeStringBuilder::with_capacity(json_col.count, 1024);
707            for (is_valid, value) in json_col
708                .validity
709                .as_ref()
710                .unwrap()
711                .iter()
712                .zip(json_col.data.unwrap())
713            {
714                match is_valid {
715                    1 => b.append_value(value.as_str().unwrap()),
716                    _ => b.append_null(),
717                };
718            }
719            Ok(Arc::new(b.finish()))
720        }
721        DataType::FixedSizeBinary(len) => {
722            let mut b = FixedSizeBinaryBuilder::with_capacity(json_col.count, *len);
723            for (is_valid, value) in json_col
724                .validity
725                .as_ref()
726                .unwrap()
727                .iter()
728                .zip(json_col.data.unwrap())
729            {
730                match is_valid {
731                    1 => {
732                        let v = hex::decode(value.as_str().unwrap()).unwrap();
733                        b.append_value(&v)?
734                    }
735                    _ => b.append_null(),
736                };
737            }
738            Ok(Arc::new(b.finish()))
739        }
740        DataType::List(child_field) => {
741            let null_buf = create_null_buf(&json_col);
742            let children = json_col.children.clone().unwrap();
743            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
744            let offsets: Vec<i32> = json_col
745                .offset
746                .unwrap()
747                .iter()
748                .map(|v| v.as_i64().unwrap() as i32)
749                .collect();
750            let list_data = ArrayData::builder(field.data_type().clone())
751                .len(json_col.count)
752                .offset(0)
753                .add_buffer(Buffer::from(offsets.to_byte_slice()))
754                .add_child_data(child_array.into_data())
755                .null_bit_buffer(Some(null_buf))
756                .build()
757                .unwrap();
758            Ok(Arc::new(ListArray::from(list_data)))
759        }
760        DataType::LargeList(child_field) => {
761            let null_buf = create_null_buf(&json_col);
762            let children = json_col.children.clone().unwrap();
763            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
764            let offsets: Vec<i64> = json_col
765                .offset
766                .unwrap()
767                .iter()
768                .map(|v| match v {
769                    Value::Number(n) => n.as_i64().unwrap(),
770                    Value::String(s) => s.parse::<i64>().unwrap(),
771                    _ => panic!("64-bit offset must be either string or number"),
772                })
773                .collect();
774            let list_data = ArrayData::builder(field.data_type().clone())
775                .len(json_col.count)
776                .offset(0)
777                .add_buffer(Buffer::from(offsets.to_byte_slice()))
778                .add_child_data(child_array.into_data())
779                .null_bit_buffer(Some(null_buf))
780                .build()
781                .unwrap();
782            Ok(Arc::new(LargeListArray::from(list_data)))
783        }
784        DataType::ListView(child_field) => {
785            let null_buf = create_null_buf(&json_col);
786            let children = json_col.children.clone().unwrap();
787            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
788            let offsets: Vec<i32> = json_col
789                .offset
790                .unwrap()
791                .iter()
792                .map(|v| v.as_i64().unwrap() as i32)
793                .collect();
794            let sizes: Vec<i32> = json_col
795                .size
796                .unwrap()
797                .iter()
798                .map(|v| v.as_i64().unwrap() as i32)
799                .collect();
800            let list_data = ArrayData::builder(field.data_type().clone())
801                .len(json_col.count)
802                .add_buffer(Buffer::from(offsets.to_byte_slice()))
803                .add_buffer(Buffer::from(sizes.to_byte_slice()))
804                .add_child_data(child_array.into_data())
805                .null_bit_buffer(Some(null_buf))
806                .build()
807                .unwrap();
808            Ok(Arc::new(ListViewArray::from(list_data)))
809        }
810        DataType::LargeListView(child_field) => {
811            let null_buf = create_null_buf(&json_col);
812            let children = json_col.children.clone().unwrap();
813            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
814            let offsets: Vec<i64> = json_col
815                .offset
816                .unwrap()
817                .iter()
818                .map(|v| match v {
819                    Value::Number(n) => n.as_i64().unwrap(),
820                    Value::String(s) => s.parse::<i64>().unwrap(),
821                    _ => panic!("64-bit offset must be either string or number"),
822                })
823                .collect();
824            let sizes: Vec<i64> = json_col
825                .size
826                .unwrap()
827                .iter()
828                .map(|v| match v {
829                    Value::Number(n) => n.as_i64().unwrap(),
830                    Value::String(s) => s.parse::<i64>().unwrap(),
831                    _ => panic!("64-bit size must be either string or number"),
832                })
833                .collect();
834            let list_data = ArrayData::builder(field.data_type().clone())
835                .len(json_col.count)
836                .add_buffer(Buffer::from(offsets.to_byte_slice()))
837                .add_buffer(Buffer::from(sizes.to_byte_slice()))
838                .add_child_data(child_array.into_data())
839                .null_bit_buffer(Some(null_buf))
840                .build()
841                .unwrap();
842            Ok(Arc::new(LargeListViewArray::from(list_data)))
843        }
844        DataType::FixedSizeList(child_field, _) => {
845            let children = json_col.children.clone().unwrap();
846            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
847            let null_buf = create_null_buf(&json_col);
848            let list_data = ArrayData::builder(field.data_type().clone())
849                .len(json_col.count)
850                .add_child_data(child_array.into_data())
851                .null_bit_buffer(Some(null_buf))
852                .build()
853                .unwrap();
854            Ok(Arc::new(FixedSizeListArray::from(list_data)))
855        }
856        DataType::Struct(fields) => {
857            // construct struct with null data
858            let null_buf = create_null_buf(&json_col);
859            let mut array_data = ArrayData::builder(field.data_type().clone())
860                .len(json_col.count)
861                .null_bit_buffer(Some(null_buf));
862
863            for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
864                let array = array_from_json(field, col, dictionaries)?;
865                array_data = array_data.add_child_data(array.into_data());
866            }
867
868            let array = StructArray::from(array_data.build().unwrap());
869            Ok(Arc::new(array))
870        }
871        DataType::Dictionary(key_type, value_type) => {
872            #[allow(deprecated)]
873            let dict_id = field.dict_id().ok_or_else(|| {
874                ArrowError::JsonError(format!("Unable to find dict_id for field {field}"))
875            })?;
876            // find dictionary
877            let dictionary = dictionaries
878                .ok_or_else(|| {
879                    ArrowError::JsonError(format!(
880                        "Unable to find any dictionaries for field {field}"
881                    ))
882                })?
883                .get(&dict_id);
884            match dictionary {
885                Some(dictionary) => dictionary_array_from_json(
886                    field,
887                    json_col,
888                    key_type,
889                    value_type,
890                    dictionary,
891                    dictionaries,
892                ),
893                None => Err(ArrowError::JsonError(format!(
894                    "Unable to find dictionary for field {field}"
895                ))),
896            }
897        }
898        DataType::Decimal32(precision, scale) => {
899            let mut b = Decimal32Builder::with_capacity(json_col.count);
900            for (is_valid, value) in json_col
901                .validity
902                .as_ref()
903                .unwrap()
904                .iter()
905                .zip(json_col.data.unwrap())
906            {
907                match is_valid {
908                    1 => b.append_value(value.as_str().unwrap().parse::<i32>().unwrap()),
909                    _ => b.append_null(),
910                };
911            }
912            Ok(Arc::new(
913                b.finish().with_precision_and_scale(*precision, *scale)?,
914            ))
915        }
916        DataType::Decimal64(precision, scale) => {
917            let mut b = Decimal64Builder::with_capacity(json_col.count);
918            for (is_valid, value) in json_col
919                .validity
920                .as_ref()
921                .unwrap()
922                .iter()
923                .zip(json_col.data.unwrap())
924            {
925                match is_valid {
926                    1 => b.append_value(value.as_str().unwrap().parse::<i64>().unwrap()),
927                    _ => b.append_null(),
928                };
929            }
930            Ok(Arc::new(
931                b.finish().with_precision_and_scale(*precision, *scale)?,
932            ))
933        }
934        DataType::Decimal128(precision, scale) => {
935            let mut b = Decimal128Builder::with_capacity(json_col.count);
936            for (is_valid, value) in json_col
937                .validity
938                .as_ref()
939                .unwrap()
940                .iter()
941                .zip(json_col.data.unwrap())
942            {
943                match is_valid {
944                    1 => b.append_value(value.as_str().unwrap().parse::<i128>().unwrap()),
945                    _ => b.append_null(),
946                };
947            }
948            Ok(Arc::new(
949                b.finish().with_precision_and_scale(*precision, *scale)?,
950            ))
951        }
952        DataType::Decimal256(precision, scale) => {
953            let mut b = Decimal256Builder::with_capacity(json_col.count);
954            for (is_valid, value) in json_col
955                .validity
956                .as_ref()
957                .unwrap()
958                .iter()
959                .zip(json_col.data.unwrap())
960            {
961                match is_valid {
962                    1 => {
963                        let str = value.as_str().unwrap();
964                        let integer = BigInt::parse_bytes(str.as_bytes(), 10).unwrap();
965                        let integer_bytes = integer.to_signed_bytes_le();
966                        let mut bytes = if integer.is_positive() {
967                            [0_u8; 32]
968                        } else {
969                            [255_u8; 32]
970                        };
971                        bytes[0..integer_bytes.len()].copy_from_slice(integer_bytes.as_slice());
972                        b.append_value(i256::from_le_bytes(bytes));
973                    }
974                    _ => b.append_null(),
975                }
976            }
977            Ok(Arc::new(
978                b.finish().with_precision_and_scale(*precision, *scale)?,
979            ))
980        }
981        DataType::Map(child_field, _) => {
982            let null_buf = create_null_buf(&json_col);
983            let children = json_col.children.clone().unwrap();
984            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
985            let offsets: Vec<i32> = json_col
986                .offset
987                .unwrap()
988                .iter()
989                .map(|v| v.as_i64().unwrap() as i32)
990                .collect();
991            let array_data = ArrayData::builder(field.data_type().clone())
992                .len(json_col.count)
993                .add_buffer(Buffer::from(offsets.to_byte_slice()))
994                .add_child_data(child_array.into_data())
995                .null_bit_buffer(Some(null_buf))
996                .build()
997                .unwrap();
998
999            let array = MapArray::from(array_data);
1000            Ok(Arc::new(array))
1001        }
1002        DataType::Union(fields, _) => {
1003            let type_ids = if let Some(type_id) = json_col.type_id {
1004                type_id
1005            } else {
1006                return Err(ArrowError::JsonError(
1007                    "Cannot find expected type_id in json column".to_string(),
1008                ));
1009            };
1010
1011            let offset: Option<ScalarBuffer<i32>> = json_col
1012                .offset
1013                .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());
1014
1015            let mut children = Vec::with_capacity(fields.len());
1016            for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
1017                let array = array_from_json(field, col, dictionaries)?;
1018                children.push(array);
1019            }
1020
1021            let array =
1022                UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
1023            Ok(Arc::new(array))
1024        }
1025        DataType::Utf8View => {
1026            let views = json_col.views.ok_or_else(|| {
1027                ArrowError::JsonError("Utf8View requires VIEWS field".to_string())
1028            })?;
1029            let variadic_buffers = json_col.variadic_data_buffers.unwrap_or_default();
1030            let validity = json_col.validity.as_ref();
1031
1032            let mut builder = StringViewBuilder::new();
1033            for (i, view) in views.iter().enumerate() {
1034                let is_valid = validity.map_or(1, |v| v[i]);
1035                if is_valid == 0 {
1036                    builder.append_null();
1037                } else {
1038                    let view_obj = view.as_object().unwrap();
1039                    let size = view_obj["SIZE"].as_u64().unwrap() as usize;
1040                    // Check for INLINED key presence - inlined if SIZE <= 12
1041                    if let Some(inlined) = view_obj.get("INLINED") {
1042                        builder.append_value(inlined.as_str().unwrap());
1043                    } else {
1044                        // Reference to variadic buffer
1045                        let buffer_index = view_obj["BUFFER_INDEX"].as_u64().unwrap() as usize;
1046                        let offset = view_obj["OFFSET"].as_u64().unwrap() as usize;
1047                        let buffer_data = hex::decode(&variadic_buffers[buffer_index]).unwrap();
1048                        let s = std::str::from_utf8(&buffer_data[offset..offset + size]).unwrap();
1049                        builder.append_value(s);
1050                    }
1051                }
1052            }
1053            Ok(Arc::new(builder.finish()))
1054        }
1055        DataType::BinaryView => {
1056            let views = json_col.views.ok_or_else(|| {
1057                ArrowError::JsonError("BinaryView requires VIEWS field".to_string())
1058            })?;
1059            let variadic_buffers = json_col.variadic_data_buffers.unwrap_or_default();
1060            let validity = json_col.validity.as_ref();
1061
1062            let mut builder = BinaryViewBuilder::new();
1063            for (i, view) in views.iter().enumerate() {
1064                let is_valid = validity.map_or(1, |v| v[i]);
1065                if is_valid == 0 {
1066                    builder.append_null();
1067                } else {
1068                    let view_obj = view.as_object().unwrap();
1069                    let size = view_obj["SIZE"].as_u64().unwrap() as usize;
1070                    // Check for INLINED key presence - inlined if SIZE <= 12
1071                    if let Some(inlined) = view_obj.get("INLINED") {
1072                        let data = hex::decode(inlined.as_str().unwrap()).unwrap();
1073                        builder.append_value(&data);
1074                    } else {
1075                        // Reference to variadic buffer
1076                        let buffer_index = view_obj["BUFFER_INDEX"].as_u64().unwrap() as usize;
1077                        let offset = view_obj["OFFSET"].as_u64().unwrap() as usize;
1078                        let buffer_data = hex::decode(&variadic_buffers[buffer_index]).unwrap();
1079                        builder.append_value(&buffer_data[offset..offset + size]);
1080                    }
1081                }
1082            }
1083            Ok(Arc::new(builder.finish()))
1084        }
1085        DataType::RunEndEncoded(run_ends_field, values_field) => {
1086            let children = json_col.children.clone().unwrap();
1087            if children.len() != 2 {
1088                return Err(ArrowError::JsonError(
1089                    "RunEndEncoded requires exactly 2 children".to_string(),
1090                ));
1091            }
1092            let run_ends_array =
1093                array_from_json(run_ends_field, children[0].clone(), dictionaries)?;
1094            let values_array = array_from_json(values_field, children[1].clone(), dictionaries)?;
1095
1096            let run_array_data = ArrayData::builder(field.data_type().clone())
1097                .len(json_col.count)
1098                .add_child_data(run_ends_array.into_data())
1099                .add_child_data(values_array.into_data())
1100                .build()
1101                .unwrap();
1102
1103            Ok(make_array(run_array_data))
1104        }
1105        t => Err(ArrowError::JsonError(format!(
1106            "data type {t} not supported"
1107        ))),
1108    }
1109}
1110
1111/// Construct a [`DictionaryArray`] from a partially typed JSON column
1112pub fn dictionary_array_from_json(
1113    field: &Field,
1114    json_col: ArrowJsonColumn,
1115    dict_key: &DataType,
1116    dict_value: &DataType,
1117    dictionary: &ArrowJsonDictionaryBatch,
1118    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
1119) -> Result<ArrayRef> {
1120    match dict_key {
1121        DataType::Int8
1122        | DataType::Int16
1123        | DataType::Int32
1124        | DataType::Int64
1125        | DataType::UInt8
1126        | DataType::UInt16
1127        | DataType::UInt32
1128        | DataType::UInt64 => {
1129            let null_buf = create_null_buf(&json_col);
1130
1131            // build the key data into a buffer, then construct values separately
1132            #[allow(deprecated)]
1133            let key_field = Field::new_dict(
1134                "key",
1135                dict_key.clone(),
1136                field.is_nullable(),
1137                #[allow(deprecated)]
1138                field
1139                    .dict_id()
1140                    .expect("Dictionary fields must have a dict_id value"),
1141                field
1142                    .dict_is_ordered()
1143                    .expect("Dictionary fields must have a dict_is_ordered value"),
1144            );
1145            let keys = array_from_json(&key_field, json_col, None)?;
1146            // note: not enough info on nullability of dictionary
1147            let value_field = Field::new("value", dict_value.clone(), true);
1148            let values = array_from_json(
1149                &value_field,
1150                dictionary.data.columns[0].clone(),
1151                dictionaries,
1152            )?;
1153
1154            // convert key and value to dictionary data
1155            let dict_data = ArrayData::builder(field.data_type().clone())
1156                .len(keys.len())
1157                .add_buffer(keys.to_data().buffers()[0].clone())
1158                .null_bit_buffer(Some(null_buf))
1159                .add_child_data(values.into_data())
1160                .build()
1161                .unwrap();
1162
1163            let array = match dict_key {
1164                DataType::Int8 => Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef,
1165                DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)),
1166                DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)),
1167                DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)),
1168                DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)),
1169                DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)),
1170                DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)),
1171                DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)),
1172                _ => unreachable!(),
1173            };
1174            Ok(array)
1175        }
1176        _ => Err(ArrowError::JsonError(format!(
1177            "Dictionary key type {dict_key:?} not supported"
1178        ))),
1179    }
1180}
1181
1182/// A helper to create a null buffer from a `Vec<bool>`
1183fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
1184    let num_bytes = bit_util::ceil(json_col.count, 8);
1185    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
1186    json_col
1187        .validity
1188        .clone()
1189        .unwrap()
1190        .iter()
1191        .enumerate()
1192        .for_each(|(i, v)| {
1193            let null_slice = null_buf.as_slice_mut();
1194            if *v != 0 {
1195                bit_util::set_bit(null_slice, i);
1196            }
1197        });
1198    null_buf.into()
1199}
1200
1201impl ArrowJsonBatch {
1202    /// Convert a [`RecordBatch`] to an [`ArrowJsonBatch`]
1203    ///
1204    /// <div class="warning">
1205    ///
1206    /// This function is **deliberately incomplete**! As noted in the crate-level documentation,
1207    /// this crate is only intended for use within the Arrow project itself.
1208    ///
1209    /// Right now, this function only supports `DataType::Int8` columns. Other data types will lead
1210    /// to an empty `ArrowJsonColumn`.
1211    ///
1212    /// </div>
1213    pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
1214        let mut json_batch = ArrowJsonBatch {
1215            count: batch.num_rows(),
1216            columns: Vec::with_capacity(batch.num_columns()),
1217        };
1218
1219        for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) {
1220            let json_col = match field.data_type() {
1221                DataType::Int8 => {
1222                    let col = col.as_any().downcast_ref::<Int8Array>().unwrap();
1223
1224                    let mut validity: Vec<u8> = Vec::with_capacity(col.len());
1225                    let mut data: Vec<Value> = Vec::with_capacity(col.len());
1226
1227                    for i in 0..col.len() {
1228                        if col.is_null(i) {
1229                            validity.push(1);
1230                            data.push(0i8.into());
1231                        } else {
1232                            validity.push(0);
1233                            data.push(col.value(i).into());
1234                        }
1235                    }
1236
1237                    ArrowJsonColumn {
1238                        name: field.name().clone(),
1239                        count: col.len(),
1240                        validity: Some(validity),
1241                        data: Some(data),
1242                        offset: None,
1243                        type_id: None,
1244                        size: None,
1245                        views: None,
1246                        variadic_data_buffers: None,
1247                        children: None,
1248                    }
1249                }
1250                _ => ArrowJsonColumn {
1251                    name: field.name().clone(),
1252                    count: col.len(),
1253                    validity: None,
1254                    data: None,
1255                    offset: None,
1256                    type_id: None,
1257                    size: None,
1258                    views: None,
1259                    variadic_data_buffers: None,
1260                    children: None,
1261                },
1262            };
1263
1264            json_batch.columns.push(json_col);
1265        }
1266
1267        json_batch
1268    }
1269}
1270
1271#[cfg(test)]
1272mod tests {
1273    use super::*;
1274
1275    use std::fs::File;
1276    use std::io::Read;
1277
1278    #[test]
1279    fn test_schema_equality() {
1280        let json = r#"
1281        {
1282            "fields": [
1283                {
1284                    "name": "c1",
1285                    "type": {"name": "int", "isSigned": true, "bitWidth": 32},
1286                    "nullable": true,
1287                    "children": []
1288                },
1289                {
1290                    "name": "c2",
1291                    "type": {"name": "floatingpoint", "precision": "DOUBLE"},
1292                    "nullable": true,
1293                    "children": []
1294                },
1295                {
1296                    "name": "c3",
1297                    "type": {"name": "utf8"},
1298                    "nullable": true,
1299                    "children": []
1300                },
1301                {
1302                    "name": "c4",
1303                    "type": {
1304                        "name": "list"
1305                    },
1306                    "nullable": true,
1307                    "children": [
1308                        {
1309                            "name": "custom_item",
1310                            "type": {
1311                                "name": "int",
1312                                "isSigned": true,
1313                                "bitWidth": 32
1314                            },
1315                            "nullable": false,
1316                            "children": []
1317                        }
1318                    ]
1319                }
1320            ]
1321        }"#;
1322        let json_schema: ArrowJsonSchema = serde_json::from_str(json).unwrap();
1323        let schema = Schema::new(vec![
1324            Field::new("c1", DataType::Int32, true),
1325            Field::new("c2", DataType::Float64, true),
1326            Field::new("c3", DataType::Utf8, true),
1327            Field::new(
1328                "c4",
1329                DataType::List(Arc::new(Field::new("custom_item", DataType::Int32, false))),
1330                true,
1331            ),
1332        ]);
1333        assert!(json_schema.equals_schema(&schema));
1334    }
1335
1336    #[test]
1337    fn test_arrow_data_equality() {
1338        let secs_tz = Some("Europe/Budapest".into());
1339        let millis_tz = Some("America/New_York".into());
1340        let micros_tz = Some("UTC".into());
1341        let nanos_tz = Some("Africa/Johannesburg".into());
1342
1343        let schema = Schema::new(vec![
1344            Field::new("bools-with-metadata-map", DataType::Boolean, true).with_metadata(
1345                [("k".to_string(), "v".to_string())]
1346                    .iter()
1347                    .cloned()
1348                    .collect(),
1349            ),
1350            Field::new("bools-with-metadata-vec", DataType::Boolean, true).with_metadata(
1351                [("k2".to_string(), "v2".to_string())]
1352                    .iter()
1353                    .cloned()
1354                    .collect(),
1355            ),
1356            Field::new("bools", DataType::Boolean, true),
1357            Field::new("int8s", DataType::Int8, true),
1358            Field::new("int16s", DataType::Int16, true),
1359            Field::new("int32s", DataType::Int32, true),
1360            Field::new("int64s", DataType::Int64, true),
1361            Field::new("uint8s", DataType::UInt8, true),
1362            Field::new("uint16s", DataType::UInt16, true),
1363            Field::new("uint32s", DataType::UInt32, true),
1364            Field::new("uint64s", DataType::UInt64, true),
1365            Field::new("float32s", DataType::Float32, true),
1366            Field::new("float64s", DataType::Float64, true),
1367            Field::new("date_days", DataType::Date32, true),
1368            Field::new("date_millis", DataType::Date64, true),
1369            Field::new("time_secs", DataType::Time32(TimeUnit::Second), true),
1370            Field::new("time_millis", DataType::Time32(TimeUnit::Millisecond), true),
1371            Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true),
1372            Field::new("time_nanos", DataType::Time64(TimeUnit::Nanosecond), true),
1373            Field::new("ts_secs", DataType::Timestamp(TimeUnit::Second, None), true),
1374            Field::new(
1375                "ts_millis",
1376                DataType::Timestamp(TimeUnit::Millisecond, None),
1377                true,
1378            ),
1379            Field::new(
1380                "ts_micros",
1381                DataType::Timestamp(TimeUnit::Microsecond, None),
1382                true,
1383            ),
1384            Field::new(
1385                "ts_nanos",
1386                DataType::Timestamp(TimeUnit::Nanosecond, None),
1387                true,
1388            ),
1389            Field::new(
1390                "ts_secs_tz",
1391                DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
1392                true,
1393            ),
1394            Field::new(
1395                "ts_millis_tz",
1396                DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
1397                true,
1398            ),
1399            Field::new(
1400                "ts_micros_tz",
1401                DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
1402                true,
1403            ),
1404            Field::new(
1405                "ts_nanos_tz",
1406                DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
1407                true,
1408            ),
1409            Field::new("utf8s", DataType::Utf8, true),
1410            Field::new(
1411                "lists",
1412                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1413                true,
1414            ),
1415            Field::new(
1416                "structs",
1417                DataType::Struct(Fields::from(vec![
1418                    Field::new("int32s", DataType::Int32, true),
1419                    Field::new("utf8s", DataType::Utf8, true),
1420                ])),
1421                true,
1422            ),
1423            Field::new("utf8views", DataType::Utf8View, true),
1424            Field::new("binaryviews", DataType::BinaryView, true),
1425            Field::new(
1426                "listviews",
1427                DataType::ListView(Arc::new(Field::new_list_field(DataType::Int32, true))),
1428                true,
1429            ),
1430            Field::new(
1431                "largelistviews",
1432                DataType::LargeListView(Arc::new(Field::new_list_field(DataType::Int32, true))),
1433                true,
1434            ),
1435            Field::new(
1436                "runendencoded",
1437                DataType::RunEndEncoded(
1438                    Arc::new(Field::new("run_ends", DataType::Int16, false)),
1439                    Arc::new(Field::new("values", DataType::Int32, true)),
1440                ),
1441                true,
1442            ),
1443        ]);
1444
1445        let bools_with_metadata_map = BooleanArray::from(vec![Some(true), None, Some(false)]);
1446        let bools_with_metadata_vec = BooleanArray::from(vec![Some(true), None, Some(false)]);
1447        let bools = BooleanArray::from(vec![Some(true), None, Some(false)]);
1448        let int8s = Int8Array::from(vec![Some(1), None, Some(3)]);
1449        let int16s = Int16Array::from(vec![Some(1), None, Some(3)]);
1450        let int32s = Int32Array::from(vec![Some(1), None, Some(3)]);
1451        let int64s = Int64Array::from(vec![Some(1), None, Some(3)]);
1452        let uint8s = UInt8Array::from(vec![Some(1), None, Some(3)]);
1453        let uint16s = UInt16Array::from(vec![Some(1), None, Some(3)]);
1454        let uint32s = UInt32Array::from(vec![Some(1), None, Some(3)]);
1455        let uint64s = UInt64Array::from(vec![Some(1), None, Some(3)]);
1456        let float32s = Float32Array::from(vec![Some(1.0), None, Some(3.0)]);
1457        let float64s = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
1458        let date_days = Date32Array::from(vec![Some(1196848), None, None]);
1459        let date_millis = Date64Array::from(vec![
1460            Some(167903550396207),
1461            Some(29923997007884),
1462            Some(30612271819236),
1463        ]);
1464        let time_secs = Time32SecondArray::from(vec![Some(27974), Some(78592), Some(43207)]);
1465        let time_millis =
1466            Time32MillisecondArray::from(vec![Some(6613125), Some(74667230), Some(52260079)]);
1467        let time_micros = Time64MicrosecondArray::from(vec![Some(62522958593), None, None]);
1468        let time_nanos =
1469            Time64NanosecondArray::from(vec![Some(73380123595985), None, Some(16584393546415)]);
1470        let ts_secs = TimestampSecondArray::from(vec![None, Some(193438817552), None]);
1471        let ts_millis =
1472            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)]);
1473        let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
1474        let ts_nanos = TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
1475        let ts_secs_tz = TimestampSecondArray::from(vec![None, Some(193438817552), None])
1476            .with_timezone_opt(secs_tz);
1477        let ts_millis_tz =
1478            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)])
1479                .with_timezone_opt(millis_tz);
1480        let ts_micros_tz =
1481            TimestampMicrosecondArray::from(vec![None, None, None]).with_timezone_opt(micros_tz);
1482        let ts_nanos_tz =
1483            TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)])
1484                .with_timezone_opt(nanos_tz);
1485        let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]);
1486
1487        let value_data = Int32Array::from(vec![None, Some(2), None, None]);
1488        let value_offsets = Buffer::from_slice_ref([0, 3, 4, 4]);
1489        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1490        let list_data = ArrayData::builder(list_data_type)
1491            .len(3)
1492            .add_buffer(value_offsets)
1493            .add_child_data(value_data.into_data())
1494            .null_bit_buffer(Some(Buffer::from([0b00000011])))
1495            .build()
1496            .unwrap();
1497        let lists = ListArray::from(list_data);
1498
1499        let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
1500        let structs_utf8s = StringArray::from(vec![None, None, Some("aaaaaa")]);
1501        let struct_data_type = DataType::Struct(Fields::from(vec![
1502            Field::new("int32s", DataType::Int32, true),
1503            Field::new("utf8s", DataType::Utf8, true),
1504        ]));
1505        let struct_data = ArrayData::builder(struct_data_type)
1506            .len(3)
1507            .add_child_data(structs_int32s.into_data())
1508            .add_child_data(structs_utf8s.into_data())
1509            .null_bit_buffer(Some(Buffer::from([0b00000011])))
1510            .build()
1511            .unwrap();
1512        let structs = StructArray::from(struct_data);
1513
1514        let utf8views =
1515            StringViewArray::from(vec![Some("hello"), None, Some("this is not inlined")]);
1516        let binaryviews = BinaryViewArray::from_iter(vec![
1517            Some(b"\xf3\x4d".as_slice()),
1518            Some(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f".as_slice()),
1519            None,
1520        ]);
1521
1522        let listview_value_data = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
1523        let listview_offsets = Buffer::from_slice_ref([0i32, 2, 2]);
1524        let listview_sizes = Buffer::from_slice_ref([2i32, 0, 3]);
1525        let listview_data_type =
1526            DataType::ListView(Arc::new(Field::new_list_field(DataType::Int32, true)));
1527        let listview_data = ArrayData::builder(listview_data_type)
1528            .len(3)
1529            .add_buffer(listview_offsets)
1530            .add_buffer(listview_sizes)
1531            .add_child_data(listview_value_data.into_data())
1532            .null_bit_buffer(Some(Buffer::from([0b00000101])))
1533            .build()
1534            .unwrap();
1535        let listviews = ListViewArray::from(listview_data);
1536
1537        let largelistview_value_data = Int32Array::from(vec![Some(10), None, Some(30)]);
1538        let largelistview_offsets = Buffer::from_slice_ref([0i64, 2, 3]);
1539        let largelistview_sizes = Buffer::from_slice_ref([2i64, 1, 0]);
1540        let largelistview_data_type =
1541            DataType::LargeListView(Arc::new(Field::new_list_field(DataType::Int32, true)));
1542        let largelistview_data = ArrayData::builder(largelistview_data_type)
1543            .len(3)
1544            .add_buffer(largelistview_offsets)
1545            .add_buffer(largelistview_sizes)
1546            .add_child_data(largelistview_value_data.into_data())
1547            .null_bit_buffer(Some(Buffer::from([0b00000011])))
1548            .build()
1549            .unwrap();
1550        let largelistviews = LargeListViewArray::from(largelistview_data);
1551
1552        let ree_run_ends = Int16Array::from(vec![2, 3]);
1553        let ree_values = Int32Array::from(vec![Some(100), None]);
1554        let ree_data_type = DataType::RunEndEncoded(
1555            Arc::new(Field::new("run_ends", DataType::Int16, false)),
1556            Arc::new(Field::new("values", DataType::Int32, true)),
1557        );
1558        let ree_data = ArrayData::builder(ree_data_type)
1559            .len(3)
1560            .add_child_data(ree_run_ends.into_data())
1561            .add_child_data(ree_values.into_data())
1562            .build()
1563            .unwrap();
1564        let runendencoded = RunArray::<Int16Type>::from(ree_data);
1565
1566        let record_batch = RecordBatch::try_new(
1567            Arc::new(schema.clone()),
1568            vec![
1569                Arc::new(bools_with_metadata_map),
1570                Arc::new(bools_with_metadata_vec),
1571                Arc::new(bools),
1572                Arc::new(int8s),
1573                Arc::new(int16s),
1574                Arc::new(int32s),
1575                Arc::new(int64s),
1576                Arc::new(uint8s),
1577                Arc::new(uint16s),
1578                Arc::new(uint32s),
1579                Arc::new(uint64s),
1580                Arc::new(float32s),
1581                Arc::new(float64s),
1582                Arc::new(date_days),
1583                Arc::new(date_millis),
1584                Arc::new(time_secs),
1585                Arc::new(time_millis),
1586                Arc::new(time_micros),
1587                Arc::new(time_nanos),
1588                Arc::new(ts_secs),
1589                Arc::new(ts_millis),
1590                Arc::new(ts_micros),
1591                Arc::new(ts_nanos),
1592                Arc::new(ts_secs_tz),
1593                Arc::new(ts_millis_tz),
1594                Arc::new(ts_micros_tz),
1595                Arc::new(ts_nanos_tz),
1596                Arc::new(utf8s),
1597                Arc::new(lists),
1598                Arc::new(structs),
1599                Arc::new(utf8views),
1600                Arc::new(binaryviews),
1601                Arc::new(listviews),
1602                Arc::new(largelistviews),
1603                Arc::new(runendencoded),
1604            ],
1605        )
1606        .unwrap();
1607        let mut file = File::open("data/integration.json").unwrap();
1608        let mut json = String::new();
1609        file.read_to_string(&mut json).unwrap();
1610        let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
1611        // test schemas
1612        assert!(arrow_json.schema.equals_schema(&schema));
1613        // test record batch
1614        assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
1615    }
1616}