// arrow_integration_test/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Support for the [Apache Arrow JSON test data format](https://github.com/apache/arrow/blob/master/docs/source/format/Integration.rst#json-test-data-format)
19//!
20//! These utilities define structs that read the integration JSON format for integration testing purposes.
21//!
22//! This is not a canonical format, but provides a human-readable way of verifying language implementations
23
24#![doc(
25    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
26    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
27)]
28#![cfg_attr(docsrs, feature(doc_auto_cfg))]
29#![warn(missing_docs)]
30use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer};
31use hex::decode;
32use num::BigInt;
33use num::Signed;
34use serde::{Deserialize, Serialize};
35use serde_json::{Map as SJMap, Value};
36use std::collections::HashMap;
37use std::sync::Arc;
38
39use arrow::array::*;
40use arrow::buffer::{Buffer, MutableBuffer};
41use arrow::datatypes::*;
42use arrow::error::{ArrowError, Result};
43use arrow::util::bit_util;
44
45mod datatype;
46mod field;
47mod schema;
48
49pub use datatype::*;
50pub use field::*;
51pub use schema::*;
52
/// A struct that represents an Arrow file with a schema and record batches
///
/// This is the top-level type for one integration-JSON document.
///
/// See <https://github.com/apache/arrow/blob/master/docs/source/format/Integration.rst#json-test-data-format>
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJson {
    /// The Arrow schema for JSON file
    pub schema: ArrowJsonSchema,
    /// The `RecordBatch`es in the JSON file
    pub batches: Vec<ArrowJsonBatch>,
    /// The dictionaries in the JSON file
    // Omitted from serialized output entirely when `None`, matching files
    // that contain no dictionary-encoded columns.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
}
66
/// A struct that partially reads the Arrow JSON schema.
///
/// Fields are left as JSON `Value` as they vary by `DataType`
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonSchema {
    /// An array of JSON fields
    pub fields: Vec<ArrowJsonField>,
    /// An array of metadata key-value pairs
    // Each map is expected to hold "key" and "value" entries; see
    // `to_arrow_schema`, which reads exactly those two keys.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Vec<HashMap<String, String>>>,
}
78
/// A single field of an [`ArrowJsonSchema`].
///
/// Fields are left as JSON `Value` as they vary by `DataType`
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonField {
    /// The name of the field
    pub name: String,
    /// The data type of the field,
    /// can be any valid JSON value
    // Serialized as "type" to match the integration JSON format.
    #[serde(rename = "type")]
    pub field_type: Value,
    /// Whether the field is nullable
    pub nullable: bool,
    /// The children fields
    pub children: Vec<ArrowJsonField>,
    /// The dictionary for the field
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionary: Option<ArrowJsonFieldDictionary>,
    /// The metadata for the field, if any
    // When present, this is an array of single-entry objects
    // (see `From<&Field> for ArrowJsonField`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
99
100impl From<&FieldRef> for ArrowJsonField {
101    fn from(value: &FieldRef) -> Self {
102        Self::from(value.as_ref())
103    }
104}
105
106impl From<&Field> for ArrowJsonField {
107    fn from(field: &Field) -> Self {
108        let metadata_value = match field.metadata().is_empty() {
109            false => {
110                let mut array = Vec::new();
111                for (k, v) in field.metadata() {
112                    let mut kv_map = SJMap::new();
113                    kv_map.insert(k.clone(), Value::String(v.clone()));
114                    array.push(Value::Object(kv_map));
115                }
116                if !array.is_empty() {
117                    Some(Value::Array(array))
118                } else {
119                    None
120                }
121            }
122            _ => None,
123        };
124
125        Self {
126            name: field.name().to_string(),
127            field_type: data_type_to_json(field.data_type()),
128            nullable: field.is_nullable(),
129            children: vec![],
130            dictionary: None, // TODO: not enough info
131            metadata: metadata_value,
132        }
133    }
134}
135
/// Represents a dictionary-encoded field in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonFieldDictionary {
    /// A unique identifier for the dictionary, used to pair the field with
    /// its [`ArrowJsonDictionaryBatch`] of the same `id`
    pub id: i64,
    /// The type of the dictionary index
    #[serde(rename = "indexType")]
    pub index_type: DictionaryIndexType,
    /// Whether the dictionary is ordered
    #[serde(rename = "isOrdered")]
    pub is_ordered: bool,
}
148
/// Type of an index for a dictionary-encoded field in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug)]
pub struct DictionaryIndexType {
    /// The name of the dictionary index type
    pub name: String,
    /// Whether the dictionary index type is signed
    #[serde(rename = "isSigned")]
    pub is_signed: bool,
    /// The bit width of the dictionary index type
    #[serde(rename = "bitWidth")]
    pub bit_width: i64,
}
161
/// A struct that partially reads the Arrow JSON record batch
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ArrowJsonBatch {
    // The number of rows in the batch (the "count" key of the JSON object).
    count: usize,
    /// The columns in the record batch
    pub columns: Vec<ArrowJsonColumn>,
}
169
170/// A struct that partially reads the Arrow JSON dictionary batch
171#[derive(Deserialize, Serialize, Debug, Clone)]
172#[allow(non_snake_case)]
173pub struct ArrowJsonDictionaryBatch {
174    /// The unique identifier for the dictionary
175    pub id: i64,
176    /// The data for the dictionary
177    pub data: ArrowJsonBatch,
178}
179
/// A struct that partially reads the Arrow JSON column/array
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ArrowJsonColumn {
    // The column/field name; private, only used during deserialization.
    name: String,
    /// The number of elements in the column
    pub count: usize,
    /// The validity bitmap to determine null values
    // One entry per element: 1 = valid, anything else = null.
    #[serde(rename = "VALIDITY")]
    pub validity: Option<Vec<u8>>,
    /// The data values in the column
    #[serde(rename = "DATA")]
    pub data: Option<Vec<Value>>,
    /// The offsets for variable-sized data types
    #[serde(rename = "OFFSET")]
    pub offset: Option<Vec<Value>>, // leaving as Value as 64-bit offsets are strings
    /// The type id for union types
    #[serde(rename = "TYPE_ID")]
    pub type_id: Option<Vec<i8>>,
    /// The children columns for nested types
    pub children: Option<Vec<ArrowJsonColumn>>,
}
201
202impl ArrowJson {
203    /// Compare the Arrow JSON with a record batch reader
204    pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> Result<bool> {
205        if !self.schema.equals_schema(&reader.schema()) {
206            return Ok(false);
207        }
208
209        for json_batch in self.get_record_batches()?.into_iter() {
210            let batch = reader.next();
211            match batch {
212                Some(Ok(batch)) => {
213                    if json_batch != batch {
214                        println!("json: {json_batch:?}");
215                        println!("batch: {batch:?}");
216                        return Ok(false);
217                    }
218                }
219                Some(Err(e)) => return Err(e),
220                None => return Ok(false),
221            }
222        }
223
224        Ok(true)
225    }
226
227    /// Convert the stored dictionaries to `Vec[RecordBatch]`
228    pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
229        let schema = self.schema.to_arrow_schema()?;
230
231        let mut dictionaries = HashMap::new();
232        self.dictionaries.iter().for_each(|dict_batches| {
233            dict_batches.iter().for_each(|d| {
234                dictionaries.insert(d.id, d.clone());
235            });
236        });
237
238        let batches: Result<Vec<_>> = self
239            .batches
240            .iter()
241            .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries)))
242            .collect();
243
244        batches
245    }
246}
247
248impl ArrowJsonSchema {
249    /// Compare the Arrow JSON schema with the Arrow `Schema`
250    fn equals_schema(&self, schema: &Schema) -> bool {
251        let field_len = self.fields.len();
252        if field_len != schema.fields().len() {
253            return false;
254        }
255        for i in 0..field_len {
256            let json_field = &self.fields[i];
257            let field = schema.field(i);
258            if !json_field.equals_field(field) {
259                return false;
260            }
261        }
262        true
263    }
264
265    fn to_arrow_schema(&self) -> Result<Schema> {
266        let arrow_fields: Result<Vec<_>> = self
267            .fields
268            .iter()
269            .map(|field| field.to_arrow_field())
270            .collect();
271
272        if let Some(metadatas) = &self.metadata {
273            let mut metadata: HashMap<String, String> = HashMap::new();
274
275            metadatas.iter().for_each(|pair| {
276                let key = pair.get("key").unwrap();
277                let value = pair.get("value").unwrap();
278                metadata.insert(key.clone(), value.clone());
279            });
280
281            Ok(Schema::new_with_metadata(arrow_fields?, metadata))
282        } else {
283            Ok(Schema::new(arrow_fields?))
284        }
285    }
286}
287
impl ArrowJsonField {
    /// Compare the Arrow JSON field with the Arrow `Field`
    ///
    /// Returns `true` when the converted field equals `field`, and `false`
    /// only when the JSON-to-Arrow conversion itself fails.
    // NOTE(review): on an actual field mismatch this `assert_eq!` panics
    // (printing both sides) instead of returning `false`, so the `true` arm is
    // only reached on equality. Presumably intentional for integration-test
    // diagnostics — confirm before relying on the boolean return.
    fn equals_field(&self, field: &Field) -> bool {
        // convert to a field
        match self.to_arrow_field() {
            Ok(self_field) => {
                assert_eq!(&self_field, field, "Arrow fields not the same");
                true
            }
            Err(e) => {
                // Conversion failure is reported to stderr and treated as "not equal".
                eprintln!("Encountered error while converting JSON field to Arrow field: {e:?}");
                false
            }
        }
    }

    /// Convert to an Arrow Field
    /// TODO: convert to use an Into
    fn to_arrow_field(&self) -> Result<Field> {
        // a bit regressive, but we have to convert the field to JSON in order to convert it
        // (round-trips through serde_json so `field_from_json` can parse the Value)
        let field =
            serde_json::to_value(self).map_err(|error| ArrowError::JsonError(error.to_string()))?;
        field_from_json(&field)
    }
}
313
314/// Generates a [`RecordBatch`] from an Arrow JSON batch, given a schema
315pub fn record_batch_from_json(
316    schema: &Schema,
317    json_batch: ArrowJsonBatch,
318    json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
319) -> Result<RecordBatch> {
320    let mut columns = vec![];
321
322    for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
323        let col = array_from_json(field, json_col, json_dictionaries)?;
324        columns.push(col);
325    }
326
327    RecordBatch::try_new(Arc::new(schema.clone()), columns)
328}
329
330/// Construct an Arrow array from a partially typed JSON column
331pub fn array_from_json(
332    field: &Field,
333    json_col: ArrowJsonColumn,
334    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
335) -> Result<ArrayRef> {
336    match field.data_type() {
337        DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
338        DataType::Boolean => {
339            let mut b = BooleanBuilder::with_capacity(json_col.count);
340            for (is_valid, value) in json_col
341                .validity
342                .as_ref()
343                .unwrap()
344                .iter()
345                .zip(json_col.data.unwrap())
346            {
347                match is_valid {
348                    1 => b.append_value(value.as_bool().unwrap()),
349                    _ => b.append_null(),
350                };
351            }
352            Ok(Arc::new(b.finish()))
353        }
354        DataType::Int8 => {
355            let mut b = Int8Builder::with_capacity(json_col.count);
356            for (is_valid, value) in json_col
357                .validity
358                .as_ref()
359                .unwrap()
360                .iter()
361                .zip(json_col.data.unwrap())
362            {
363                match is_valid {
364                    1 => b.append_value(value.as_i64().ok_or_else(|| {
365                        ArrowError::JsonError(format!("Unable to get {value:?} as int64"))
366                    })? as i8),
367                    _ => b.append_null(),
368                };
369            }
370            Ok(Arc::new(b.finish()))
371        }
372        DataType::Int16 => {
373            let mut b = Int16Builder::with_capacity(json_col.count);
374            for (is_valid, value) in json_col
375                .validity
376                .as_ref()
377                .unwrap()
378                .iter()
379                .zip(json_col.data.unwrap())
380            {
381                match is_valid {
382                    1 => b.append_value(value.as_i64().unwrap() as i16),
383                    _ => b.append_null(),
384                };
385            }
386            Ok(Arc::new(b.finish()))
387        }
388        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
389            let mut b = Int32Builder::with_capacity(json_col.count);
390            for (is_valid, value) in json_col
391                .validity
392                .as_ref()
393                .unwrap()
394                .iter()
395                .zip(json_col.data.unwrap())
396            {
397                match is_valid {
398                    1 => b.append_value(value.as_i64().unwrap() as i32),
399                    _ => b.append_null(),
400                };
401            }
402            let array = Arc::new(b.finish()) as ArrayRef;
403            arrow::compute::cast(&array, field.data_type())
404        }
405        DataType::Interval(IntervalUnit::YearMonth) => {
406            let mut b = IntervalYearMonthBuilder::with_capacity(json_col.count);
407            for (is_valid, value) in json_col
408                .validity
409                .as_ref()
410                .unwrap()
411                .iter()
412                .zip(json_col.data.unwrap())
413            {
414                match is_valid {
415                    1 => b.append_value(value.as_i64().unwrap() as i32),
416                    _ => b.append_null(),
417                };
418            }
419            Ok(Arc::new(b.finish()))
420        }
421        DataType::Int64
422        | DataType::Date64
423        | DataType::Time64(_)
424        | DataType::Timestamp(_, _)
425        | DataType::Duration(_) => {
426            let mut b = Int64Builder::with_capacity(json_col.count);
427            for (is_valid, value) in json_col
428                .validity
429                .as_ref()
430                .unwrap()
431                .iter()
432                .zip(json_col.data.unwrap())
433            {
434                match is_valid {
435                    1 => b.append_value(match value {
436                        Value::Number(n) => n.as_i64().unwrap(),
437                        Value::String(s) => s.parse().expect("Unable to parse string as i64"),
438                        _ => panic!("Unable to parse {value:?} as number"),
439                    }),
440                    _ => b.append_null(),
441                };
442            }
443            let array = Arc::new(b.finish()) as ArrayRef;
444            arrow::compute::cast(&array, field.data_type())
445        }
446        DataType::Interval(IntervalUnit::DayTime) => {
447            let mut b = IntervalDayTimeBuilder::with_capacity(json_col.count);
448            for (is_valid, value) in json_col
449                .validity
450                .as_ref()
451                .unwrap()
452                .iter()
453                .zip(json_col.data.unwrap())
454            {
455                match is_valid {
456                    1 => b.append_value(match value {
457                        Value::Object(ref map)
458                            if map.contains_key("days") && map.contains_key("milliseconds") =>
459                        {
460                            match field.data_type() {
461                                DataType::Interval(IntervalUnit::DayTime) => {
462                                    let days = map.get("days").unwrap();
463                                    let milliseconds = map.get("milliseconds").unwrap();
464
465                                    match (days, milliseconds) {
466                                        (Value::Number(d), Value::Number(m)) => {
467                                            let days = d.as_i64().unwrap() as _;
468                                            let millis = m.as_i64().unwrap() as _;
469                                            IntervalDayTime::new(days, millis)
470                                        }
471                                        _ => {
472                                            panic!("Unable to parse {value:?} as interval daytime")
473                                        }
474                                    }
475                                }
476                                _ => panic!("Unable to parse {value:?} as interval daytime"),
477                            }
478                        }
479                        _ => panic!("Unable to parse {value:?} as number"),
480                    }),
481                    _ => b.append_null(),
482                };
483            }
484            Ok(Arc::new(b.finish()))
485        }
486        DataType::UInt8 => {
487            let mut b = UInt8Builder::with_capacity(json_col.count);
488            for (is_valid, value) in json_col
489                .validity
490                .as_ref()
491                .unwrap()
492                .iter()
493                .zip(json_col.data.unwrap())
494            {
495                match is_valid {
496                    1 => b.append_value(value.as_u64().unwrap() as u8),
497                    _ => b.append_null(),
498                };
499            }
500            Ok(Arc::new(b.finish()))
501        }
502        DataType::UInt16 => {
503            let mut b = UInt16Builder::with_capacity(json_col.count);
504            for (is_valid, value) in json_col
505                .validity
506                .as_ref()
507                .unwrap()
508                .iter()
509                .zip(json_col.data.unwrap())
510            {
511                match is_valid {
512                    1 => b.append_value(value.as_u64().unwrap() as u16),
513                    _ => b.append_null(),
514                };
515            }
516            Ok(Arc::new(b.finish()))
517        }
518        DataType::UInt32 => {
519            let mut b = UInt32Builder::with_capacity(json_col.count);
520            for (is_valid, value) in json_col
521                .validity
522                .as_ref()
523                .unwrap()
524                .iter()
525                .zip(json_col.data.unwrap())
526            {
527                match is_valid {
528                    1 => b.append_value(value.as_u64().unwrap() as u32),
529                    _ => b.append_null(),
530                };
531            }
532            Ok(Arc::new(b.finish()))
533        }
534        DataType::UInt64 => {
535            let mut b = UInt64Builder::with_capacity(json_col.count);
536            for (is_valid, value) in json_col
537                .validity
538                .as_ref()
539                .unwrap()
540                .iter()
541                .zip(json_col.data.unwrap())
542            {
543                match is_valid {
544                    1 => {
545                        if value.is_string() {
546                            b.append_value(
547                                value
548                                    .as_str()
549                                    .unwrap()
550                                    .parse()
551                                    .expect("Unable to parse string as u64"),
552                            )
553                        } else if value.is_number() {
554                            b.append_value(value.as_u64().expect("Unable to read number as u64"))
555                        } else {
556                            panic!("Unable to parse value {value:?} as u64")
557                        }
558                    }
559                    _ => b.append_null(),
560                };
561            }
562            Ok(Arc::new(b.finish()))
563        }
564        DataType::Interval(IntervalUnit::MonthDayNano) => {
565            let mut b = IntervalMonthDayNanoBuilder::with_capacity(json_col.count);
566            for (is_valid, value) in json_col
567                .validity
568                .as_ref()
569                .unwrap()
570                .iter()
571                .zip(json_col.data.unwrap())
572            {
573                match is_valid {
574                    1 => b.append_value(match value {
575                        Value::Object(v) => {
576                            let months = v.get("months").unwrap();
577                            let days = v.get("days").unwrap();
578                            let nanoseconds = v.get("nanoseconds").unwrap();
579                            match (months, days, nanoseconds) {
580                                (
581                                    Value::Number(months),
582                                    Value::Number(days),
583                                    Value::Number(nanoseconds),
584                                ) => {
585                                    let months = months.as_i64().unwrap() as i32;
586                                    let days = days.as_i64().unwrap() as i32;
587                                    let nanoseconds = nanoseconds.as_i64().unwrap();
588                                    IntervalMonthDayNano::new(months, days, nanoseconds)
589                                }
590                                (_, _, _) => {
591                                    panic!("Unable to parse {v:?} as MonthDayNano")
592                                }
593                            }
594                        }
595                        _ => panic!("Unable to parse {value:?} as MonthDayNano"),
596                    }),
597                    _ => b.append_null(),
598                };
599            }
600            Ok(Arc::new(b.finish()))
601        }
602        DataType::Float32 => {
603            let mut b = Float32Builder::with_capacity(json_col.count);
604            for (is_valid, value) in json_col
605                .validity
606                .as_ref()
607                .unwrap()
608                .iter()
609                .zip(json_col.data.unwrap())
610            {
611                match is_valid {
612                    1 => b.append_value(value.as_f64().unwrap() as f32),
613                    _ => b.append_null(),
614                };
615            }
616            Ok(Arc::new(b.finish()))
617        }
618        DataType::Float64 => {
619            let mut b = Float64Builder::with_capacity(json_col.count);
620            for (is_valid, value) in json_col
621                .validity
622                .as_ref()
623                .unwrap()
624                .iter()
625                .zip(json_col.data.unwrap())
626            {
627                match is_valid {
628                    1 => b.append_value(value.as_f64().unwrap()),
629                    _ => b.append_null(),
630                };
631            }
632            Ok(Arc::new(b.finish()))
633        }
634        DataType::Binary => {
635            let mut b = BinaryBuilder::with_capacity(json_col.count, 1024);
636            for (is_valid, value) in json_col
637                .validity
638                .as_ref()
639                .unwrap()
640                .iter()
641                .zip(json_col.data.unwrap())
642            {
643                match is_valid {
644                    1 => {
645                        let v = decode(value.as_str().unwrap()).unwrap();
646                        b.append_value(&v)
647                    }
648                    _ => b.append_null(),
649                };
650            }
651            Ok(Arc::new(b.finish()))
652        }
653        DataType::LargeBinary => {
654            let mut b = LargeBinaryBuilder::with_capacity(json_col.count, 1024);
655            for (is_valid, value) in json_col
656                .validity
657                .as_ref()
658                .unwrap()
659                .iter()
660                .zip(json_col.data.unwrap())
661            {
662                match is_valid {
663                    1 => {
664                        let v = decode(value.as_str().unwrap()).unwrap();
665                        b.append_value(&v)
666                    }
667                    _ => b.append_null(),
668                };
669            }
670            Ok(Arc::new(b.finish()))
671        }
672        DataType::Utf8 => {
673            let mut b = StringBuilder::with_capacity(json_col.count, 1024);
674            for (is_valid, value) in json_col
675                .validity
676                .as_ref()
677                .unwrap()
678                .iter()
679                .zip(json_col.data.unwrap())
680            {
681                match is_valid {
682                    1 => b.append_value(value.as_str().unwrap()),
683                    _ => b.append_null(),
684                };
685            }
686            Ok(Arc::new(b.finish()))
687        }
688        DataType::LargeUtf8 => {
689            let mut b = LargeStringBuilder::with_capacity(json_col.count, 1024);
690            for (is_valid, value) in json_col
691                .validity
692                .as_ref()
693                .unwrap()
694                .iter()
695                .zip(json_col.data.unwrap())
696            {
697                match is_valid {
698                    1 => b.append_value(value.as_str().unwrap()),
699                    _ => b.append_null(),
700                };
701            }
702            Ok(Arc::new(b.finish()))
703        }
704        DataType::FixedSizeBinary(len) => {
705            let mut b = FixedSizeBinaryBuilder::with_capacity(json_col.count, *len);
706            for (is_valid, value) in json_col
707                .validity
708                .as_ref()
709                .unwrap()
710                .iter()
711                .zip(json_col.data.unwrap())
712            {
713                match is_valid {
714                    1 => {
715                        let v = hex::decode(value.as_str().unwrap()).unwrap();
716                        b.append_value(&v)?
717                    }
718                    _ => b.append_null(),
719                };
720            }
721            Ok(Arc::new(b.finish()))
722        }
723        DataType::List(child_field) => {
724            let null_buf = create_null_buf(&json_col);
725            let children = json_col.children.clone().unwrap();
726            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
727            let offsets: Vec<i32> = json_col
728                .offset
729                .unwrap()
730                .iter()
731                .map(|v| v.as_i64().unwrap() as i32)
732                .collect();
733            let list_data = ArrayData::builder(field.data_type().clone())
734                .len(json_col.count)
735                .offset(0)
736                .add_buffer(Buffer::from(offsets.to_byte_slice()))
737                .add_child_data(child_array.into_data())
738                .null_bit_buffer(Some(null_buf))
739                .build()
740                .unwrap();
741            Ok(Arc::new(ListArray::from(list_data)))
742        }
743        DataType::LargeList(child_field) => {
744            let null_buf = create_null_buf(&json_col);
745            let children = json_col.children.clone().unwrap();
746            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
747            let offsets: Vec<i64> = json_col
748                .offset
749                .unwrap()
750                .iter()
751                .map(|v| match v {
752                    Value::Number(n) => n.as_i64().unwrap(),
753                    Value::String(s) => s.parse::<i64>().unwrap(),
754                    _ => panic!("64-bit offset must be either string or number"),
755                })
756                .collect();
757            let list_data = ArrayData::builder(field.data_type().clone())
758                .len(json_col.count)
759                .offset(0)
760                .add_buffer(Buffer::from(offsets.to_byte_slice()))
761                .add_child_data(child_array.into_data())
762                .null_bit_buffer(Some(null_buf))
763                .build()
764                .unwrap();
765            Ok(Arc::new(LargeListArray::from(list_data)))
766        }
767        DataType::FixedSizeList(child_field, _) => {
768            let children = json_col.children.clone().unwrap();
769            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
770            let null_buf = create_null_buf(&json_col);
771            let list_data = ArrayData::builder(field.data_type().clone())
772                .len(json_col.count)
773                .add_child_data(child_array.into_data())
774                .null_bit_buffer(Some(null_buf))
775                .build()
776                .unwrap();
777            Ok(Arc::new(FixedSizeListArray::from(list_data)))
778        }
779        DataType::Struct(fields) => {
780            // construct struct with null data
781            let null_buf = create_null_buf(&json_col);
782            let mut array_data = ArrayData::builder(field.data_type().clone())
783                .len(json_col.count)
784                .null_bit_buffer(Some(null_buf));
785
786            for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
787                let array = array_from_json(field, col, dictionaries)?;
788                array_data = array_data.add_child_data(array.into_data());
789            }
790
791            let array = StructArray::from(array_data.build().unwrap());
792            Ok(Arc::new(array))
793        }
794        DataType::Dictionary(key_type, value_type) => {
795            #[allow(deprecated)]
796            let dict_id = field.dict_id().ok_or_else(|| {
797                ArrowError::JsonError(format!("Unable to find dict_id for field {field:?}"))
798            })?;
799            // find dictionary
800            let dictionary = dictionaries
801                .ok_or_else(|| {
802                    ArrowError::JsonError(format!(
803                        "Unable to find any dictionaries for field {field:?}"
804                    ))
805                })?
806                .get(&dict_id);
807            match dictionary {
808                Some(dictionary) => dictionary_array_from_json(
809                    field,
810                    json_col,
811                    key_type,
812                    value_type,
813                    dictionary,
814                    dictionaries,
815                ),
816                None => Err(ArrowError::JsonError(format!(
817                    "Unable to find dictionary for field {field:?}"
818                ))),
819            }
820        }
821        DataType::Decimal128(precision, scale) => {
822            let mut b = Decimal128Builder::with_capacity(json_col.count);
823            for (is_valid, value) in json_col
824                .validity
825                .as_ref()
826                .unwrap()
827                .iter()
828                .zip(json_col.data.unwrap())
829            {
830                match is_valid {
831                    1 => b.append_value(value.as_str().unwrap().parse::<i128>().unwrap()),
832                    _ => b.append_null(),
833                };
834            }
835            Ok(Arc::new(
836                b.finish().with_precision_and_scale(*precision, *scale)?,
837            ))
838        }
839        DataType::Decimal256(precision, scale) => {
840            let mut b = Decimal256Builder::with_capacity(json_col.count);
841            for (is_valid, value) in json_col
842                .validity
843                .as_ref()
844                .unwrap()
845                .iter()
846                .zip(json_col.data.unwrap())
847            {
848                match is_valid {
849                    1 => {
850                        let str = value.as_str().unwrap();
851                        let integer = BigInt::parse_bytes(str.as_bytes(), 10).unwrap();
852                        let integer_bytes = integer.to_signed_bytes_le();
853                        let mut bytes = if integer.is_positive() {
854                            [0_u8; 32]
855                        } else {
856                            [255_u8; 32]
857                        };
858                        bytes[0..integer_bytes.len()].copy_from_slice(integer_bytes.as_slice());
859                        b.append_value(i256::from_le_bytes(bytes));
860                    }
861                    _ => b.append_null(),
862                }
863            }
864            Ok(Arc::new(
865                b.finish().with_precision_and_scale(*precision, *scale)?,
866            ))
867        }
868        DataType::Map(child_field, _) => {
869            let null_buf = create_null_buf(&json_col);
870            let children = json_col.children.clone().unwrap();
871            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
872            let offsets: Vec<i32> = json_col
873                .offset
874                .unwrap()
875                .iter()
876                .map(|v| v.as_i64().unwrap() as i32)
877                .collect();
878            let array_data = ArrayData::builder(field.data_type().clone())
879                .len(json_col.count)
880                .add_buffer(Buffer::from(offsets.to_byte_slice()))
881                .add_child_data(child_array.into_data())
882                .null_bit_buffer(Some(null_buf))
883                .build()
884                .unwrap();
885
886            let array = MapArray::from(array_data);
887            Ok(Arc::new(array))
888        }
889        DataType::Union(fields, _) => {
890            let type_ids = if let Some(type_id) = json_col.type_id {
891                type_id
892            } else {
893                return Err(ArrowError::JsonError(
894                    "Cannot find expected type_id in json column".to_string(),
895                ));
896            };
897
898            let offset: Option<ScalarBuffer<i32>> = json_col
899                .offset
900                .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());
901
902            let mut children = Vec::with_capacity(fields.len());
903            for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
904                let array = array_from_json(field, col, dictionaries)?;
905                children.push(array);
906            }
907
908            let array =
909                UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
910            Ok(Arc::new(array))
911        }
912        t => Err(ArrowError::JsonError(format!(
913            "data type {t:?} not supported"
914        ))),
915    }
916}
917
/// Construct a [`DictionaryArray`] from a partially typed JSON column
///
/// `json_col` holds the dictionary *keys*; the dictionary *values* are read
/// from `dictionary.data.columns[0]`. `dictionaries` is threaded through so
/// that nested dictionary-encoded value types can be resolved recursively.
///
/// # Errors
///
/// Returns [`ArrowError::JsonError`] if `dict_key` is not one of the
/// supported integer key types.
///
/// # Panics
///
/// Panics if `field` lacks a `dict_id` or `dict_is_ordered` value, or if the
/// assembled `ArrayData` fails validation.
pub fn dictionary_array_from_json(
    field: &Field,
    json_col: ArrowJsonColumn,
    dict_key: &DataType,
    dict_value: &DataType,
    dictionary: &ArrowJsonDictionaryBatch,
    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
) -> Result<ArrayRef> {
    match dict_key {
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64 => {
            // Null bitmap comes from the key column's validity, since in
            // Arrow a dictionary array's nullability lives on the keys.
            let null_buf = create_null_buf(&json_col);

            // build the key data into a buffer, then construct values separately
            #[allow(deprecated)]
            let key_field = Field::new_dict(
                "key",
                dict_key.clone(),
                field.is_nullable(),
                #[allow(deprecated)]
                field
                    .dict_id()
                    .expect("Dictionary fields must have a dict_id value"),
                field
                    .dict_is_ordered()
                    .expect("Dictionary fields must have a dict_is_ordered value"),
            );
            // Keys are plain integers; no dictionaries are needed to decode them.
            let keys = array_from_json(&key_field, json_col, None)?;
            // note: not enough info on nullability of dictionary
            let value_field = Field::new("value", dict_value.clone(), true);
            let values = array_from_json(
                &value_field,
                dictionary.data.columns[0].clone(),
                dictionaries,
            )?;

            // convert key and value to dictionary data
            let dict_data = ArrayData::builder(field.data_type().clone())
                .len(keys.len())
                // buffers()[0] of the key array is its values buffer, which
                // becomes the dictionary's keys buffer
                .add_buffer(keys.to_data().buffers()[0].clone())
                .null_bit_buffer(Some(null_buf))
                .add_child_data(values.into_data())
                .build()
                .unwrap();

            // Wrap the raw ArrayData in the concrete typed dictionary array
            // matching the key type.
            let array = match dict_key {
                DataType::Int8 => Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef,
                DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)),
                DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)),
                DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)),
                DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)),
                DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)),
                DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)),
                DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)),
                // the outer match already restricted dict_key to the integer types above
                _ => unreachable!(),
            };
            Ok(array)
        }
        _ => Err(ArrowError::JsonError(format!(
            "Dictionary key type {dict_key:?} not supported"
        ))),
    }
}
988
989/// A helper to create a null buffer from a `Vec<bool>`
990fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
991    let num_bytes = bit_util::ceil(json_col.count, 8);
992    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
993    json_col
994        .validity
995        .clone()
996        .unwrap()
997        .iter()
998        .enumerate()
999        .for_each(|(i, v)| {
1000            let null_slice = null_buf.as_slice_mut();
1001            if *v != 0 {
1002                bit_util::set_bit(null_slice, i);
1003            }
1004        });
1005    null_buf.into()
1006}
1007
1008impl ArrowJsonBatch {
1009    /// Convert a [`RecordBatch`] to an [`ArrowJsonBatch`]
1010    pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
1011        let mut json_batch = ArrowJsonBatch {
1012            count: batch.num_rows(),
1013            columns: Vec::with_capacity(batch.num_columns()),
1014        };
1015
1016        for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) {
1017            let json_col = match field.data_type() {
1018                DataType::Int8 => {
1019                    let col = col.as_any().downcast_ref::<Int8Array>().unwrap();
1020
1021                    let mut validity: Vec<u8> = Vec::with_capacity(col.len());
1022                    let mut data: Vec<Value> = Vec::with_capacity(col.len());
1023
1024                    for i in 0..col.len() {
1025                        if col.is_null(i) {
1026                            validity.push(1);
1027                            data.push(0i8.into());
1028                        } else {
1029                            validity.push(0);
1030                            data.push(col.value(i).into());
1031                        }
1032                    }
1033
1034                    ArrowJsonColumn {
1035                        name: field.name().clone(),
1036                        count: col.len(),
1037                        validity: Some(validity),
1038                        data: Some(data),
1039                        offset: None,
1040                        type_id: None,
1041                        children: None,
1042                    }
1043                }
1044                _ => ArrowJsonColumn {
1045                    name: field.name().clone(),
1046                    count: col.len(),
1047                    validity: None,
1048                    data: None,
1049                    offset: None,
1050                    type_id: None,
1051                    children: None,
1052                },
1053            };
1054
1055            json_batch.columns.push(json_col);
1056        }
1057
1058        json_batch
1059    }
1060}
1061
#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Read;

    // A schema deserialized from integration JSON must compare equal to the
    // same schema built programmatically, including a list type whose child
    // field uses a custom (non-default) name.
    #[test]
    fn test_schema_equality() {
        let json = r#"
        {
            "fields": [
                {
                    "name": "c1",
                    "type": {"name": "int", "isSigned": true, "bitWidth": 32},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c2",
                    "type": {"name": "floatingpoint", "precision": "DOUBLE"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c3",
                    "type": {"name": "utf8"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c4",
                    "type": {
                        "name": "list"
                    },
                    "nullable": true,
                    "children": [
                        {
                            "name": "custom_item",
                            "type": {
                                "name": "int",
                                "isSigned": true,
                                "bitWidth": 32
                            },
                            "nullable": false,
                            "children": []
                        }
                    ]
                }
            ]
        }"#;
        let json_schema: ArrowJsonSchema = serde_json::from_str(json).unwrap();
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Utf8, true),
            Field::new(
                "c4",
                DataType::List(Arc::new(Field::new("custom_item", DataType::Int32, false))),
                true,
            ),
        ]);
        assert!(json_schema.equals_schema(&schema));
    }

    // Builds a RecordBatch covering a wide range of data types (booleans with
    // metadata, all integer/float widths, dates, times, timestamps with and
    // without timezones, utf8, list, struct) and asserts that it equals the
    // batch parsed from the on-disk fixture `data/integration.json`.
    #[test]
    fn test_arrow_data_equality() {
        let secs_tz = Some("Europe/Budapest".into());
        let millis_tz = Some("America/New_York".into());
        let micros_tz = Some("UTC".into());
        let nanos_tz = Some("Africa/Johannesburg".into());

        let schema = Schema::new(vec![
            Field::new("bools-with-metadata-map", DataType::Boolean, true).with_metadata(
                [("k".to_string(), "v".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools-with-metadata-vec", DataType::Boolean, true).with_metadata(
                [("k2".to_string(), "v2".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools", DataType::Boolean, true),
            Field::new("int8s", DataType::Int8, true),
            Field::new("int16s", DataType::Int16, true),
            Field::new("int32s", DataType::Int32, true),
            Field::new("int64s", DataType::Int64, true),
            Field::new("uint8s", DataType::UInt8, true),
            Field::new("uint16s", DataType::UInt16, true),
            Field::new("uint32s", DataType::UInt32, true),
            Field::new("uint64s", DataType::UInt64, true),
            Field::new("float32s", DataType::Float32, true),
            Field::new("float64s", DataType::Float64, true),
            Field::new("date_days", DataType::Date32, true),
            Field::new("date_millis", DataType::Date64, true),
            Field::new("time_secs", DataType::Time32(TimeUnit::Second), true),
            Field::new("time_millis", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nanos", DataType::Time64(TimeUnit::Nanosecond), true),
            Field::new("ts_secs", DataType::Timestamp(TimeUnit::Second, None), true),
            Field::new(
                "ts_millis",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micros",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "ts_nanos",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "ts_secs_tz",
                DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
                true,
            ),
            Field::new(
                "ts_millis_tz",
                DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
                true,
            ),
            Field::new(
                "ts_micros_tz",
                DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
                true,
            ),
            Field::new(
                "ts_nanos_tz",
                DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
                true,
            ),
            Field::new("utf8s", DataType::Utf8, true),
            Field::new(
                "lists",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "structs",
                DataType::Struct(Fields::from(vec![
                    Field::new("int32s", DataType::Int32, true),
                    Field::new("utf8s", DataType::Utf8, true),
                ])),
                true,
            ),
        ]);

        let bools_with_metadata_map = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools_with_metadata_vec = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let int8s = Int8Array::from(vec![Some(1), None, Some(3)]);
        let int16s = Int16Array::from(vec![Some(1), None, Some(3)]);
        let int32s = Int32Array::from(vec![Some(1), None, Some(3)]);
        let int64s = Int64Array::from(vec![Some(1), None, Some(3)]);
        let uint8s = UInt8Array::from(vec![Some(1), None, Some(3)]);
        let uint16s = UInt16Array::from(vec![Some(1), None, Some(3)]);
        let uint32s = UInt32Array::from(vec![Some(1), None, Some(3)]);
        let uint64s = UInt64Array::from(vec![Some(1), None, Some(3)]);
        let float32s = Float32Array::from(vec![Some(1.0), None, Some(3.0)]);
        let float64s = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
        let date_days = Date32Array::from(vec![Some(1196848), None, None]);
        let date_millis = Date64Array::from(vec![
            Some(167903550396207),
            Some(29923997007884),
            Some(30612271819236),
        ]);
        let time_secs = Time32SecondArray::from(vec![Some(27974), Some(78592), Some(43207)]);
        let time_millis =
            Time32MillisecondArray::from(vec![Some(6613125), Some(74667230), Some(52260079)]);
        let time_micros = Time64MicrosecondArray::from(vec![Some(62522958593), None, None]);
        let time_nanos =
            Time64NanosecondArray::from(vec![Some(73380123595985), None, Some(16584393546415)]);
        let ts_secs = TimestampSecondArray::from(vec![None, Some(193438817552), None]);
        let ts_millis =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)]);
        let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
        let ts_nanos = TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
        let ts_secs_tz = TimestampSecondArray::from(vec![None, Some(193438817552), None])
            .with_timezone_opt(secs_tz);
        let ts_millis_tz =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)])
                .with_timezone_opt(millis_tz);
        let ts_micros_tz =
            TimestampMicrosecondArray::from(vec![None, None, None]).with_timezone_opt(micros_tz);
        let ts_nanos_tz =
            TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)])
                .with_timezone_opt(nanos_tz);
        let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]);

        // List column: offsets [0, 3, 4, 4] give rows of lengths 3, 1, 0;
        // bitmap 0b00000011 marks rows 0 and 1 valid, row 2 null.
        let value_data = Int32Array::from(vec![None, Some(2), None, None]);
        let value_offsets = Buffer::from_slice_ref([0, 3, 4, 4]);
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_buffer(value_offsets)
            .add_child_data(value_data.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let lists = ListArray::from(list_data);

        // Struct column with two children and the same validity pattern
        // (rows 0 and 1 valid, row 2 null at the struct level).
        let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
        let structs_utf8s = StringArray::from(vec![None, None, Some("aaaaaa")]);
        let struct_data_type = DataType::Struct(Fields::from(vec![
            Field::new("int32s", DataType::Int32, true),
            Field::new("utf8s", DataType::Utf8, true),
        ]));
        let struct_data = ArrayData::builder(struct_data_type)
            .len(3)
            .add_child_data(structs_int32s.into_data())
            .add_child_data(structs_utf8s.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let structs = StructArray::from(struct_data);

        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bools_with_metadata_map),
                Arc::new(bools_with_metadata_vec),
                Arc::new(bools),
                Arc::new(int8s),
                Arc::new(int16s),
                Arc::new(int32s),
                Arc::new(int64s),
                Arc::new(uint8s),
                Arc::new(uint16s),
                Arc::new(uint32s),
                Arc::new(uint64s),
                Arc::new(float32s),
                Arc::new(float64s),
                Arc::new(date_days),
                Arc::new(date_millis),
                Arc::new(time_secs),
                Arc::new(time_millis),
                Arc::new(time_micros),
                Arc::new(time_nanos),
                Arc::new(ts_secs),
                Arc::new(ts_millis),
                Arc::new(ts_micros),
                Arc::new(ts_nanos),
                Arc::new(ts_secs_tz),
                Arc::new(ts_millis_tz),
                Arc::new(ts_micros_tz),
                Arc::new(ts_nanos_tz),
                Arc::new(utf8s),
                Arc::new(lists),
                Arc::new(structs),
            ],
        )
        .unwrap();
        // Read the golden integration JSON fixture and compare.
        let mut file = File::open("data/integration.json").unwrap();
        let mut json = String::new();
        file.read_to_string(&mut json).unwrap();
        let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
        // test schemas
        assert!(arrow_json.schema.equals_schema(&schema));
        // test record batch
        assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
    }
}