// arrow_integration_test/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Support for the [Apache Arrow JSON test data format](https://github.com/apache/arrow/blob/master/docs/source/format/Integration.rst#json-test-data-format)
19//!
20//! These utilities define structs that read the integration JSON format for integration testing purposes.
21//!
22//! This is not a canonical format, but provides a human-readable way of verifying language implementations
23
24#![doc(
25    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
26    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
27)]
28#![cfg_attr(docsrs, feature(doc_auto_cfg))]
29#![warn(missing_docs)]
30use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer};
31use hex::decode;
32use num::BigInt;
33use num::Signed;
34use serde::{Deserialize, Serialize};
35use serde_json::{Map as SJMap, Value};
36use std::collections::HashMap;
37use std::sync::Arc;
38
39use arrow::array::*;
40use arrow::buffer::{Buffer, MutableBuffer};
41use arrow::datatypes::*;
42use arrow::error::{ArrowError, Result};
43use arrow::util::bit_util;
44
45mod datatype;
46mod field;
47mod schema;
48
49pub use datatype::*;
50pub use field::*;
51pub use schema::*;
52
/// A struct that represents an Arrow file with a schema and record batches
///
/// This is the top-level type of the integration JSON format.
///
/// See <https://github.com/apache/arrow/blob/master/docs/source/format/Integration.rst#json-test-data-format>
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJson {
    /// The Arrow schema for JSON file
    pub schema: ArrowJsonSchema,
    /// The `RecordBatch`es in the JSON file
    pub batches: Vec<ArrowJsonBatch>,
    /// The dictionaries in the JSON file, keyed by id when resolved
    /// (omitted from serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
}
66
/// A struct that partially reads the Arrow JSON schema.
///
/// Fields are left as JSON `Value` as they vary by `DataType`
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonSchema {
    /// An array of JSON fields
    pub fields: Vec<ArrowJsonField>,
    /// An array of metadata key-value pairs; each map is expected to
    /// contain `"key"` and `"value"` entries
    /// (omitted from serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Vec<HashMap<String, String>>>,
}
78
/// A single field of the Arrow JSON schema.
///
/// Fields are left as JSON `Value` as they vary by `DataType`
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonField {
    /// The name of the field
    pub name: String,
    /// The data type of the field,
    /// can be any valid JSON value
    /// (serialized under the JSON key `"type"`)
    #[serde(rename = "type")]
    pub field_type: Value,
    /// Whether the field is nullable
    pub nullable: bool,
    /// The children fields
    pub children: Vec<ArrowJsonField>,
    /// The dictionary for the field, if dictionary-encoded
    /// (omitted from serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dictionary: Option<ArrowJsonFieldDictionary>,
    /// The metadata for the field, if any
    /// (omitted from serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
99
100impl From<&FieldRef> for ArrowJsonField {
101    fn from(value: &FieldRef) -> Self {
102        Self::from(value.as_ref())
103    }
104}
105
106impl From<&Field> for ArrowJsonField {
107    fn from(field: &Field) -> Self {
108        let metadata_value = match field.metadata().is_empty() {
109            false => {
110                let mut array = Vec::new();
111                for (k, v) in field.metadata() {
112                    let mut kv_map = SJMap::new();
113                    kv_map.insert(k.clone(), Value::String(v.clone()));
114                    array.push(Value::Object(kv_map));
115                }
116                if !array.is_empty() {
117                    Some(Value::Array(array))
118                } else {
119                    None
120                }
121            }
122            _ => None,
123        };
124
125        Self {
126            name: field.name().to_string(),
127            field_type: data_type_to_json(field.data_type()),
128            nullable: field.is_nullable(),
129            children: vec![],
130            dictionary: None, // TODO: not enough info
131            metadata: metadata_value,
132        }
133    }
134}
135
/// Represents a dictionary-encoded field in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonFieldDictionary {
    /// A unique identifier for the dictionary; used to match a field
    /// against an [`ArrowJsonDictionaryBatch`] with the same id
    pub id: i64,
    /// The type of the dictionary index
    /// (serialized under the JSON key `"indexType"`)
    #[serde(rename = "indexType")]
    pub index_type: DictionaryIndexType,
    /// Whether the dictionary is ordered
    /// (serialized under the JSON key `"isOrdered"`)
    #[serde(rename = "isOrdered")]
    pub is_ordered: bool,
}
148
/// Type of an index for a dictionary-encoded field in the Arrow JSON format
#[derive(Deserialize, Serialize, Debug)]
pub struct DictionaryIndexType {
    /// The name of the dictionary index type
    pub name: String,
    /// Whether the dictionary index type is signed
    /// (serialized under the JSON key `"isSigned"`)
    #[serde(rename = "isSigned")]
    pub is_signed: bool,
    /// The bit width of the dictionary index type
    /// (serialized under the JSON key `"bitWidth"`)
    #[serde(rename = "bitWidth")]
    pub bit_width: i64,
}
161
/// A struct that partially reads the Arrow JSON record batch
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ArrowJsonBatch {
    // Number of rows in the batch
    count: usize,
    /// The columns in the record batch
    pub columns: Vec<ArrowJsonColumn>,
}
169
/// A struct that partially reads the Arrow JSON dictionary batch
// NOTE(review): all fields are already snake_case; the allow looks
// redundant but is kept as-is since this edit changes comments only
#[derive(Deserialize, Serialize, Debug, Clone)]
#[allow(non_snake_case)]
pub struct ArrowJsonDictionaryBatch {
    /// The unique identifier for the dictionary; matched against
    /// [`ArrowJsonFieldDictionary::id`] when decoding columns
    pub id: i64,
    /// The data for the dictionary
    pub data: ArrowJsonBatch,
}
179
/// A struct that partially reads the Arrow JSON column/array
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ArrowJsonColumn {
    // Name of the column; matched positionally against the schema field
    name: String,
    /// The number of elements in the column
    pub count: usize,
    /// The validity bitmap to determine null values
    /// (one 0/1 entry per element, serialized as `"VALIDITY"`)
    #[serde(rename = "VALIDITY")]
    pub validity: Option<Vec<u8>>,
    /// The data values in the column
    /// (serialized as `"DATA"`)
    #[serde(rename = "DATA")]
    pub data: Option<Vec<Value>>,
    /// The offsets for variable-sized data types
    /// (serialized as `"OFFSET"`)
    #[serde(rename = "OFFSET")]
    pub offset: Option<Vec<Value>>, // leaving as Value as 64-bit offsets are strings
    /// The type id for union types
    /// (serialized as `"TYPE_ID"`)
    #[serde(rename = "TYPE_ID")]
    pub type_id: Option<Vec<i8>>,
    /// The children columns for nested types
    pub children: Option<Vec<ArrowJsonColumn>>,
}
201
202impl ArrowJson {
203    /// Compare the Arrow JSON with a record batch reader
204    pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> Result<bool> {
205        if !self.schema.equals_schema(&reader.schema()) {
206            return Ok(false);
207        }
208
209        for json_batch in self.get_record_batches()?.into_iter() {
210            let batch = reader.next();
211            match batch {
212                Some(Ok(batch)) => {
213                    if json_batch != batch {
214                        println!("json: {json_batch:?}");
215                        println!("batch: {batch:?}");
216                        return Ok(false);
217                    }
218                }
219                Some(Err(e)) => return Err(e),
220                None => return Ok(false),
221            }
222        }
223
224        Ok(true)
225    }
226
227    /// Convert the stored dictionaries to `Vec[RecordBatch]`
228    pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
229        let schema = self.schema.to_arrow_schema()?;
230
231        let mut dictionaries = HashMap::new();
232        self.dictionaries.iter().for_each(|dict_batches| {
233            dict_batches.iter().for_each(|d| {
234                dictionaries.insert(d.id, d.clone());
235            });
236        });
237
238        let batches: Result<Vec<_>> = self
239            .batches
240            .iter()
241            .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries)))
242            .collect();
243
244        batches
245    }
246}
247
248impl ArrowJsonSchema {
249    /// Compare the Arrow JSON schema with the Arrow `Schema`
250    fn equals_schema(&self, schema: &Schema) -> bool {
251        let field_len = self.fields.len();
252        if field_len != schema.fields().len() {
253            return false;
254        }
255        for i in 0..field_len {
256            let json_field = &self.fields[i];
257            let field = schema.field(i);
258            if !json_field.equals_field(field) {
259                return false;
260            }
261        }
262        true
263    }
264
265    fn to_arrow_schema(&self) -> Result<Schema> {
266        let arrow_fields: Result<Vec<_>> = self
267            .fields
268            .iter()
269            .map(|field| field.to_arrow_field())
270            .collect();
271
272        if let Some(metadatas) = &self.metadata {
273            let mut metadata: HashMap<String, String> = HashMap::new();
274
275            metadatas.iter().for_each(|pair| {
276                let key = pair.get("key").unwrap();
277                let value = pair.get("value").unwrap();
278                metadata.insert(key.clone(), value.clone());
279            });
280
281            Ok(Schema::new_with_metadata(arrow_fields?, metadata))
282        } else {
283            Ok(Schema::new(arrow_fields?))
284        }
285    }
286}
287
impl ArrowJsonField {
    /// Compare the Arrow JSON field with the Arrow `Field`
    ///
    /// NOTE(review): when both convert successfully but differ, this panics via
    /// `assert_eq!` (surfacing a diff in the panic message) rather than returning
    /// `false`; `false` is only returned when the JSON field cannot be converted
    /// to an Arrow `Field` at all. Presumably intentional for test diagnostics —
    /// confirm before relying on the boolean result.
    fn equals_field(&self, field: &Field) -> bool {
        // convert to a field
        match self.to_arrow_field() {
            Ok(self_field) => {
                assert_eq!(&self_field, field, "Arrow fields not the same");
                true
            }
            Err(e) => {
                eprintln!("Encountered error while converting JSON field to Arrow field: {e:?}");
                false
            }
        }
    }

    /// Convert to an Arrow Field
    /// TODO: convert to use an Into
    fn to_arrow_field(&self) -> Result<Field> {
        // a bit regressive, but we have to convert the field to JSON in order to convert it
        // (round-trips through serde_json so field_from_json can do the parsing)
        let field =
            serde_json::to_value(self).map_err(|error| ArrowError::JsonError(error.to_string()))?;
        field_from_json(&field)
    }
}
313
314/// Generates a [`RecordBatch`] from an Arrow JSON batch, given a schema
315pub fn record_batch_from_json(
316    schema: &Schema,
317    json_batch: ArrowJsonBatch,
318    json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
319) -> Result<RecordBatch> {
320    let mut columns = vec![];
321
322    for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
323        let col = array_from_json(field, json_col, json_dictionaries)?;
324        columns.push(col);
325    }
326
327    RecordBatch::try_new(Arc::new(schema.clone()), columns)
328}
329
330/// Construct an Arrow array from a partially typed JSON column
331pub fn array_from_json(
332    field: &Field,
333    json_col: ArrowJsonColumn,
334    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
335) -> Result<ArrayRef> {
336    match field.data_type() {
337        DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
338        DataType::Boolean => {
339            let mut b = BooleanBuilder::with_capacity(json_col.count);
340            for (is_valid, value) in json_col
341                .validity
342                .as_ref()
343                .unwrap()
344                .iter()
345                .zip(json_col.data.unwrap())
346            {
347                match is_valid {
348                    1 => b.append_value(value.as_bool().unwrap()),
349                    _ => b.append_null(),
350                };
351            }
352            Ok(Arc::new(b.finish()))
353        }
354        DataType::Int8 => {
355            let mut b = Int8Builder::with_capacity(json_col.count);
356            for (is_valid, value) in json_col
357                .validity
358                .as_ref()
359                .unwrap()
360                .iter()
361                .zip(json_col.data.unwrap())
362            {
363                match is_valid {
364                    1 => b.append_value(value.as_i64().ok_or_else(|| {
365                        ArrowError::JsonError(format!("Unable to get {value:?} as int64"))
366                    })? as i8),
367                    _ => b.append_null(),
368                };
369            }
370            Ok(Arc::new(b.finish()))
371        }
372        DataType::Int16 => {
373            let mut b = Int16Builder::with_capacity(json_col.count);
374            for (is_valid, value) in json_col
375                .validity
376                .as_ref()
377                .unwrap()
378                .iter()
379                .zip(json_col.data.unwrap())
380            {
381                match is_valid {
382                    1 => b.append_value(value.as_i64().unwrap() as i16),
383                    _ => b.append_null(),
384                };
385            }
386            Ok(Arc::new(b.finish()))
387        }
388        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
389            let mut b = Int32Builder::with_capacity(json_col.count);
390            for (is_valid, value) in json_col
391                .validity
392                .as_ref()
393                .unwrap()
394                .iter()
395                .zip(json_col.data.unwrap())
396            {
397                match is_valid {
398                    1 => b.append_value(value.as_i64().unwrap() as i32),
399                    _ => b.append_null(),
400                };
401            }
402            let array = Arc::new(b.finish()) as ArrayRef;
403            arrow::compute::cast(&array, field.data_type())
404        }
405        DataType::Interval(IntervalUnit::YearMonth) => {
406            let mut b = IntervalYearMonthBuilder::with_capacity(json_col.count);
407            for (is_valid, value) in json_col
408                .validity
409                .as_ref()
410                .unwrap()
411                .iter()
412                .zip(json_col.data.unwrap())
413            {
414                match is_valid {
415                    1 => b.append_value(value.as_i64().unwrap() as i32),
416                    _ => b.append_null(),
417                };
418            }
419            Ok(Arc::new(b.finish()))
420        }
421        DataType::Int64
422        | DataType::Date64
423        | DataType::Time64(_)
424        | DataType::Timestamp(_, _)
425        | DataType::Duration(_) => {
426            let mut b = Int64Builder::with_capacity(json_col.count);
427            for (is_valid, value) in json_col
428                .validity
429                .as_ref()
430                .unwrap()
431                .iter()
432                .zip(json_col.data.unwrap())
433            {
434                match is_valid {
435                    1 => b.append_value(match value {
436                        Value::Number(n) => n.as_i64().unwrap(),
437                        Value::String(s) => s.parse().expect("Unable to parse string as i64"),
438                        _ => panic!("Unable to parse {value:?} as number"),
439                    }),
440                    _ => b.append_null(),
441                };
442            }
443            let array = Arc::new(b.finish()) as ArrayRef;
444            arrow::compute::cast(&array, field.data_type())
445        }
446        DataType::Interval(IntervalUnit::DayTime) => {
447            let mut b = IntervalDayTimeBuilder::with_capacity(json_col.count);
448            for (is_valid, value) in json_col
449                .validity
450                .as_ref()
451                .unwrap()
452                .iter()
453                .zip(json_col.data.unwrap())
454            {
455                match is_valid {
456                    1 => b.append_value(match value {
457                        Value::Object(ref map)
458                            if map.contains_key("days") && map.contains_key("milliseconds") =>
459                        {
460                            match field.data_type() {
461                                DataType::Interval(IntervalUnit::DayTime) => {
462                                    let days = map.get("days").unwrap();
463                                    let milliseconds = map.get("milliseconds").unwrap();
464
465                                    match (days, milliseconds) {
466                                        (Value::Number(d), Value::Number(m)) => {
467                                            let days = d.as_i64().unwrap() as _;
468                                            let millis = m.as_i64().unwrap() as _;
469                                            IntervalDayTime::new(days, millis)
470                                        }
471                                        _ => {
472                                            panic!("Unable to parse {value:?} as interval daytime")
473                                        }
474                                    }
475                                }
476                                _ => panic!("Unable to parse {value:?} as interval daytime"),
477                            }
478                        }
479                        _ => panic!("Unable to parse {value:?} as number"),
480                    }),
481                    _ => b.append_null(),
482                };
483            }
484            Ok(Arc::new(b.finish()))
485        }
486        DataType::UInt8 => {
487            let mut b = UInt8Builder::with_capacity(json_col.count);
488            for (is_valid, value) in json_col
489                .validity
490                .as_ref()
491                .unwrap()
492                .iter()
493                .zip(json_col.data.unwrap())
494            {
495                match is_valid {
496                    1 => b.append_value(value.as_u64().unwrap() as u8),
497                    _ => b.append_null(),
498                };
499            }
500            Ok(Arc::new(b.finish()))
501        }
502        DataType::UInt16 => {
503            let mut b = UInt16Builder::with_capacity(json_col.count);
504            for (is_valid, value) in json_col
505                .validity
506                .as_ref()
507                .unwrap()
508                .iter()
509                .zip(json_col.data.unwrap())
510            {
511                match is_valid {
512                    1 => b.append_value(value.as_u64().unwrap() as u16),
513                    _ => b.append_null(),
514                };
515            }
516            Ok(Arc::new(b.finish()))
517        }
518        DataType::UInt32 => {
519            let mut b = UInt32Builder::with_capacity(json_col.count);
520            for (is_valid, value) in json_col
521                .validity
522                .as_ref()
523                .unwrap()
524                .iter()
525                .zip(json_col.data.unwrap())
526            {
527                match is_valid {
528                    1 => b.append_value(value.as_u64().unwrap() as u32),
529                    _ => b.append_null(),
530                };
531            }
532            Ok(Arc::new(b.finish()))
533        }
534        DataType::UInt64 => {
535            let mut b = UInt64Builder::with_capacity(json_col.count);
536            for (is_valid, value) in json_col
537                .validity
538                .as_ref()
539                .unwrap()
540                .iter()
541                .zip(json_col.data.unwrap())
542            {
543                match is_valid {
544                    1 => {
545                        if value.is_string() {
546                            b.append_value(
547                                value
548                                    .as_str()
549                                    .unwrap()
550                                    .parse()
551                                    .expect("Unable to parse string as u64"),
552                            )
553                        } else if value.is_number() {
554                            b.append_value(value.as_u64().expect("Unable to read number as u64"))
555                        } else {
556                            panic!("Unable to parse value {value:?} as u64")
557                        }
558                    }
559                    _ => b.append_null(),
560                };
561            }
562            Ok(Arc::new(b.finish()))
563        }
564        DataType::Interval(IntervalUnit::MonthDayNano) => {
565            let mut b = IntervalMonthDayNanoBuilder::with_capacity(json_col.count);
566            for (is_valid, value) in json_col
567                .validity
568                .as_ref()
569                .unwrap()
570                .iter()
571                .zip(json_col.data.unwrap())
572            {
573                match is_valid {
574                    1 => b.append_value(match value {
575                        Value::Object(v) => {
576                            let months = v.get("months").unwrap();
577                            let days = v.get("days").unwrap();
578                            let nanoseconds = v.get("nanoseconds").unwrap();
579                            match (months, days, nanoseconds) {
580                                (
581                                    Value::Number(months),
582                                    Value::Number(days),
583                                    Value::Number(nanoseconds),
584                                ) => {
585                                    let months = months.as_i64().unwrap() as i32;
586                                    let days = days.as_i64().unwrap() as i32;
587                                    let nanoseconds = nanoseconds.as_i64().unwrap();
588                                    IntervalMonthDayNano::new(months, days, nanoseconds)
589                                }
590                                (_, _, _) => {
591                                    panic!("Unable to parse {v:?} as MonthDayNano")
592                                }
593                            }
594                        }
595                        _ => panic!("Unable to parse {value:?} as MonthDayNano"),
596                    }),
597                    _ => b.append_null(),
598                };
599            }
600            Ok(Arc::new(b.finish()))
601        }
602        DataType::Float32 => {
603            let mut b = Float32Builder::with_capacity(json_col.count);
604            for (is_valid, value) in json_col
605                .validity
606                .as_ref()
607                .unwrap()
608                .iter()
609                .zip(json_col.data.unwrap())
610            {
611                match is_valid {
612                    1 => b.append_value(value.as_f64().unwrap() as f32),
613                    _ => b.append_null(),
614                };
615            }
616            Ok(Arc::new(b.finish()))
617        }
618        DataType::Float64 => {
619            let mut b = Float64Builder::with_capacity(json_col.count);
620            for (is_valid, value) in json_col
621                .validity
622                .as_ref()
623                .unwrap()
624                .iter()
625                .zip(json_col.data.unwrap())
626            {
627                match is_valid {
628                    1 => b.append_value(value.as_f64().unwrap()),
629                    _ => b.append_null(),
630                };
631            }
632            Ok(Arc::new(b.finish()))
633        }
634        DataType::Binary => {
635            let mut b = BinaryBuilder::with_capacity(json_col.count, 1024);
636            for (is_valid, value) in json_col
637                .validity
638                .as_ref()
639                .unwrap()
640                .iter()
641                .zip(json_col.data.unwrap())
642            {
643                match is_valid {
644                    1 => {
645                        let v = decode(value.as_str().unwrap()).unwrap();
646                        b.append_value(&v)
647                    }
648                    _ => b.append_null(),
649                };
650            }
651            Ok(Arc::new(b.finish()))
652        }
653        DataType::LargeBinary => {
654            let mut b = LargeBinaryBuilder::with_capacity(json_col.count, 1024);
655            for (is_valid, value) in json_col
656                .validity
657                .as_ref()
658                .unwrap()
659                .iter()
660                .zip(json_col.data.unwrap())
661            {
662                match is_valid {
663                    1 => {
664                        let v = decode(value.as_str().unwrap()).unwrap();
665                        b.append_value(&v)
666                    }
667                    _ => b.append_null(),
668                };
669            }
670            Ok(Arc::new(b.finish()))
671        }
672        DataType::Utf8 => {
673            let mut b = StringBuilder::with_capacity(json_col.count, 1024);
674            for (is_valid, value) in json_col
675                .validity
676                .as_ref()
677                .unwrap()
678                .iter()
679                .zip(json_col.data.unwrap())
680            {
681                match is_valid {
682                    1 => b.append_value(value.as_str().unwrap()),
683                    _ => b.append_null(),
684                };
685            }
686            Ok(Arc::new(b.finish()))
687        }
688        DataType::LargeUtf8 => {
689            let mut b = LargeStringBuilder::with_capacity(json_col.count, 1024);
690            for (is_valid, value) in json_col
691                .validity
692                .as_ref()
693                .unwrap()
694                .iter()
695                .zip(json_col.data.unwrap())
696            {
697                match is_valid {
698                    1 => b.append_value(value.as_str().unwrap()),
699                    _ => b.append_null(),
700                };
701            }
702            Ok(Arc::new(b.finish()))
703        }
704        DataType::FixedSizeBinary(len) => {
705            let mut b = FixedSizeBinaryBuilder::with_capacity(json_col.count, *len);
706            for (is_valid, value) in json_col
707                .validity
708                .as_ref()
709                .unwrap()
710                .iter()
711                .zip(json_col.data.unwrap())
712            {
713                match is_valid {
714                    1 => {
715                        let v = hex::decode(value.as_str().unwrap()).unwrap();
716                        b.append_value(&v)?
717                    }
718                    _ => b.append_null(),
719                };
720            }
721            Ok(Arc::new(b.finish()))
722        }
723        DataType::List(child_field) => {
724            let null_buf = create_null_buf(&json_col);
725            let children = json_col.children.clone().unwrap();
726            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
727            let offsets: Vec<i32> = json_col
728                .offset
729                .unwrap()
730                .iter()
731                .map(|v| v.as_i64().unwrap() as i32)
732                .collect();
733            let list_data = ArrayData::builder(field.data_type().clone())
734                .len(json_col.count)
735                .offset(0)
736                .add_buffer(Buffer::from(offsets.to_byte_slice()))
737                .add_child_data(child_array.into_data())
738                .null_bit_buffer(Some(null_buf))
739                .build()
740                .unwrap();
741            Ok(Arc::new(ListArray::from(list_data)))
742        }
743        DataType::LargeList(child_field) => {
744            let null_buf = create_null_buf(&json_col);
745            let children = json_col.children.clone().unwrap();
746            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
747            let offsets: Vec<i64> = json_col
748                .offset
749                .unwrap()
750                .iter()
751                .map(|v| match v {
752                    Value::Number(n) => n.as_i64().unwrap(),
753                    Value::String(s) => s.parse::<i64>().unwrap(),
754                    _ => panic!("64-bit offset must be either string or number"),
755                })
756                .collect();
757            let list_data = ArrayData::builder(field.data_type().clone())
758                .len(json_col.count)
759                .offset(0)
760                .add_buffer(Buffer::from(offsets.to_byte_slice()))
761                .add_child_data(child_array.into_data())
762                .null_bit_buffer(Some(null_buf))
763                .build()
764                .unwrap();
765            Ok(Arc::new(LargeListArray::from(list_data)))
766        }
767        DataType::FixedSizeList(child_field, _) => {
768            let children = json_col.children.clone().unwrap();
769            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
770            let null_buf = create_null_buf(&json_col);
771            let list_data = ArrayData::builder(field.data_type().clone())
772                .len(json_col.count)
773                .add_child_data(child_array.into_data())
774                .null_bit_buffer(Some(null_buf))
775                .build()
776                .unwrap();
777            Ok(Arc::new(FixedSizeListArray::from(list_data)))
778        }
779        DataType::Struct(fields) => {
780            // construct struct with null data
781            let null_buf = create_null_buf(&json_col);
782            let mut array_data = ArrayData::builder(field.data_type().clone())
783                .len(json_col.count)
784                .null_bit_buffer(Some(null_buf));
785
786            for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
787                let array = array_from_json(field, col, dictionaries)?;
788                array_data = array_data.add_child_data(array.into_data());
789            }
790
791            let array = StructArray::from(array_data.build().unwrap());
792            Ok(Arc::new(array))
793        }
794        DataType::Dictionary(key_type, value_type) => {
795            #[allow(deprecated)]
796            let dict_id = field.dict_id().ok_or_else(|| {
797                ArrowError::JsonError(format!("Unable to find dict_id for field {field:?}"))
798            })?;
799            // find dictionary
800            let dictionary = dictionaries
801                .ok_or_else(|| {
802                    ArrowError::JsonError(format!(
803                        "Unable to find any dictionaries for field {field:?}"
804                    ))
805                })?
806                .get(&dict_id);
807            match dictionary {
808                Some(dictionary) => dictionary_array_from_json(
809                    field,
810                    json_col,
811                    key_type,
812                    value_type,
813                    dictionary,
814                    dictionaries,
815                ),
816                None => Err(ArrowError::JsonError(format!(
817                    "Unable to find dictionary for field {field:?}"
818                ))),
819            }
820        }
821        DataType::Decimal32(precision, scale) => {
822            let mut b = Decimal32Builder::with_capacity(json_col.count);
823            for (is_valid, value) in json_col
824                .validity
825                .as_ref()
826                .unwrap()
827                .iter()
828                .zip(json_col.data.unwrap())
829            {
830                match is_valid {
831                    1 => b.append_value(value.as_str().unwrap().parse::<i32>().unwrap()),
832                    _ => b.append_null(),
833                };
834            }
835            Ok(Arc::new(
836                b.finish().with_precision_and_scale(*precision, *scale)?,
837            ))
838        }
839        DataType::Decimal64(precision, scale) => {
840            let mut b = Decimal64Builder::with_capacity(json_col.count);
841            for (is_valid, value) in json_col
842                .validity
843                .as_ref()
844                .unwrap()
845                .iter()
846                .zip(json_col.data.unwrap())
847            {
848                match is_valid {
849                    1 => b.append_value(value.as_str().unwrap().parse::<i64>().unwrap()),
850                    _ => b.append_null(),
851                };
852            }
853            Ok(Arc::new(
854                b.finish().with_precision_and_scale(*precision, *scale)?,
855            ))
856        }
857        DataType::Decimal128(precision, scale) => {
858            let mut b = Decimal128Builder::with_capacity(json_col.count);
859            for (is_valid, value) in json_col
860                .validity
861                .as_ref()
862                .unwrap()
863                .iter()
864                .zip(json_col.data.unwrap())
865            {
866                match is_valid {
867                    1 => b.append_value(value.as_str().unwrap().parse::<i128>().unwrap()),
868                    _ => b.append_null(),
869                };
870            }
871            Ok(Arc::new(
872                b.finish().with_precision_and_scale(*precision, *scale)?,
873            ))
874        }
875        DataType::Decimal256(precision, scale) => {
876            let mut b = Decimal256Builder::with_capacity(json_col.count);
877            for (is_valid, value) in json_col
878                .validity
879                .as_ref()
880                .unwrap()
881                .iter()
882                .zip(json_col.data.unwrap())
883            {
884                match is_valid {
885                    1 => {
886                        let str = value.as_str().unwrap();
887                        let integer = BigInt::parse_bytes(str.as_bytes(), 10).unwrap();
888                        let integer_bytes = integer.to_signed_bytes_le();
889                        let mut bytes = if integer.is_positive() {
890                            [0_u8; 32]
891                        } else {
892                            [255_u8; 32]
893                        };
894                        bytes[0..integer_bytes.len()].copy_from_slice(integer_bytes.as_slice());
895                        b.append_value(i256::from_le_bytes(bytes));
896                    }
897                    _ => b.append_null(),
898                }
899            }
900            Ok(Arc::new(
901                b.finish().with_precision_and_scale(*precision, *scale)?,
902            ))
903        }
904        DataType::Map(child_field, _) => {
905            let null_buf = create_null_buf(&json_col);
906            let children = json_col.children.clone().unwrap();
907            let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?;
908            let offsets: Vec<i32> = json_col
909                .offset
910                .unwrap()
911                .iter()
912                .map(|v| v.as_i64().unwrap() as i32)
913                .collect();
914            let array_data = ArrayData::builder(field.data_type().clone())
915                .len(json_col.count)
916                .add_buffer(Buffer::from(offsets.to_byte_slice()))
917                .add_child_data(child_array.into_data())
918                .null_bit_buffer(Some(null_buf))
919                .build()
920                .unwrap();
921
922            let array = MapArray::from(array_data);
923            Ok(Arc::new(array))
924        }
925        DataType::Union(fields, _) => {
926            let type_ids = if let Some(type_id) = json_col.type_id {
927                type_id
928            } else {
929                return Err(ArrowError::JsonError(
930                    "Cannot find expected type_id in json column".to_string(),
931                ));
932            };
933
934            let offset: Option<ScalarBuffer<i32>> = json_col
935                .offset
936                .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());
937
938            let mut children = Vec::with_capacity(fields.len());
939            for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
940                let array = array_from_json(field, col, dictionaries)?;
941                children.push(array);
942            }
943
944            let array =
945                UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
946            Ok(Arc::new(array))
947        }
948        t => Err(ArrowError::JsonError(format!(
949            "data type {t:?} not supported"
950        ))),
951    }
952}
953
954/// Construct a [`DictionaryArray`] from a partially typed JSON column
955pub fn dictionary_array_from_json(
956    field: &Field,
957    json_col: ArrowJsonColumn,
958    dict_key: &DataType,
959    dict_value: &DataType,
960    dictionary: &ArrowJsonDictionaryBatch,
961    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
962) -> Result<ArrayRef> {
963    match dict_key {
964        DataType::Int8
965        | DataType::Int16
966        | DataType::Int32
967        | DataType::Int64
968        | DataType::UInt8
969        | DataType::UInt16
970        | DataType::UInt32
971        | DataType::UInt64 => {
972            let null_buf = create_null_buf(&json_col);
973
974            // build the key data into a buffer, then construct values separately
975            #[allow(deprecated)]
976            let key_field = Field::new_dict(
977                "key",
978                dict_key.clone(),
979                field.is_nullable(),
980                #[allow(deprecated)]
981                field
982                    .dict_id()
983                    .expect("Dictionary fields must have a dict_id value"),
984                field
985                    .dict_is_ordered()
986                    .expect("Dictionary fields must have a dict_is_ordered value"),
987            );
988            let keys = array_from_json(&key_field, json_col, None)?;
989            // note: not enough info on nullability of dictionary
990            let value_field = Field::new("value", dict_value.clone(), true);
991            let values = array_from_json(
992                &value_field,
993                dictionary.data.columns[0].clone(),
994                dictionaries,
995            )?;
996
997            // convert key and value to dictionary data
998            let dict_data = ArrayData::builder(field.data_type().clone())
999                .len(keys.len())
1000                .add_buffer(keys.to_data().buffers()[0].clone())
1001                .null_bit_buffer(Some(null_buf))
1002                .add_child_data(values.into_data())
1003                .build()
1004                .unwrap();
1005
1006            let array = match dict_key {
1007                DataType::Int8 => Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef,
1008                DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)),
1009                DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)),
1010                DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)),
1011                DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)),
1012                DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)),
1013                DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)),
1014                DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)),
1015                _ => unreachable!(),
1016            };
1017            Ok(array)
1018        }
1019        _ => Err(ArrowError::JsonError(format!(
1020            "Dictionary key type {dict_key:?} not supported"
1021        ))),
1022    }
1023}
1024
1025/// A helper to create a null buffer from a `Vec<bool>`
1026fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
1027    let num_bytes = bit_util::ceil(json_col.count, 8);
1028    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
1029    json_col
1030        .validity
1031        .clone()
1032        .unwrap()
1033        .iter()
1034        .enumerate()
1035        .for_each(|(i, v)| {
1036            let null_slice = null_buf.as_slice_mut();
1037            if *v != 0 {
1038                bit_util::set_bit(null_slice, i);
1039            }
1040        });
1041    null_buf.into()
1042}
1043
1044impl ArrowJsonBatch {
1045    /// Convert a [`RecordBatch`] to an [`ArrowJsonBatch`]
1046    pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
1047        let mut json_batch = ArrowJsonBatch {
1048            count: batch.num_rows(),
1049            columns: Vec::with_capacity(batch.num_columns()),
1050        };
1051
1052        for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) {
1053            let json_col = match field.data_type() {
1054                DataType::Int8 => {
1055                    let col = col.as_any().downcast_ref::<Int8Array>().unwrap();
1056
1057                    let mut validity: Vec<u8> = Vec::with_capacity(col.len());
1058                    let mut data: Vec<Value> = Vec::with_capacity(col.len());
1059
1060                    for i in 0..col.len() {
1061                        if col.is_null(i) {
1062                            validity.push(1);
1063                            data.push(0i8.into());
1064                        } else {
1065                            validity.push(0);
1066                            data.push(col.value(i).into());
1067                        }
1068                    }
1069
1070                    ArrowJsonColumn {
1071                        name: field.name().clone(),
1072                        count: col.len(),
1073                        validity: Some(validity),
1074                        data: Some(data),
1075                        offset: None,
1076                        type_id: None,
1077                        children: None,
1078                    }
1079                }
1080                _ => ArrowJsonColumn {
1081                    name: field.name().clone(),
1082                    count: col.len(),
1083                    validity: None,
1084                    data: None,
1085                    offset: None,
1086                    type_id: None,
1087                    children: None,
1088                },
1089            };
1090
1091            json_batch.columns.push(json_col);
1092        }
1093
1094        json_batch
1095    }
1096}
1097
#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Read;

    /// Checks that an `ArrowJsonSchema` parsed from integration JSON compares
    /// equal to the corresponding programmatically-built [`Schema`].
    #[test]
    fn test_schema_equality() {
        let json = r#"
        {
            "fields": [
                {
                    "name": "c1",
                    "type": {"name": "int", "isSigned": true, "bitWidth": 32},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c2",
                    "type": {"name": "floatingpoint", "precision": "DOUBLE"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c3",
                    "type": {"name": "utf8"},
                    "nullable": true,
                    "children": []
                },
                {
                    "name": "c4",
                    "type": {
                        "name": "list"
                    },
                    "nullable": true,
                    "children": [
                        {
                            "name": "custom_item",
                            "type": {
                                "name": "int",
                                "isSigned": true,
                                "bitWidth": 32
                            },
                            "nullable": false,
                            "children": []
                        }
                    ]
                }
            ]
        }"#;
        let json_schema: ArrowJsonSchema = serde_json::from_str(json).unwrap();
        // Equivalent schema built directly with the Rust API.
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Utf8, true),
            Field::new(
                "c4",
                DataType::List(Arc::new(Field::new("custom_item", DataType::Int32, false))),
                true,
            ),
        ]);
        assert!(json_schema.equals_schema(&schema));
    }

    /// Builds a [`RecordBatch`] covering primitive, temporal, list and struct
    /// types and compares it (schema and data) against the reference file
    /// `data/integration.json`.
    #[test]
    fn test_arrow_data_equality() {
        let secs_tz = Some("Europe/Budapest".into());
        let millis_tz = Some("America/New_York".into());
        let micros_tz = Some("UTC".into());
        let nanos_tz = Some("Africa/Johannesburg".into());

        // Schema under test; field order must match the column vector below.
        let schema = Schema::new(vec![
            Field::new("bools-with-metadata-map", DataType::Boolean, true).with_metadata(
                [("k".to_string(), "v".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools-with-metadata-vec", DataType::Boolean, true).with_metadata(
                [("k2".to_string(), "v2".to_string())]
                    .iter()
                    .cloned()
                    .collect(),
            ),
            Field::new("bools", DataType::Boolean, true),
            Field::new("int8s", DataType::Int8, true),
            Field::new("int16s", DataType::Int16, true),
            Field::new("int32s", DataType::Int32, true),
            Field::new("int64s", DataType::Int64, true),
            Field::new("uint8s", DataType::UInt8, true),
            Field::new("uint16s", DataType::UInt16, true),
            Field::new("uint32s", DataType::UInt32, true),
            Field::new("uint64s", DataType::UInt64, true),
            Field::new("float32s", DataType::Float32, true),
            Field::new("float64s", DataType::Float64, true),
            Field::new("date_days", DataType::Date32, true),
            Field::new("date_millis", DataType::Date64, true),
            Field::new("time_secs", DataType::Time32(TimeUnit::Second), true),
            Field::new("time_millis", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nanos", DataType::Time64(TimeUnit::Nanosecond), true),
            Field::new("ts_secs", DataType::Timestamp(TimeUnit::Second, None), true),
            Field::new(
                "ts_millis",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micros",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "ts_nanos",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "ts_secs_tz",
                DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
                true,
            ),
            Field::new(
                "ts_millis_tz",
                DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
                true,
            ),
            Field::new(
                "ts_micros_tz",
                DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
                true,
            ),
            Field::new(
                "ts_nanos_tz",
                DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
                true,
            ),
            Field::new("utf8s", DataType::Utf8, true),
            Field::new(
                "lists",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "structs",
                DataType::Struct(Fields::from(vec![
                    Field::new("int32s", DataType::Int32, true),
                    Field::new("utf8s", DataType::Utf8, true),
                ])),
                true,
            ),
        ]);

        // Column data; values mirror the fixture in data/integration.json.
        let bools_with_metadata_map = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools_with_metadata_vec = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let bools = BooleanArray::from(vec![Some(true), None, Some(false)]);
        let int8s = Int8Array::from(vec![Some(1), None, Some(3)]);
        let int16s = Int16Array::from(vec![Some(1), None, Some(3)]);
        let int32s = Int32Array::from(vec![Some(1), None, Some(3)]);
        let int64s = Int64Array::from(vec![Some(1), None, Some(3)]);
        let uint8s = UInt8Array::from(vec![Some(1), None, Some(3)]);
        let uint16s = UInt16Array::from(vec![Some(1), None, Some(3)]);
        let uint32s = UInt32Array::from(vec![Some(1), None, Some(3)]);
        let uint64s = UInt64Array::from(vec![Some(1), None, Some(3)]);
        let float32s = Float32Array::from(vec![Some(1.0), None, Some(3.0)]);
        let float64s = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
        let date_days = Date32Array::from(vec![Some(1196848), None, None]);
        let date_millis = Date64Array::from(vec![
            Some(167903550396207),
            Some(29923997007884),
            Some(30612271819236),
        ]);
        let time_secs = Time32SecondArray::from(vec![Some(27974), Some(78592), Some(43207)]);
        let time_millis =
            Time32MillisecondArray::from(vec![Some(6613125), Some(74667230), Some(52260079)]);
        let time_micros = Time64MicrosecondArray::from(vec![Some(62522958593), None, None]);
        let time_nanos =
            Time64NanosecondArray::from(vec![Some(73380123595985), None, Some(16584393546415)]);
        let ts_secs = TimestampSecondArray::from(vec![None, Some(193438817552), None]);
        let ts_millis =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)]);
        let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
        let ts_nanos = TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
        let ts_secs_tz = TimestampSecondArray::from(vec![None, Some(193438817552), None])
            .with_timezone_opt(secs_tz);
        let ts_millis_tz =
            TimestampMillisecondArray::from(vec![None, Some(38606916383008), Some(58113709376587)])
                .with_timezone_opt(millis_tz);
        let ts_micros_tz =
            TimestampMicrosecondArray::from(vec![None, None, None]).with_timezone_opt(micros_tz);
        let ts_nanos_tz =
            TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)])
                .with_timezone_opt(nanos_tz);
        let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]);

        // List column assembled from raw value/offset buffers; validity
        // bitmap 0b00000011 marks the first two slots valid, third null.
        let value_data = Int32Array::from(vec![None, Some(2), None, None]);
        let value_offsets = Buffer::from_slice_ref([0, 3, 4, 4]);
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_buffer(value_offsets)
            .add_child_data(value_data.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let lists = ListArray::from(list_data);

        // Struct column with two children and its own top-level validity.
        let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
        let structs_utf8s = StringArray::from(vec![None, None, Some("aaaaaa")]);
        let struct_data_type = DataType::Struct(Fields::from(vec![
            Field::new("int32s", DataType::Int32, true),
            Field::new("utf8s", DataType::Utf8, true),
        ]));
        let struct_data = ArrayData::builder(struct_data_type)
            .len(3)
            .add_child_data(structs_int32s.into_data())
            .add_child_data(structs_utf8s.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000011])))
            .build()
            .unwrap();
        let structs = StructArray::from(struct_data);

        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bools_with_metadata_map),
                Arc::new(bools_with_metadata_vec),
                Arc::new(bools),
                Arc::new(int8s),
                Arc::new(int16s),
                Arc::new(int32s),
                Arc::new(int64s),
                Arc::new(uint8s),
                Arc::new(uint16s),
                Arc::new(uint32s),
                Arc::new(uint64s),
                Arc::new(float32s),
                Arc::new(float64s),
                Arc::new(date_days),
                Arc::new(date_millis),
                Arc::new(time_secs),
                Arc::new(time_millis),
                Arc::new(time_micros),
                Arc::new(time_nanos),
                Arc::new(ts_secs),
                Arc::new(ts_millis),
                Arc::new(ts_micros),
                Arc::new(ts_nanos),
                Arc::new(ts_secs_tz),
                Arc::new(ts_millis_tz),
                Arc::new(ts_micros_tz),
                Arc::new(ts_nanos_tz),
                Arc::new(utf8s),
                Arc::new(lists),
                Arc::new(structs),
            ],
        )
        .unwrap();
        // Load the reference JSON fixture and compare.
        let mut file = File::open("data/integration.json").unwrap();
        let mut json = String::new();
        file.read_to_string(&mut json).unwrap();
        let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
        // test schemas
        assert!(arrow_json.schema.equals_schema(&schema));
        // test record batch
        assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
    }
}