arrow_json/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! JSON reader
19//!
20//! This JSON reader allows JSON records to be read into the Arrow memory
21//! model. Records are loaded in batches and are then converted from the record-oriented
22//! representation to the columnar arrow data model.
23//!
24//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
25//! parsing of sequences of one or more arbitrarily formatted JSON values, including
26//! but not limited to newline-delimited JSON.
27//!
28//! # Basic Usage
29//!
30//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
31//!
32//! ```
33//! # use arrow_schema::*;
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use std::sync::Arc;
37//!
38//! let schema = Arc::new(Schema::new(vec![
39//!     Field::new("a", DataType::Float64, false),
40//!     Field::new("b", DataType::Float64, false),
41//!     Field::new("c", DataType::Boolean, true),
42//! ]));
43//!
44//! let file = File::open("test/data/basic.json").unwrap();
45//!
46//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
47//! let batch = json.next().unwrap().unwrap();
48//! ```
49//!
50//! # Async Usage
51//!
52//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
53//! and is designed to be agnostic to the various different kinds of async IO primitives found
54//! within the Rust ecosystem.
55//!
56//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
57//!
58//! ```
59//! # use std::task::{Poll, ready};
60//! # use bytes::{Buf, Bytes};
61//! # use arrow_schema::ArrowError;
62//! # use futures::stream::{Stream, StreamExt};
63//! # use arrow_array::RecordBatch;
64//! # use arrow_json::reader::Decoder;
65//! #
66//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
67//!     mut decoder: Decoder,
68//!     mut input: S,
69//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
70//!     let mut buffered = Bytes::new();
71//!     futures::stream::poll_fn(move |cx| {
72//!         loop {
73//!             if buffered.is_empty() {
74//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
75//!                     Some(b) => b,
76//!                     None => break,
77//!                 };
78//!             }
79//!             let decoded = match decoder.decode(buffered.as_ref()) {
80//!                 Ok(decoded) => decoded,
81//!                 Err(e) => return Poll::Ready(Some(Err(e))),
82//!             };
83//!             let read = buffered.len();
84//!             buffered.advance(decoded);
85//!             if decoded != read {
86//!                 break
87//!             }
88//!         }
89//!
90//!         Poll::Ready(decoder.flush().transpose())
91//!     })
92//! }
93//!
94//! ```
95//!
96//! In a similar vein, it can also be used with tokio-based IO primitives
97//!
98//! ```
99//! # use std::sync::Arc;
100//! # use arrow_schema::{DataType, Field, Schema};
101//! # use std::pin::Pin;
102//! # use std::task::{Poll, ready};
103//! # use futures::{Stream, TryStreamExt};
104//! # use tokio::io::AsyncBufRead;
105//! # use arrow_array::RecordBatch;
106//! # use arrow_json::reader::Decoder;
107//! # use arrow_schema::ArrowError;
108//! fn decode_stream<R: AsyncBufRead + Unpin>(
109//!     mut decoder: Decoder,
110//!     mut reader: R,
111//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
112//!     futures::stream::poll_fn(move |cx| {
113//!         loop {
114//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
115//!                 Ok(b) if b.is_empty() => break,
116//!                 Ok(b) => b,
117//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
118//!             };
119//!             let read = b.len();
120//!             let decoded = match decoder.decode(b) {
121//!                 Ok(decoded) => decoded,
122//!                 Err(e) => return Poll::Ready(Some(Err(e))),
123//!             };
124//!             Pin::new(&mut reader).consume(decoded);
125//!             if decoded != read {
126//!                 break;
127//!             }
128//!         }
129//!
130//!         Poll::Ready(decoder.flush().transpose())
131//!     })
132//! }
133//! ```
134//!
135
136use crate::StructMode;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use chrono::Utc;
141use serde_core::Serialize;
142
143use arrow_array::timezone::Tz;
144use arrow_array::types::*;
145use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
146use arrow_data::ArrayData;
147use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
148pub use schema::*;
149
150use crate::reader::boolean_array::BooleanArrayDecoder;
151use crate::reader::decimal_array::DecimalArrayDecoder;
152use crate::reader::list_array::ListArrayDecoder;
153use crate::reader::map_array::MapArrayDecoder;
154use crate::reader::null_array::NullArrayDecoder;
155use crate::reader::primitive_array::PrimitiveArrayDecoder;
156use crate::reader::string_array::StringArrayDecoder;
157use crate::reader::string_view_array::StringViewArrayDecoder;
158use crate::reader::struct_array::StructArrayDecoder;
159use crate::reader::tape::{Tape, TapeDecoder};
160use crate::reader::timestamp_array::TimestampArrayDecoder;
161
162mod boolean_array;
163mod decimal_array;
164mod list_array;
165mod map_array;
166mod null_array;
167mod primitive_array;
168mod schema;
169mod serializer;
170mod string_array;
171mod string_view_array;
172mod struct_array;
173mod tape;
174mod timestamp_array;
175
176/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows per emitted RecordBatch (default 1024, see `new`)
    batch_size: usize,
    // When true, coerce JSON bools/numbers into string columns (see `with_coerce_primitive`)
    coerce_primitive: bool,
    // When true, error on JSON columns absent from `schema` (see `with_strict_mode`)
    strict_mode: bool,
    // When true, `schema` wraps a single root field and the JSON root need not be an object
    // (set by `new_with_field`)
    is_field: bool,
    // Whether structs decode from JSON objects, lists, or both (see `with_struct_mode`)
    struct_mode: StructMode,

    schema: SchemaRef,
}
186
187impl ReaderBuilder {
188    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
189    ///
190    /// This could be obtained using [`infer_json_schema`] if not known
191    ///
192    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
193    /// In this case, an error is returned when a column is missing from `schema`.
194    ///
195    /// [`infer_json_schema`]: crate::reader::infer_json_schema
196    pub fn new(schema: SchemaRef) -> Self {
197        Self {
198            batch_size: 1024,
199            coerce_primitive: false,
200            strict_mode: false,
201            is_field: false,
202            struct_mode: Default::default(),
203            schema,
204        }
205    }
206
207    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
208    ///
209    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
210    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
211    ///
212    /// ```
213    /// # use std::sync::Arc;
214    /// # use arrow_array::cast::AsArray;
215    /// # use arrow_array::types::Int32Type;
216    /// # use arrow_json::ReaderBuilder;
217    /// # use arrow_schema::{DataType, Field};
218    /// // Root of JSON schema is a numeric type
219    /// let data = "1\n2\n3\n";
220    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
221    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
222    /// let b = reader.next().unwrap().unwrap();
223    /// let values = b.column(0).as_primitive::<Int32Type>().values();
224    /// assert_eq!(values, &[1, 2, 3]);
225    ///
226    /// // Root of JSON schema is a list type
227    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
228    /// let field = Field::new_list("int", field.clone(), true);
229    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
230    /// let b = reader.next().unwrap().unwrap();
231    /// let list = b.column(0).as_list::<i32>();
232    ///
233    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
234    /// let list_values = list.values().as_primitive::<Int32Type>();
235    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
236    /// ```
237    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
238        Self {
239            batch_size: 1024,
240            coerce_primitive: false,
241            strict_mode: false,
242            is_field: true,
243            struct_mode: Default::default(),
244            schema: Arc::new(Schema::new([field.into()])),
245        }
246    }
247
248    /// Sets the batch size in rows to read
249    pub fn with_batch_size(self, batch_size: usize) -> Self {
250        Self { batch_size, ..self }
251    }
252
253    /// Sets if the decoder should coerce primitive values (bool and number) into string
254    /// when the Schema's column is Utf8 or LargeUtf8.
255    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
256        Self {
257            coerce_primitive,
258            ..self
259        }
260    }
261
262    /// Sets if the decoder should return an error if it encounters a column not
263    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
264    /// `strict_mode` is effectively `true`. It is required for all fields of
265    /// the struct to be in the list: without field names, there is no way to
266    /// determine which field is missing.
267    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
268        Self {
269            strict_mode,
270            ..self
271        }
272    }
273
274    /// Set the [`StructMode`] for the reader, which determines whether structs
275    /// can be decoded from JSON as objects or lists. For more details refer to
276    /// the enum documentation. Default is to use `ObjectOnly`.
277    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
278        Self {
279            struct_mode,
280            ..self
281        }
282    }
283
284    /// Create a [`Reader`] with the provided [`BufRead`]
285    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
286        Ok(Reader {
287            reader,
288            decoder: self.build_decoder()?,
289        })
290    }
291
292    /// Create a [`Decoder`]
293    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
294        let (data_type, nullable) = match self.is_field {
295            false => (DataType::Struct(self.schema.fields.clone()), false),
296            true => {
297                let field = &self.schema.fields[0];
298                (field.data_type().clone(), field.is_nullable())
299            }
300        };
301
302        let decoder = make_decoder(
303            data_type,
304            self.coerce_primitive,
305            self.strict_mode,
306            nullable,
307            self.struct_mode,
308        )?;
309
310        let num_fields = self.schema.flattened_fields().len();
311
312        Ok(Decoder {
313            decoder,
314            is_field: self.is_field,
315            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
316            batch_size: self.batch_size,
317            schema: self.schema,
318        })
319    }
320}
321
322/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
323///
324/// Lines consisting solely of ASCII whitespace are ignored
pub struct Reader<R> {
    // Underlying byte source; `read` pulls from it via fill_buf/consume
    reader: R,
    // Push-based decoder that accumulates rows until a batch is flushed
    decoder: Decoder,
}
329
330impl<R> std::fmt::Debug for Reader<R> {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        f.debug_struct("Reader")
333            .field("decoder", &self.decoder)
334            .finish()
335    }
336}
337
338impl<R: BufRead> Reader<R> {
339    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
340    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
341        loop {
342            let buf = self.reader.fill_buf()?;
343            if buf.is_empty() {
344                break;
345            }
346            let read = buf.len();
347
348            let decoded = self.decoder.decode(buf)?;
349            self.reader.consume(decoded);
350            if decoded != read {
351                break;
352            }
353        }
354        self.decoder.flush()
355    }
356}
357
358impl<R: BufRead> Iterator for Reader<R> {
359    type Item = Result<RecordBatch, ArrowError>;
360
361    fn next(&mut self) -> Option<Self::Item> {
362        self.read().transpose()
363    }
364}
365
366impl<R: BufRead> RecordBatchReader for Reader<R> {
367    fn schema(&self) -> SchemaRef {
368        self.decoder.schema.clone()
369    }
370}
371
372/// A low-level interface for reading JSON data from a byte stream
373///
374/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
375///
376/// The push-based interface facilitates integration with sources that yield arbitrarily
377/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
378/// object storage
379///
380/// ```
381/// # use std::io::BufRead;
382/// # use arrow_array::RecordBatch;
383/// # use arrow_json::reader::{Decoder, ReaderBuilder};
384/// # use arrow_schema::{ArrowError, SchemaRef};
385/// #
386/// fn read_from_json<R: BufRead>(
387///     mut reader: R,
388///     schema: SchemaRef,
389/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
390///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
391///     let mut next = move || {
392///         loop {
393///             // Decoder is agnostic that buf doesn't contain whole records
394///             let buf = reader.fill_buf()?;
395///             if buf.is_empty() {
396///                 break; // Input exhausted
397///             }
398///             let read = buf.len();
399///             let decoded = decoder.decode(buf)?;
400///
401///             // Consume the number of bytes read
402///             reader.consume(decoded);
403///             if decoded != read {
404///                 break; // Read batch size
405///             }
406///         }
407///         decoder.flush()
408///     };
409///     Ok(std::iter::from_fn(move || next().transpose()))
410/// }
411/// ```
pub struct Decoder {
    // Buffers raw JSON into a tape representation until `flush` is called
    tape_decoder: TapeDecoder,
    // Schema-driven decoder that converts tape rows into ArrayData
    decoder: Box<dyn ArrayDecoder>,
    // Target number of rows per flushed RecordBatch
    batch_size: usize,
    // True when the root is a single field value rather than an object (see
    // `ReaderBuilder::new_with_field`)
    is_field: bool,
    schema: SchemaRef,
}
419
420impl std::fmt::Debug for Decoder {
421    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
422        f.debug_struct("Decoder")
423            .field("schema", &self.schema)
424            .field("batch_size", &self.batch_size)
425            .finish()
426    }
427}
428
429impl Decoder {
430    /// Read JSON objects from `buf`, returning the number of bytes read
431    ///
432    /// This method returns once `batch_size` objects have been parsed since the
433    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
434    /// should be included in the next call to [`Self::decode`]
435    ///
436    /// There is no requirement that `buf` contains a whole number of records, facilitating
437    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
438    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
439        self.tape_decoder.decode(buf)
440    }
441
442    /// Serialize `rows` to this [`Decoder`]
443    ///
444    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
445    /// [`RecordBatch`].
446    ///
447    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
448    /// especially where the schema is known at compile-time, however, this provides a mechanism
449    /// to get something up and running quickly
450    ///
451    /// It can be used with [`serde_json::Value`]
452    ///
453    /// ```
454    /// # use std::sync::Arc;
455    /// # use serde_json::{Value, json};
456    /// # use arrow_array::cast::AsArray;
457    /// # use arrow_array::types::Float32Type;
458    /// # use arrow_json::ReaderBuilder;
459    /// # use arrow_schema::{DataType, Field, Schema};
460    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
461    ///
462    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
463    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
464    ///
465    /// decoder.serialize(&json).unwrap();
466    /// let batch = decoder.flush().unwrap().unwrap();
467    /// assert_eq!(batch.num_rows(), 2);
468    /// assert_eq!(batch.num_columns(), 1);
469    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
470    /// assert_eq!(values, &[2.3, 5.7])
471    /// ```
472    ///
473    /// Or with arbitrary [`Serialize`] types
474    ///
475    /// ```
476    /// # use std::sync::Arc;
477    /// # use arrow_json::ReaderBuilder;
478    /// # use arrow_schema::{DataType, Field, Schema};
479    /// # use serde::Serialize;
480    /// # use arrow_array::cast::AsArray;
481    /// # use arrow_array::types::{Float32Type, Int32Type};
482    /// #
483    /// #[derive(Serialize)]
484    /// struct MyStruct {
485    ///     int32: i32,
486    ///     float: f32,
487    /// }
488    ///
489    /// let schema = Schema::new(vec![
490    ///     Field::new("int32", DataType::Int32, false),
491    ///     Field::new("float", DataType::Float32, false),
492    /// ]);
493    ///
494    /// let rows = vec![
495    ///     MyStruct{ int32: 0, float: 3. },
496    ///     MyStruct{ int32: 4, float: 67.53 },
497    /// ];
498    ///
499    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
500    /// decoder.serialize(&rows).unwrap();
501    ///
502    /// let batch = decoder.flush().unwrap().unwrap();
503    ///
504    /// // Expect batch containing two columns
505    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
506    /// assert_eq!(int32.values(), &[0, 4]);
507    ///
508    /// let float = batch.column(1).as_primitive::<Float32Type>();
509    /// assert_eq!(float.values(), &[3., 67.53]);
510    /// ```
511    ///
512    /// Or even complex nested types
513    ///
514    /// ```
515    /// # use std::collections::BTreeMap;
516    /// # use std::sync::Arc;
517    /// # use arrow_array::StructArray;
518    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
519    /// # use arrow_json::ReaderBuilder;
520    /// # use arrow_schema::{DataType, Field, Fields, Schema};
521    /// # use serde::Serialize;
522    /// #
523    /// #[derive(Serialize)]
524    /// struct MyStruct {
525    ///     int32: i32,
526    ///     list: Vec<f64>,
527    ///     nested: Vec<Option<Nested>>,
528    /// }
529    ///
530    /// impl MyStruct {
531    ///     /// Returns the [`Fields`] for [`MyStruct`]
532    ///     fn fields() -> Fields {
533    ///         let nested = DataType::Struct(Nested::fields());
534    ///         Fields::from([
535    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
536    ///             Arc::new(Field::new_list(
537    ///                 "list",
538    ///                 Field::new("element", DataType::Float64, false),
539    ///                 false,
540    ///             )),
541    ///             Arc::new(Field::new_list(
542    ///                 "nested",
543    ///                 Field::new("element", nested, true),
544    ///                 true,
545    ///             )),
546    ///         ])
547    ///     }
548    /// }
549    ///
550    /// #[derive(Serialize)]
551    /// struct Nested {
552    ///     map: BTreeMap<String, Vec<String>>
553    /// }
554    ///
555    /// impl Nested {
556    ///     /// Returns the [`Fields`] for [`Nested`]
557    ///     fn fields() -> Fields {
558    ///         let element = Field::new("element", DataType::Utf8, false);
559    ///         Fields::from([
560    ///             Arc::new(Field::new_map(
561    ///                 "map",
562    ///                 "entries",
563    ///                 Field::new("key", DataType::Utf8, false),
564    ///                 Field::new_list("value", element, false),
565    ///                 false, // sorted
566    ///                 false, // nullable
567    ///             ))
568    ///         ])
569    ///     }
570    /// }
571    ///
572    /// let data = vec![
573    ///     MyStruct {
574    ///         int32: 34,
575    ///         list: vec![1., 2., 34.],
576    ///         nested: vec![
577    ///             None,
578    ///             Some(Nested {
579    ///                 map: vec![
580    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
581    ///                     ("key2".to_string(), vec!["baz".to_string()])
582    ///                 ].into_iter().collect()
583    ///             })
584    ///         ]
585    ///     },
586    ///     MyStruct {
587    ///         int32: 56,
588    ///         list: vec![],
589    ///         nested: vec![]
590    ///     },
591    ///     MyStruct {
592    ///         int32: 24,
593    ///         list: vec![-1., 245.],
594    ///         nested: vec![None]
595    ///     }
596    /// ];
597    ///
598    /// let schema = Schema::new(MyStruct::fields());
599    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
600    /// decoder.serialize(&data).unwrap();
601    /// let batch = decoder.flush().unwrap().unwrap();
602    /// assert_eq!(batch.num_rows(), 3);
603    /// assert_eq!(batch.num_columns(), 3);
604    ///
605    /// // Convert to StructArray to format
606    /// let s = StructArray::from(batch);
607    /// let options = FormatOptions::default().with_null("null");
608    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
609    ///
610    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
611    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
612    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
613    /// ```
614    ///
615    /// Note: this ignores any batch size setting, and always decodes all rows
616    ///
617    /// [serde]: https://docs.rs/serde/latest/serde/
618    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
619        self.tape_decoder.serialize(rows)
620    }
621
622    /// True if the decoder is currently part way through decoding a record.
623    pub fn has_partial_record(&self) -> bool {
624        self.tape_decoder.has_partial_row()
625    }
626
627    /// The number of unflushed records, including the partially decoded record (if any).
628    pub fn len(&self) -> usize {
629        self.tape_decoder.num_buffered_rows()
630    }
631
632    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
633    pub fn is_empty(&self) -> bool {
634        self.len() == 0
635    }
636
637    /// Flushes the currently buffered data to a [`RecordBatch`]
638    ///
639    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
640    ///
641    /// Note: This will return an error if called part way through decoding a record,
642    /// i.e. [`Self::has_partial_record`] is true.
643    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
644        let tape = self.tape_decoder.finish()?;
645
646        if tape.num_rows() == 0 {
647            return Ok(None);
648        }
649
650        // First offset is null sentinel
651        let mut next_object = 1;
652        let pos: Vec<_> = (0..tape.num_rows())
653            .map(|_| {
654                let next = tape.next(next_object, "row").unwrap();
655                std::mem::replace(&mut next_object, next)
656            })
657            .collect();
658
659        let decoded = self.decoder.decode(&tape, &pos)?;
660        self.tape_decoder.clear();
661
662        let batch = match self.is_field {
663            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
664            false => {
665                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
666            }
667        };
668
669        Ok(Some(batch))
670    }
671}
672
/// Internal interface implemented by each per-type tape decoder
/// (primitive, string, list, struct, map, ...)
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
677
// Helper for `make_decoder`: expands to a boxed `PrimitiveArrayDecoder`
// for the given `ArrowPrimitiveType` and target `DataType`
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
683
/// Construct the [`ArrayDecoder`] for `data_type`, recursing into nested types.
///
/// * `coerce_primitive` - coerce JSON bools/numbers into string columns
/// * `strict_mode` - error on JSON fields not present in the schema
/// * `is_nullable` - whether the decoded column may contain nulls
/// * `struct_mode` - whether structs decode from JSON objects and/or lists
///
/// Returns an error for types JSON cannot represent (binary) or that are
/// not yet implemented.
fn make_decoder(
    data_type: DataType,
    coerce_primitive: bool,
    strict_mode: bool,
    is_nullable: bool,
    struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    // downcast_integer! expands the first arm for every integer DataType,
    // dispatching to primitive_decoder!; remaining arms are matched literally
    downcast_integer! {
        data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are decoded relative to UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // Timestamps with a timezone parse the tz string (may fail) and decode relative to it
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        // Nested types recurse via their own constructors, which call back into make_decoder
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
            Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
        }
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
    }
}
753
754#[cfg(test)]
755mod tests {
756    use serde_json::json;
757    use std::fs::File;
758    use std::io::{BufReader, Cursor, Seek};
759
760    use arrow_array::cast::AsArray;
761    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
762    use arrow_buffer::{ArrowNativeType, Buffer};
763    use arrow_cast::display::{ArrayFormatter, FormatOptions};
764    use arrow_data::ArrayDataBuilder;
765    use arrow_schema::{Field, Fields};
766
767    use super::*;
768
    /// Parses `buf` against `schema` and returns the resulting batches.
    ///
    /// Re-reads the input with several batch sizes, and with several small
    /// BufReader capacities, to exercise boundary conditions; asserts all
    /// configurations produce identical batches.
    fn do_read(
        buf: &str,
        batch_size: usize,
        coerce_primitive: bool,
        strict_mode: bool,
        schema: SchemaRef,
    ) -> Vec<RecordBatch> {
        let mut unbuffered = vec![];

        // Test with different batch sizes to test for boundary conditions
        for batch_size in [1, 3, 100, batch_size] {
            unbuffered = ReaderBuilder::new(schema.clone())
                .with_batch_size(batch_size)
                .with_coerce_primitive(coerce_primitive)
                // NOTE(review): strict_mode is only applied to the buffered
                // readers below, not here — confirm the asymmetry is intentional
                .build(Cursor::new(buf.as_bytes()))
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

            // Every batch except the last must be exactly batch_size rows
            // (assumes `buf` yields at least one batch, else len() - 1 underflows)
            for b in unbuffered.iter().take(unbuffered.len() - 1) {
                assert_eq!(b.num_rows(), batch_size)
            }

            // Test with different buffer sizes to test for boundary conditions
            for b in [1, 3, 5] {
                let buffered = ReaderBuilder::new(schema.clone())
                    .with_batch_size(batch_size)
                    .with_coerce_primitive(coerce_primitive)
                    .with_strict_mode(strict_mode)
                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
                    .unwrap()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();
                // Chunked reads must decode to the same batches as unbuffered reads
                assert_eq!(unbuffered, buffered);
            }
        }

        unbuffered
    }
808
    #[test]
    fn test_basic() {
        // End-to-end smoke test: ints, bools and dates with missing fields,
        // explicit nulls, float spellings ("2E0", "2.0", "4e0") and quoted
        // numbers ("5")
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Drive the push-based Decoder API directly: feed all bytes, then
        // flush a single batch
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // decode returns the number of bytes consumed; 221 is the whole input
        // including surrounding whitespace (presumably buf.len() — verify)
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // Flushing resets the decoder state
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        // Now the pull-based Reader API across batch/buffer size combinations
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a" is absent in row 5 and explicitly null in row 6; null slots
        // hold the default value 0
        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b" is present in every row; the quoted "5" and float 4e0 both
        // decode to Int32 values
        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        // "c" only appears in the first two rows
        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        // "d": Date32 populated from the raw integer values
        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        // "e": Date64, present only in row 2
        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
875
876    #[test]
877    fn test_string() {
878        let buf = r#"
879        {"a": "1", "b": "2"}
880        {"a": "hello", "b": "shoo"}
881        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
882
883        {"b": null}
884        {"b": "", "a": null}
885
886        "#;
887        let schema = Arc::new(Schema::new(vec![
888            Field::new("a", DataType::Utf8, true),
889            Field::new("b", DataType::LargeUtf8, true),
890        ]));
891
892        let batches = do_read(buf, 1024, false, false, schema);
893        assert_eq!(batches.len(), 1);
894
895        let col1 = batches[0].column(0).as_string::<i32>();
896        assert_eq!(col1.null_count(), 2);
897        assert_eq!(col1.value(0), "1");
898        assert_eq!(col1.value(1), "hello");
899        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
900        assert!(col1.is_null(3));
901        assert!(col1.is_null(4));
902
903        let col2 = batches[0].column(1).as_string::<i64>();
904        assert_eq!(col2.null_count(), 1);
905        assert_eq!(col2.value(0), "2");
906        assert_eq!(col2.value(1), "shoo");
907        assert_eq!(col2.value(2), "\t😁foo");
908        assert!(col2.is_null(3));
909        assert_eq!(col2.value(4), "");
910    }
911
    #[test]
    fn test_long_string_view_allocation() {
        // The JSON input contains field "a" with different string lengths.
        // According to the implementation in the decoder:
        // - For a string, capacity is only increased if its length > 12 bytes.
        //   (12 bytes being the inline threshold of the view layout)
        // Therefore, for:
        // Row 1: "short" (5 bytes) -> capacity += 0
        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
        // Row 3: "hello" (5 bytes) -> capacity += 0
        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
        //        (UTF-8: 1 + 6 + 4 + 4 + 2 bytes)
        // Expected total capacity = 24 + 17 = 41
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        // Get the first column ("a") as a StringViewArray.
        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Retrieve the underlying data buffer from the array.
        // The builder pre-allocates capacity based on the sum of lengths for long strings.
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        // Check that the allocated capacity is at least what we expected.
        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Additionally, verify that the decoded values are correct.
        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
964
    /// Test the memory capacity allocation logic when converting numeric types to strings.
    #[test]
    fn test_numeric_view_allocation() {
        // For numeric types, the expected capacity calculation is as follows:
        // Row 1: 123456789  -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
        //                        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
        let expected_capacity: usize = 33;

        let buf = r#"
    {"n": 123456789}
    {"n": 1000000000000}
    {"n": 3.1415}
    {"n": 2.718281828459045}
    "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive = true: numbers are written into the Utf8View
        // column as their textual form
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Check that the underlying data buffer capacity is at least the expected value.
        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Verify that the converted string values are correct.
        // Note: The format of the number converted to a string should match the actual implementation.
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1009
1010    #[test]
1011    fn test_string_with_uft8view() {
1012        let buf = r#"
1013        {"a": "1", "b": "2"}
1014        {"a": "hello", "b": "shoo"}
1015        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1016
1017        {"b": null}
1018        {"b": "", "a": null}
1019
1020        "#;
1021        let schema = Arc::new(Schema::new(vec![
1022            Field::new("a", DataType::Utf8View, true),
1023            Field::new("b", DataType::LargeUtf8, true),
1024        ]));
1025
1026        let batches = do_read(buf, 1024, false, false, schema);
1027        assert_eq!(batches.len(), 1);
1028
1029        let col1 = batches[0].column(0).as_string_view();
1030        assert_eq!(col1.null_count(), 2);
1031        assert_eq!(col1.value(0), "1");
1032        assert_eq!(col1.value(1), "hello");
1033        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1034        assert!(col1.is_null(3));
1035        assert!(col1.is_null(4));
1036        assert_eq!(col1.data_type(), &DataType::Utf8View);
1037
1038        let col2 = batches[0].column(1).as_string::<i64>();
1039        assert_eq!(col2.null_count(), 1);
1040        assert_eq!(col2.value(0), "2");
1041        assert_eq!(col2.value(1), "shoo");
1042        assert_eq!(col2.value(2), "\t😁foo");
1043        assert!(col2.is_null(3));
1044        assert_eq!(col2.value(4), "");
1045    }
1046
1047    #[test]
1048    fn test_complex() {
1049        let buf = r#"
1050           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1051           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1052           {"list": null, "nested": {"a": null}}
1053        "#;
1054
1055        let schema = Arc::new(Schema::new(vec![
1056            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1057            Field::new_struct(
1058                "nested",
1059                vec![
1060                    Field::new("a", DataType::Int32, true),
1061                    Field::new("b", DataType::Int32, true),
1062                ],
1063                true,
1064            ),
1065            Field::new_struct(
1066                "nested_list",
1067                vec![Field::new_list(
1068                    "list2",
1069                    Field::new_struct(
1070                        "element",
1071                        vec![Field::new("c", DataType::Int32, false)],
1072                        false,
1073                    ),
1074                    true,
1075                )],
1076                true,
1077            ),
1078        ]));
1079
1080        let batches = do_read(buf, 1024, false, false, schema);
1081        assert_eq!(batches.len(), 1);
1082
1083        let list = batches[0].column(0).as_list::<i32>();
1084        assert_eq!(list.len(), 3);
1085        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1086        assert_eq!(list.null_count(), 1);
1087        assert!(list.is_null(2));
1088        let list_values = list.values().as_primitive::<Int32Type>();
1089        assert_eq!(list_values.values(), &[5, 6]);
1090
1091        let nested = batches[0].column(1).as_struct();
1092        let a = nested.column(0).as_primitive::<Int32Type>();
1093        assert_eq!(list.null_count(), 1);
1094        assert_eq!(a.values(), &[1, 7, 0]);
1095        assert!(list.is_null(2));
1096
1097        let b = nested.column(1).as_primitive::<Int32Type>();
1098        assert_eq!(b.null_count(), 2);
1099        assert_eq!(b.len(), 3);
1100        assert_eq!(b.value(0), 2);
1101        assert!(b.is_null(1));
1102        assert!(b.is_null(2));
1103
1104        let nested_list = batches[0].column(2).as_struct();
1105        assert_eq!(nested_list.len(), 3);
1106        assert_eq!(nested_list.null_count(), 1);
1107        assert!(nested_list.is_null(2));
1108
1109        let list2 = nested_list.column(0).as_list::<i32>();
1110        assert_eq!(list2.len(), 3);
1111        assert_eq!(list2.null_count(), 1);
1112        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1113        assert!(list2.is_null(2));
1114
1115        let list2_values = list2.values().as_struct();
1116
1117        let c = list2_values.column(0).as_primitive::<Int32Type>();
1118        assert_eq!(c.values(), &[3, 4]);
1119    }
1120
    #[test]
    fn test_projection() {
        // The schema names only a subset of the fields present in the input
        // ("nested.a" and "nested_list.list2.element.d"); all other JSON
        // fields should be skipped without error
        let buf = r#"
           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // Only the projected child "a" survives in "nested"
        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        // Row 1 contributes two list elements, row 2 an empty list
        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" only appears in the first element of the first list, so the
        // second element is null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1176
    #[test]
    fn test_map() {
        // Map<Utf8, List<Utf8>>: keys come from the JSON object member names,
        // values are nullable string lists
        let buf = r#"
           {"map": {"a": ["foo", null]}}
           {"map": {"a": [null], "b": []}}
           {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Rows hold 1, 2 and 2 entries respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        // Flattened list elements across all entries; entry "c" (index 3)
        // maps to a null list, which is distinct from "b"'s empty list
        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // Check entry ordering is preserved via the display formatter
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1219
1220    #[test]
1221    fn test_not_coercing_primitive_into_string_without_flag() {
1222        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1223
1224        let buf = r#"{"a": 1}"#;
1225        let err = ReaderBuilder::new(schema.clone())
1226            .with_batch_size(1024)
1227            .build(Cursor::new(buf.as_bytes()))
1228            .unwrap()
1229            .read()
1230            .unwrap_err();
1231
1232        assert_eq!(
1233            err.to_string(),
1234            "Json error: whilst decoding field 'a': expected string got 1"
1235        );
1236
1237        let buf = r#"{"a": true}"#;
1238        let err = ReaderBuilder::new(schema)
1239            .with_batch_size(1024)
1240            .build(Cursor::new(buf.as_bytes()))
1241            .unwrap()
1242            .read()
1243            .unwrap_err();
1244
1245        assert_eq!(
1246            err.to_string(),
1247            "Json error: whilst decoding field 'a': expected string got true"
1248        );
1249    }
1250
    #[test]
    fn test_coercing_primitive_into_string() {
        // With coerce_primitive enabled (third argument to do_read below),
        // numbers and booleans targeting a Utf8 column are stored as their
        // literal JSON text — "2E0" and "4e0" are kept verbatim, not
        // normalised
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": missing and explicit null both surface as nulls
        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": already-string "5" passes through unchanged alongside the
        // coerced numbers
        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        // "c": booleans coerce to "true"/"false"
        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1299
    /// Shared body for the per-width decimal tests (see `test_decimals`).
    ///
    /// `data_type` has scale 2, so the stored integer is the literal value
    /// scaled by 10^2. Digits beyond the scale are dropped, per the asserted
    /// values below (123.456 -> 12345, "2.0452" -> 204). Decimals may be
    /// given as JSON numbers or as quoted strings.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": missing in row 5, explicit null in row 6; null slots hold 0
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        // "b": integers and the quoted "5" all scale by 100
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        // "c" is present only in the first two rows
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1349
1350    #[test]
1351    fn test_decimals() {
1352        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
1353        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
1354        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
1355        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
1356    }
1357
    /// Shared body for the per-unit timestamp tests (see `test_timestamps`).
    ///
    /// Plain JSON numbers pass through verbatim as ticks in `T::UNIT`
    /// (fractional values truncate, e.g. 38.30 -> 38); strings are parsed as
    /// timestamps. Column "d" carries a "+08:00" timezone, so its zone-less
    /// strings resolve in that offset.
    fn test_timestamp<T: ArrowTimestampType>() {
        // NOTE: row 3 supplies "b" twice — the asserted value for col2[2]
        // shows the later (string) occurrence wins
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are written in nanoseconds and divided down
        // to the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        // "a" only appears in the first two rows
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": raw numbers (40, 1234) are taken as-is; the RFC3339 strings
        // parse to their epoch tick counts
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        // "c" has no timezone; zone-less strings are treated the same as the
        // explicit UTC ("Z") variants, and space-separated datetimes parse too
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // "d" is the "+08:00" column: its zone-less strings yield epochs 8h
        // earlier than the matching UTC strings in "c" (compare rows 1 and 3);
        // explicit offsets ("Z", "-05:00") are honoured as written
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1439
1440    #[test]
1441    fn test_timestamps() {
1442        test_timestamp::<TimestampSecondType>();
1443        test_timestamp::<TimestampMillisecondType>();
1444        test_timestamp::<TimestampMicrosecondType>();
1445        test_timestamp::<TimestampNanosecondType>();
1446    }
1447
    /// Shared body for the per-unit time-of-day tests (see `test_times`).
    ///
    /// Plain JSON numbers pass through verbatim as ticks in the column's unit
    /// (fractional values truncate, e.g. 38.30 -> 38); strings are parsed as
    /// times of day, including 12-hour am/pm forms.
    fn test_time<T: ArrowTemporalType>() {
        // NOTE: row 3 supplies "b" twice — the asserted value for col2[2]
        // (18:00) shows the later ("6:00 pm") occurrence wins
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Expected values below are written in nanoseconds since midnight and
        // divided down to the unit under test
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a" only appears in the first two rows
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": 09:26:56.123 -> 34016.123s, 23:59:59 -> 86399s,
        // "6:00 pm" -> 64800s; raw numbers (40, 1234) pass through as-is
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        // "c": fractional raw numbers truncate (38.30 -> 38); string times
        // include microsecond precision (13:42:29.190855)
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1519
1520    #[test]
1521    fn test_times() {
1522        test_time::<Time32MillisecondType>();
1523        test_time::<Time32SecondType>();
1524        test_time::<Time64MicrosecondType>();
1525        test_time::<Time64NanosecondType>();
1526    }
1527
1528    fn test_duration<T: ArrowTemporalType>() {
1529        let buf = r#"
1530        {"a": 1, "b": "2"}
1531        {"a": 3, "b": null}
1532        "#;
1533
1534        let schema = Arc::new(Schema::new(vec![
1535            Field::new("a", T::DATA_TYPE, true),
1536            Field::new("b", T::DATA_TYPE, true),
1537        ]));
1538
1539        let batches = do_read(buf, 1024, true, false, schema);
1540        assert_eq!(batches.len(), 1);
1541
1542        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1543        assert_eq!(col_a.null_count(), 0);
1544        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1545
1546        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1547        assert_eq!(col2.null_count(), 1);
1548        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1549    }
1550
1551    #[test]
1552    fn test_durations() {
1553        test_duration::<DurationNanosecondType>();
1554        test_duration::<DurationMicrosecondType>();
1555        test_duration::<DurationMillisecondType>();
1556        test_duration::<DurationSecondType>();
1557    }
1558
1559    #[test]
1560    fn test_delta_checkpoint() {
1561        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
1562        let schema = Arc::new(Schema::new(vec![
1563            Field::new_struct(
1564                "protocol",
1565                vec![
1566                    Field::new("minReaderVersion", DataType::Int32, true),
1567                    Field::new("minWriterVersion", DataType::Int32, true),
1568                ],
1569                true,
1570            ),
1571            Field::new_struct(
1572                "add",
1573                vec![Field::new_map(
1574                    "partitionValues",
1575                    "key_value",
1576                    Field::new("key", DataType::Utf8, false),
1577                    Field::new("value", DataType::Utf8, true),
1578                    false,
1579                    false,
1580                )],
1581                true,
1582            ),
1583        ]));
1584
1585        let batches = do_read(json, 1024, true, false, schema);
1586        assert_eq!(batches.len(), 1);
1587
1588        let s: StructArray = batches.into_iter().next().unwrap().into();
1589        let opts = FormatOptions::default().with_null("null");
1590        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
1591        assert_eq!(
1592            formatter.value(0).to_string(),
1593            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
1594        );
1595    }
1596
1597    #[test]
1598    fn struct_nullability() {
1599        let do_test = |child: DataType| {
1600            // Test correctly enforced nullability
1601            let non_null = r#"{"foo": {}}"#;
1602            let schema = Arc::new(Schema::new(vec![Field::new_struct(
1603                "foo",
1604                vec![Field::new("bar", child, false)],
1605                true,
1606            )]));
1607            let mut reader = ReaderBuilder::new(schema.clone())
1608                .build(Cursor::new(non_null.as_bytes()))
1609                .unwrap();
1610            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1611
1612            let null = r#"{"foo": {bar: null}}"#;
1613            let mut reader = ReaderBuilder::new(schema.clone())
1614                .build(Cursor::new(null.as_bytes()))
1615                .unwrap();
1616            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1617
1618            // Test nulls in nullable parent can mask nulls in non-nullable child
1619            let null = r#"{"foo": null}"#;
1620            let mut reader = ReaderBuilder::new(schema)
1621                .build(Cursor::new(null.as_bytes()))
1622                .unwrap();
1623            let batch = reader.next().unwrap().unwrap();
1624            assert_eq!(batch.num_columns(), 1);
1625            let foo = batch.column(0).as_struct();
1626            assert_eq!(foo.len(), 1);
1627            assert!(foo.is_null(0));
1628            assert_eq!(foo.num_columns(), 1);
1629
1630            let bar = foo.column(0);
1631            assert_eq!(bar.len(), 1);
1632            // Non-nullable child can still contain null as masked by parent
1633            assert!(bar.is_null(0));
1634        };
1635
1636        do_test(DataType::Boolean);
1637        do_test(DataType::Int32);
1638        do_test(DataType::Utf8);
1639        do_test(DataType::Decimal128(2, 1));
1640        do_test(DataType::Timestamp(
1641            TimeUnit::Microsecond,
1642            Some("+00:00".into()),
1643        ));
1644    }
1645
1646    #[test]
1647    fn test_truncation() {
1648        let buf = r#"
1649        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1650        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1651        {"i64": -9223372036854775808, "u64": 0 }
1652        {"i64": "-9223372036854775808", "u64": 0 }
1653        "#;
1654
1655        let schema = Arc::new(Schema::new(vec![
1656            Field::new("i64", DataType::Int64, true),
1657            Field::new("u64", DataType::UInt64, true),
1658        ]));
1659
1660        let batches = do_read(buf, 1024, true, false, schema);
1661        assert_eq!(batches.len(), 1);
1662
1663        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1664        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1665
1666        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1667        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1668    }
1669
1670    #[test]
1671    fn test_timestamp_truncation() {
1672        let buf = r#"
1673        {"time": 9223372036854775807 }
1674        {"time": -9223372036854775808 }
1675        {"time": 9e5 }
1676        "#;
1677
1678        let schema = Arc::new(Schema::new(vec![Field::new(
1679            "time",
1680            DataType::Timestamp(TimeUnit::Nanosecond, None),
1681            true,
1682        )]));
1683
1684        let batches = do_read(buf, 1024, true, false, schema);
1685        assert_eq!(batches.len(), 1);
1686
1687        let i64 = batches[0]
1688            .column(0)
1689            .as_primitive::<TimestampNanosecondType>();
1690        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1691    }
1692
1693    #[test]
1694    fn test_strict_mode_no_missing_columns_in_schema() {
1695        let buf = r#"
1696        {"a": 1, "b": "2", "c": true}
1697        {"a": 2E0, "b": "4", "c": false}
1698        "#;
1699
1700        let schema = Arc::new(Schema::new(vec![
1701            Field::new("a", DataType::Int16, false),
1702            Field::new("b", DataType::Utf8, false),
1703            Field::new("c", DataType::Boolean, false),
1704        ]));
1705
1706        let batches = do_read(buf, 1024, true, true, schema);
1707        assert_eq!(batches.len(), 1);
1708
1709        let buf = r#"
1710        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1711        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1712        "#;
1713
1714        let schema = Arc::new(Schema::new(vec![
1715            Field::new("a", DataType::Int16, false),
1716            Field::new("b", DataType::Utf8, false),
1717            Field::new_struct(
1718                "c",
1719                vec![
1720                    Field::new("a", DataType::Boolean, false),
1721                    Field::new("b", DataType::Int16, false),
1722                ],
1723                false,
1724            ),
1725        ]));
1726
1727        let batches = do_read(buf, 1024, true, true, schema);
1728        assert_eq!(batches.len(), 1);
1729    }
1730
1731    #[test]
1732    fn test_strict_mode_missing_columns_in_schema() {
1733        let buf = r#"
1734        {"a": 1, "b": "2", "c": true}
1735        {"a": 2E0, "b": "4", "c": false}
1736        "#;
1737
1738        let schema = Arc::new(Schema::new(vec![
1739            Field::new("a", DataType::Int16, true),
1740            Field::new("c", DataType::Boolean, true),
1741        ]));
1742
1743        let err = ReaderBuilder::new(schema)
1744            .with_batch_size(1024)
1745            .with_strict_mode(true)
1746            .build(Cursor::new(buf.as_bytes()))
1747            .unwrap()
1748            .read()
1749            .unwrap_err();
1750
1751        assert_eq!(
1752            err.to_string(),
1753            "Json error: column 'b' missing from schema"
1754        );
1755
1756        let buf = r#"
1757        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1758        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1759        "#;
1760
1761        let schema = Arc::new(Schema::new(vec![
1762            Field::new("a", DataType::Int16, false),
1763            Field::new("b", DataType::Utf8, false),
1764            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
1765        ]));
1766
1767        let err = ReaderBuilder::new(schema)
1768            .with_batch_size(1024)
1769            .with_strict_mode(true)
1770            .build(Cursor::new(buf.as_bytes()))
1771            .unwrap()
1772            .read()
1773            .unwrap_err();
1774
1775        assert_eq!(
1776            err.to_string(),
1777            "Json error: whilst decoding field 'c': column 'b' missing from schema"
1778        );
1779    }
1780
1781    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1782        let file = File::open(path).unwrap();
1783        let mut reader = BufReader::new(file);
1784        let schema = schema.unwrap_or_else(|| {
1785            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1786            reader.rewind().unwrap();
1787            schema
1788        });
1789        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1790        builder.build(reader).unwrap()
1791    }
1792
1793    #[test]
1794    fn test_json_basic() {
1795        let mut reader = read_file("test/data/basic.json", None);
1796        let batch = reader.next().unwrap().unwrap();
1797
1798        assert_eq!(8, batch.num_columns());
1799        assert_eq!(12, batch.num_rows());
1800
1801        let schema = reader.schema();
1802        let batch_schema = batch.schema();
1803        assert_eq!(schema, batch_schema);
1804
1805        let a = schema.column_with_name("a").unwrap();
1806        assert_eq!(0, a.0);
1807        assert_eq!(&DataType::Int64, a.1.data_type());
1808        let b = schema.column_with_name("b").unwrap();
1809        assert_eq!(1, b.0);
1810        assert_eq!(&DataType::Float64, b.1.data_type());
1811        let c = schema.column_with_name("c").unwrap();
1812        assert_eq!(2, c.0);
1813        assert_eq!(&DataType::Boolean, c.1.data_type());
1814        let d = schema.column_with_name("d").unwrap();
1815        assert_eq!(3, d.0);
1816        assert_eq!(&DataType::Utf8, d.1.data_type());
1817
1818        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1819        assert_eq!(1, aa.value(0));
1820        assert_eq!(-10, aa.value(1));
1821        let bb = batch.column(b.0).as_primitive::<Float64Type>();
1822        assert_eq!(2.0, bb.value(0));
1823        assert_eq!(-3.5, bb.value(1));
1824        let cc = batch.column(c.0).as_boolean();
1825        assert!(!cc.value(0));
1826        assert!(cc.value(10));
1827        let dd = batch.column(d.0).as_string::<i32>();
1828        assert_eq!("4", dd.value(0));
1829        assert_eq!("text", dd.value(8));
1830    }
1831
1832    #[test]
1833    fn test_json_empty_projection() {
1834        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1835        let batch = reader.next().unwrap().unwrap();
1836
1837        assert_eq!(0, batch.num_columns());
1838        assert_eq!(12, batch.num_rows());
1839    }
1840
1841    #[test]
1842    fn test_json_basic_with_nulls() {
1843        let mut reader = read_file("test/data/basic_nulls.json", None);
1844        let batch = reader.next().unwrap().unwrap();
1845
1846        assert_eq!(4, batch.num_columns());
1847        assert_eq!(12, batch.num_rows());
1848
1849        let schema = reader.schema();
1850        let batch_schema = batch.schema();
1851        assert_eq!(schema, batch_schema);
1852
1853        let a = schema.column_with_name("a").unwrap();
1854        assert_eq!(&DataType::Int64, a.1.data_type());
1855        let b = schema.column_with_name("b").unwrap();
1856        assert_eq!(&DataType::Float64, b.1.data_type());
1857        let c = schema.column_with_name("c").unwrap();
1858        assert_eq!(&DataType::Boolean, c.1.data_type());
1859        let d = schema.column_with_name("d").unwrap();
1860        assert_eq!(&DataType::Utf8, d.1.data_type());
1861
1862        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1863        assert!(aa.is_valid(0));
1864        assert!(!aa.is_valid(1));
1865        assert!(!aa.is_valid(11));
1866        let bb = batch.column(b.0).as_primitive::<Float64Type>();
1867        assert!(bb.is_valid(0));
1868        assert!(!bb.is_valid(2));
1869        assert!(!bb.is_valid(11));
1870        let cc = batch.column(c.0).as_boolean();
1871        assert!(cc.is_valid(0));
1872        assert!(!cc.is_valid(4));
1873        assert!(!cc.is_valid(11));
1874        let dd = batch.column(d.0).as_string::<i32>();
1875        assert!(!dd.is_valid(0));
1876        assert!(dd.is_valid(1));
1877        assert!(!dd.is_valid(4));
1878        assert!(!dd.is_valid(11));
1879    }
1880
1881    #[test]
1882    fn test_json_basic_schema() {
1883        let schema = Schema::new(vec![
1884            Field::new("a", DataType::Int64, true),
1885            Field::new("b", DataType::Float32, false),
1886            Field::new("c", DataType::Boolean, false),
1887            Field::new("d", DataType::Utf8, false),
1888        ]);
1889
1890        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1891        let reader_schema = reader.schema();
1892        assert_eq!(reader_schema.as_ref(), &schema);
1893        let batch = reader.next().unwrap().unwrap();
1894
1895        assert_eq!(4, batch.num_columns());
1896        assert_eq!(12, batch.num_rows());
1897
1898        let schema = batch.schema();
1899
1900        let a = schema.column_with_name("a").unwrap();
1901        assert_eq!(&DataType::Int64, a.1.data_type());
1902        let b = schema.column_with_name("b").unwrap();
1903        assert_eq!(&DataType::Float32, b.1.data_type());
1904        let c = schema.column_with_name("c").unwrap();
1905        assert_eq!(&DataType::Boolean, c.1.data_type());
1906        let d = schema.column_with_name("d").unwrap();
1907        assert_eq!(&DataType::Utf8, d.1.data_type());
1908
1909        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1910        assert_eq!(1, aa.value(0));
1911        assert_eq!(100000000000000, aa.value(11));
1912        let bb = batch.column(b.0).as_primitive::<Float32Type>();
1913        assert_eq!(2.0, bb.value(0));
1914        assert_eq!(-3.5, bb.value(1));
1915    }
1916
1917    #[test]
1918    fn test_json_basic_schema_projection() {
1919        let schema = Schema::new(vec![
1920            Field::new("a", DataType::Int64, true),
1921            Field::new("c", DataType::Boolean, false),
1922        ]);
1923
1924        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1925        let batch = reader.next().unwrap().unwrap();
1926
1927        assert_eq!(2, batch.num_columns());
1928        assert_eq!(2, batch.schema().fields().len());
1929        assert_eq!(12, batch.num_rows());
1930
1931        assert_eq!(batch.schema().as_ref(), &schema);
1932
1933        let a = schema.column_with_name("a").unwrap();
1934        assert_eq!(0, a.0);
1935        assert_eq!(&DataType::Int64, a.1.data_type());
1936        let c = schema.column_with_name("c").unwrap();
1937        assert_eq!(1, c.0);
1938        assert_eq!(&DataType::Boolean, c.1.data_type());
1939    }
1940
1941    #[test]
1942    fn test_json_arrays() {
1943        let mut reader = read_file("test/data/arrays.json", None);
1944        let batch = reader.next().unwrap().unwrap();
1945
1946        assert_eq!(4, batch.num_columns());
1947        assert_eq!(3, batch.num_rows());
1948
1949        let schema = batch.schema();
1950
1951        let a = schema.column_with_name("a").unwrap();
1952        assert_eq!(&DataType::Int64, a.1.data_type());
1953        let b = schema.column_with_name("b").unwrap();
1954        assert_eq!(
1955            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
1956            b.1.data_type()
1957        );
1958        let c = schema.column_with_name("c").unwrap();
1959        assert_eq!(
1960            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
1961            c.1.data_type()
1962        );
1963        let d = schema.column_with_name("d").unwrap();
1964        assert_eq!(&DataType::Utf8, d.1.data_type());
1965
1966        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1967        assert_eq!(1, aa.value(0));
1968        assert_eq!(-10, aa.value(1));
1969        assert_eq!(1627668684594000000, aa.value(2));
1970        let bb = batch.column(b.0).as_list::<i32>();
1971        let bb = bb.values().as_primitive::<Float64Type>();
1972        assert_eq!(9, bb.len());
1973        assert_eq!(2.0, bb.value(0));
1974        assert_eq!(-6.1, bb.value(5));
1975        assert!(!bb.is_valid(7));
1976
1977        let cc = batch
1978            .column(c.0)
1979            .as_any()
1980            .downcast_ref::<ListArray>()
1981            .unwrap();
1982        let cc = cc.values().as_boolean();
1983        assert_eq!(6, cc.len());
1984        assert!(!cc.value(0));
1985        assert!(!cc.value(4));
1986        assert!(!cc.is_valid(5));
1987    }
1988
1989    #[test]
1990    fn test_empty_json_arrays() {
1991        let json_content = r#"
1992            {"items": []}
1993            {"items": null}
1994            {}
1995            "#;
1996
1997        let schema = Arc::new(Schema::new(vec![Field::new(
1998            "items",
1999            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2000            true,
2001        )]));
2002
2003        let batches = do_read(json_content, 1024, false, false, schema);
2004        assert_eq!(batches.len(), 1);
2005
2006        let col1 = batches[0].column(0).as_list::<i32>();
2007        assert_eq!(col1.null_count(), 2);
2008        assert!(col1.value(0).is_empty());
2009        assert_eq!(col1.value(0).data_type(), &DataType::Null);
2010        assert!(col1.is_null(1));
2011        assert!(col1.is_null(2));
2012    }
2013
2014    #[test]
2015    fn test_nested_empty_json_arrays() {
2016        let json_content = r#"
2017            {"items": [[],[]]}
2018            {"items": [[null, null],[null]]}
2019            "#;
2020
2021        let schema = Arc::new(Schema::new(vec![Field::new(
2022            "items",
2023            DataType::List(FieldRef::new(Field::new_list_field(
2024                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2025                true,
2026            ))),
2027            true,
2028        )]));
2029
2030        let batches = do_read(json_content, 1024, false, false, schema);
2031        assert_eq!(batches.len(), 1);
2032
2033        let col1 = batches[0].column(0).as_list::<i32>();
2034        assert_eq!(col1.null_count(), 0);
2035        assert_eq!(col1.value(0).len(), 2);
2036        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2037        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2038
2039        assert_eq!(col1.value(1).len(), 2);
2040        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2041        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2042    }
2043
    #[test]
    fn test_nested_list_json_arrays() {
        // Decodes a list-of-structs column and compares the result against a
        // manually constructed expected array, checking offsets and null
        // buffers at every level of nesting.
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        // Six rows: 2 structs / 1 struct / 3 structs / null / empty list / [null]
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output
        // Flattened `d` values across the 7 child struct slots; the trailing
        // None corresponds to the `[null]` row's struct slot.
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // `c` struct validity: slot 2 (`"c": null`) and slot 6 (null struct
        // element) are null -> bitmap 0b00111011, LSB = slot 0.
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        // Parent struct validity: only slot 6 (the explicit `null` list
        // element) is null -> bitmap 0b00111111.
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        // List offsets give rows of 2, 1, 3, 0, 0, 1 elements; row 3
        // (`"a": null`) is the only null list -> bitmap 0b00110111.
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2137
2138    #[test]
2139    fn test_skip_empty_lines() {
2140        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2141        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2142        let json_content = "
2143        {\"a\": 1}
2144        {\"a\": 2}
2145        {\"a\": 3}";
2146        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2147        let batch = reader.next().unwrap().unwrap();
2148
2149        assert_eq!(1, batch.num_columns());
2150        assert_eq!(3, batch.num_rows());
2151
2152        let schema = reader.schema();
2153        let c = schema.column_with_name("a").unwrap();
2154        assert_eq!(&DataType::Int64, c.1.data_type());
2155    }
2156
2157    #[test]
2158    fn test_with_multiple_batches() {
2159        let file = File::open("test/data/basic_nulls.json").unwrap();
2160        let mut reader = BufReader::new(file);
2161        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2162        reader.rewind().unwrap();
2163
2164        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2165        let mut reader = builder.build(reader).unwrap();
2166
2167        let mut num_records = Vec::new();
2168        while let Some(rb) = reader.next().transpose().unwrap() {
2169            num_records.push(rb.num_rows());
2170        }
2171
2172        assert_eq!(vec![5, 5, 2], num_records);
2173    }
2174
2175    #[test]
2176    fn test_timestamp_from_json_seconds() {
2177        let schema = Schema::new(vec![Field::new(
2178            "a",
2179            DataType::Timestamp(TimeUnit::Second, None),
2180            true,
2181        )]);
2182
2183        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2184        let batch = reader.next().unwrap().unwrap();
2185
2186        assert_eq!(1, batch.num_columns());
2187        assert_eq!(12, batch.num_rows());
2188
2189        let schema = reader.schema();
2190        let batch_schema = batch.schema();
2191        assert_eq!(schema, batch_schema);
2192
2193        let a = schema.column_with_name("a").unwrap();
2194        assert_eq!(
2195            &DataType::Timestamp(TimeUnit::Second, None),
2196            a.1.data_type()
2197        );
2198
2199        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2200        assert!(aa.is_valid(0));
2201        assert!(!aa.is_valid(1));
2202        assert!(!aa.is_valid(2));
2203        assert_eq!(1, aa.value(0));
2204        assert_eq!(1, aa.value(3));
2205        assert_eq!(5, aa.value(7));
2206    }
2207
2208    #[test]
2209    fn test_timestamp_from_json_milliseconds() {
2210        let schema = Schema::new(vec![Field::new(
2211            "a",
2212            DataType::Timestamp(TimeUnit::Millisecond, None),
2213            true,
2214        )]);
2215
2216        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2217        let batch = reader.next().unwrap().unwrap();
2218
2219        assert_eq!(1, batch.num_columns());
2220        assert_eq!(12, batch.num_rows());
2221
2222        let schema = reader.schema();
2223        let batch_schema = batch.schema();
2224        assert_eq!(schema, batch_schema);
2225
2226        let a = schema.column_with_name("a").unwrap();
2227        assert_eq!(
2228            &DataType::Timestamp(TimeUnit::Millisecond, None),
2229            a.1.data_type()
2230        );
2231
2232        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2233        assert!(aa.is_valid(0));
2234        assert!(!aa.is_valid(1));
2235        assert!(!aa.is_valid(2));
2236        assert_eq!(1, aa.value(0));
2237        assert_eq!(1, aa.value(3));
2238        assert_eq!(5, aa.value(7));
2239    }
2240
2241    #[test]
2242    fn test_date_from_json_milliseconds() {
2243        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2244
2245        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2246        let batch = reader.next().unwrap().unwrap();
2247
2248        assert_eq!(1, batch.num_columns());
2249        assert_eq!(12, batch.num_rows());
2250
2251        let schema = reader.schema();
2252        let batch_schema = batch.schema();
2253        assert_eq!(schema, batch_schema);
2254
2255        let a = schema.column_with_name("a").unwrap();
2256        assert_eq!(&DataType::Date64, a.1.data_type());
2257
2258        let aa = batch.column(a.0).as_primitive::<Date64Type>();
2259        assert!(aa.is_valid(0));
2260        assert!(!aa.is_valid(1));
2261        assert!(!aa.is_valid(2));
2262        assert_eq!(1, aa.value(0));
2263        assert_eq!(1, aa.value(3));
2264        assert_eq!(5, aa.value(7));
2265    }
2266
2267    #[test]
2268    fn test_time_from_json_nanoseconds() {
2269        let schema = Schema::new(vec![Field::new(
2270            "a",
2271            DataType::Time64(TimeUnit::Nanosecond),
2272            true,
2273        )]);
2274
2275        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2276        let batch = reader.next().unwrap().unwrap();
2277
2278        assert_eq!(1, batch.num_columns());
2279        assert_eq!(12, batch.num_rows());
2280
2281        let schema = reader.schema();
2282        let batch_schema = batch.schema();
2283        assert_eq!(schema, batch_schema);
2284
2285        let a = schema.column_with_name("a").unwrap();
2286        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2287
2288        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2289        assert!(aa.is_valid(0));
2290        assert!(!aa.is_valid(1));
2291        assert!(!aa.is_valid(2));
2292        assert_eq!(1, aa.value(0));
2293        assert_eq!(1, aa.value(3));
2294        assert_eq!(5, aa.value(7));
2295    }
2296
2297    #[test]
2298    fn test_json_iterator() {
2299        let file = File::open("test/data/basic.json").unwrap();
2300        let mut reader = BufReader::new(file);
2301        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2302        reader.rewind().unwrap();
2303
2304        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2305        let reader = builder.build(reader).unwrap();
2306        let schema = reader.schema();
2307        let (col_a_index, _) = schema.column_with_name("a").unwrap();
2308
2309        let mut sum_num_rows = 0;
2310        let mut num_batches = 0;
2311        let mut sum_a = 0;
2312        for batch in reader {
2313            let batch = batch.unwrap();
2314            assert_eq!(8, batch.num_columns());
2315            sum_num_rows += batch.num_rows();
2316            num_batches += 1;
2317            let batch_schema = batch.schema();
2318            assert_eq!(schema, batch_schema);
2319            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2320            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2321        }
2322        assert_eq!(12, sum_num_rows);
2323        assert_eq!(3, num_batches);
2324        assert_eq!(100000000000011, sum_a);
2325    }
2326
2327    #[test]
2328    fn test_decoder_error() {
2329        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2330            "a",
2331            vec![Field::new("child", DataType::Int32, false)],
2332            true,
2333        )]));
2334
2335        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
2336        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
2337        assert!(decoder.tape_decoder.has_partial_row());
2338        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2339        let _ = decoder.flush().unwrap_err();
2340        assert!(decoder.tape_decoder.has_partial_row());
2341        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2342
2343        let parse_err = |s: &str| {
2344            ReaderBuilder::new(schema.clone())
2345                .build(Cursor::new(s.as_bytes()))
2346                .unwrap()
2347                .next()
2348                .unwrap()
2349                .unwrap_err()
2350                .to_string()
2351        };
2352
2353        let err = parse_err(r#"{"a": 123}"#);
2354        assert_eq!(
2355            err,
2356            "Json error: whilst decoding field 'a': expected { got 123"
2357        );
2358
2359        let err = parse_err(r#"{"a": ["bar"]}"#);
2360        assert_eq!(
2361            err,
2362            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
2363        );
2364
2365        let err = parse_err(r#"{"a": []}"#);
2366        assert_eq!(
2367            err,
2368            "Json error: whilst decoding field 'a': expected { got []"
2369        );
2370
2371        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
2372        assert_eq!(
2373            err,
2374            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
2375        );
2376
2377        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
2378        assert_eq!(
2379            err,
2380            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
2381        );
2382
2383        let err = parse_err(r#"{"a": true}"#);
2384        assert_eq!(
2385            err,
2386            "Json error: whilst decoding field 'a': expected { got true"
2387        );
2388
2389        let err = parse_err(r#"{"a": false}"#);
2390        assert_eq!(
2391            err,
2392            "Json error: whilst decoding field 'a': expected { got false"
2393        );
2394
2395        let err = parse_err(r#"{"a": "foo"}"#);
2396        assert_eq!(
2397            err,
2398            "Json error: whilst decoding field 'a': expected { got \"foo\""
2399        );
2400
2401        let err = parse_err(r#"{"a": {"child": false}}"#);
2402        assert_eq!(
2403            err,
2404            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
2405        );
2406
2407        let err = parse_err(r#"{"a": {"child": []}}"#);
2408        assert_eq!(
2409            err,
2410            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
2411        );
2412
2413        let err = parse_err(r#"{"a": {"child": [123]}}"#);
2414        assert_eq!(
2415            err,
2416            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
2417        );
2418
2419        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
2420        assert_eq!(
2421            err,
2422            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
2423        );
2424    }
2425
2426    #[test]
2427    fn test_serialize_timestamp() {
2428        let json = vec![
2429            json!({"timestamp": 1681319393}),
2430            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2431        ];
2432        let schema = Schema::new(vec![Field::new(
2433            "timestamp",
2434            DataType::Timestamp(TimeUnit::Second, None),
2435            true,
2436        )]);
2437        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2438            .build_decoder()
2439            .unwrap();
2440        decoder.serialize(&json).unwrap();
2441        let batch = decoder.flush().unwrap().unwrap();
2442        assert_eq!(batch.num_rows(), 2);
2443        assert_eq!(batch.num_columns(), 1);
2444        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2445        assert_eq!(values.values(), &[1681319393, -7200]);
2446    }
2447
2448    #[test]
2449    fn test_serialize_decimal() {
2450        let json = vec![
2451            json!({"decimal": 1.234}),
2452            json!({"decimal": "1.234"}),
2453            json!({"decimal": 1234}),
2454            json!({"decimal": "1234"}),
2455        ];
2456        let schema = Schema::new(vec![Field::new(
2457            "decimal",
2458            DataType::Decimal128(10, 3),
2459            true,
2460        )]);
2461        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2462            .build_decoder()
2463            .unwrap();
2464        decoder.serialize(&json).unwrap();
2465        let batch = decoder.flush().unwrap().unwrap();
2466        assert_eq!(batch.num_rows(), 4);
2467        assert_eq!(batch.num_columns(), 1);
2468        let values = batch.column(0).as_primitive::<Decimal128Type>();
2469        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2470    }
2471
2472    #[test]
2473    fn test_serde_field() {
2474        let field = Field::new("int", DataType::Int32, true);
2475        let mut decoder = ReaderBuilder::new_with_field(field)
2476            .build_decoder()
2477            .unwrap();
2478        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2479        let b = decoder.flush().unwrap().unwrap();
2480        let values = b.column(0).as_primitive::<Int32Type>().values();
2481        assert_eq!(values, &[1, 2, 3, 4]);
2482    }
2483
2484    #[test]
2485    fn test_serde_large_numbers() {
2486        let field = Field::new("int", DataType::Int64, true);
2487        let mut decoder = ReaderBuilder::new_with_field(field)
2488            .build_decoder()
2489            .unwrap();
2490
2491        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2492        let b = decoder.flush().unwrap().unwrap();
2493        let values = b.column(0).as_primitive::<Int64Type>().values();
2494        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2495
2496        let field = Field::new(
2497            "int",
2498            DataType::Timestamp(TimeUnit::Microsecond, None),
2499            true,
2500        );
2501        let mut decoder = ReaderBuilder::new_with_field(field)
2502            .build_decoder()
2503            .unwrap();
2504
2505        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2506        let b = decoder.flush().unwrap().unwrap();
2507        let values = b
2508            .column(0)
2509            .as_primitive::<TimestampMicrosecondType>()
2510            .values();
2511        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2512    }
2513
2514    #[test]
2515    fn test_coercing_primitive_into_string_decoder() {
2516        let buf = &format!(
2517            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2518            (i32::MAX as i64 + 10),
2519            i64::MAX - 10
2520        );
2521        let schema = Schema::new(vec![
2522            Field::new("a", DataType::Float64, true),
2523            Field::new("b", DataType::Utf8, true),
2524            Field::new("c", DataType::Utf8, true),
2525        ]);
2526        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2527        let schema_ref = Arc::new(schema);
2528
2529        // read record batches
2530        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2531        let mut decoder = reader.build_decoder().unwrap();
2532        decoder.serialize(json_array.as_slice()).unwrap();
2533        let batch = decoder.flush().unwrap().unwrap();
2534        assert_eq!(
2535            batch,
2536            RecordBatch::try_new(
2537                schema_ref,
2538                vec![
2539                    Arc::new(Float64Array::from(vec![
2540                        1.0,
2541                        2.0,
2542                        (i32::MAX as i64 + 10) as f64,
2543                        (i64::MAX - 10) as f64
2544                    ])),
2545                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2546                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2547                ]
2548            )
2549            .unwrap()
2550        );
2551    }
2552
2553    // Parse the given `row` in `struct_mode` as a type given by fields.
2554    //
2555    // If as_struct == true, wrap the fields in a Struct field with name "r".
2556    // If as_struct == false, wrap the fields in a Schema.
2557    fn _parse_structs(
2558        row: &str,
2559        struct_mode: StructMode,
2560        fields: Fields,
2561        as_struct: bool,
2562    ) -> Result<RecordBatch, ArrowError> {
2563        let builder = if as_struct {
2564            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2565        } else {
2566            ReaderBuilder::new(Arc::new(Schema::new(fields)))
2567        };
2568        builder
2569            .with_struct_mode(struct_mode)
2570            .build(Cursor::new(row.as_bytes()))
2571            .unwrap()
2572            .next()
2573            .unwrap()
2574    }
2575
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        // A two-element list row in ListOnly mode must line up 1:1 with the
        // declared fields: too few fields errors with "found extra columns",
        // too many with "found 2 columns for 3 fields", and an exact match
        // decodes successfully — both at the top level and wrapped in a
        // struct field "r".
        let row = "[1, 2]";

        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        // as_struct selects whether _parse_structs wraps the fields in "r";
        // both code paths are exercised for every field count below.
        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        // The successfully-decoded row: a = 1, b = 2.
        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2634
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical value encoded three ways: pure object keys, pure
        // positional lists, and a mix of both. Each StructMode accepts
        // exactly one encoding and rejects the others with an
        // "expected {" / "expected [" error echoing the offending value.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected column data: b = [1, 2], c = {"d": 3}.
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly accepts the object encoding and rejects both rows that
        // present a list where an object is required.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly is the mirror image of the above.
        // NOTE(review): the echoed value below omits the comma between the
        // object's entries ("[1, 2]\"c\"") — presumably an artifact of how
        // the decoder re-renders its tape; confirm before depending on the
        // exact string.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2718
2719    // Test cases:
2720    // [] -> RecordBatch row with no entries.  Schema = [('a', Int32)] -> Error
2721    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
2722    // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
2723    // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
2724    #[test]
2725    fn test_struct_decoding_empty_list() {
2726        let int_field = Field::new("a", DataType::Int32, true);
2727        let struct_field = Field::new(
2728            "r",
2729            DataType::Struct(Fields::from(vec![int_field.clone()])),
2730            true,
2731        );
2732
2733        let parse = |row: &str, as_struct: bool, field: Field| {
2734            _parse_structs(
2735                row,
2736                StructMode::ListOnly,
2737                Fields::from(vec![field]),
2738                as_struct,
2739            )
2740        };
2741
2742        // Missing fields
2743        assert_eq!(
2744            parse("[]", true, struct_field.clone())
2745                .unwrap_err()
2746                .to_string(),
2747            "Json error: found 0 columns for 1 fields".to_owned()
2748        );
2749        assert_eq!(
2750            parse("[]", false, int_field.clone())
2751                .unwrap_err()
2752                .to_string(),
2753            "Json error: found 0 columns for 1 fields".to_owned()
2754        );
2755        assert_eq!(
2756            parse("[]", false, struct_field.clone())
2757                .unwrap_err()
2758                .to_string(),
2759            "Json error: found 0 columns for 1 fields".to_owned()
2760        );
2761        assert_eq!(
2762            parse("[[]]", false, struct_field.clone())
2763                .unwrap_err()
2764                .to_string(),
2765            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2766        );
2767    }
2768
2769    #[test]
2770    fn test_decode_list_struct_with_wrong_types() {
2771        let int_field = Field::new("a", DataType::Int32, true);
2772        let struct_field = Field::new(
2773            "r",
2774            DataType::Struct(Fields::from(vec![int_field.clone()])),
2775            true,
2776        );
2777
2778        let parse = |row: &str, as_struct: bool, field: Field| {
2779            _parse_structs(
2780                row,
2781                StructMode::ListOnly,
2782                Fields::from(vec![field]),
2783                as_struct,
2784            )
2785        };
2786
2787        // Wrong values
2788        assert_eq!(
2789            parse(r#"[["a"]]"#, false, struct_field.clone())
2790                .unwrap_err()
2791                .to_string(),
2792            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2793        );
2794        assert_eq!(
2795            parse(r#"[["a"]]"#, true, struct_field.clone())
2796                .unwrap_err()
2797                .to_string(),
2798            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2799        );
2800        assert_eq!(
2801            parse(r#"["a"]"#, true, int_field.clone())
2802                .unwrap_err()
2803                .to_string(),
2804            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2805        );
2806        assert_eq!(
2807            parse(r#"["a"]"#, false, int_field.clone())
2808                .unwrap_err()
2809                .to_string(),
2810            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2811        );
2812    }
2813}