// arrow_json/reader/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! JSON reader
19//!
20//! This JSON reader allows JSON records to be read into the Arrow memory
21//! model. Records are loaded in batches and are then converted from the record-oriented
22//! representation to the columnar arrow data model.
23//!
24//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
25//! parsing of sequences of one or more arbitrarily formatted JSON values, including
26//! but not limited to newline-delimited JSON.
27//!
28//! # Basic Usage
29//!
30//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
31//!
32//! ```
33//! # use arrow_schema::*;
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use std::sync::Arc;
37//!
38//! let schema = Arc::new(Schema::new(vec![
39//!     Field::new("a", DataType::Float64, false),
40//!     Field::new("b", DataType::Float64, false),
41//!     Field::new("c", DataType::Boolean, true),
42//! ]));
43//!
44//! let file = File::open("test/data/basic.json").unwrap();
45//!
46//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
47//! let batch = json.next().unwrap().unwrap();
48//! ```
49//!
50//! # Async Usage
51//!
52//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
53//! and is designed to be agnostic to the various different kinds of async IO primitives found
54//! within the Rust ecosystem.
55//!
56//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
57//!
58//! ```
59//! # use std::task::{Poll, ready};
60//! # use bytes::{Buf, Bytes};
61//! # use arrow_schema::ArrowError;
62//! # use futures::stream::{Stream, StreamExt};
63//! # use arrow_array::RecordBatch;
64//! # use arrow_json::reader::Decoder;
65//! #
66//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
67//!     mut decoder: Decoder,
68//!     mut input: S,
69//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
70//!     let mut buffered = Bytes::new();
71//!     futures::stream::poll_fn(move |cx| {
72//!         loop {
73//!             if buffered.is_empty() {
74//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
75//!                     Some(b) => b,
76//!                     None => break,
77//!                 };
78//!             }
79//!             let decoded = match decoder.decode(buffered.as_ref()) {
80//!                 Ok(decoded) => decoded,
81//!                 Err(e) => return Poll::Ready(Some(Err(e))),
82//!             };
83//!             let read = buffered.len();
84//!             buffered.advance(decoded);
85//!             if decoded != read {
86//!                 break
87//!             }
88//!         }
89//!
90//!         Poll::Ready(decoder.flush().transpose())
91//!     })
92//! }
93//!
94//! ```
95//!
96//! In a similar vein, it can also be used with tokio-based IO primitives
97//!
98//! ```
99//! # use std::sync::Arc;
100//! # use arrow_schema::{DataType, Field, Schema};
101//! # use std::pin::Pin;
102//! # use std::task::{Poll, ready};
103//! # use futures::{Stream, TryStreamExt};
104//! # use tokio::io::AsyncBufRead;
105//! # use arrow_array::RecordBatch;
106//! # use arrow_json::reader::Decoder;
107//! # use arrow_schema::ArrowError;
108//! fn decode_stream<R: AsyncBufRead + Unpin>(
109//!     mut decoder: Decoder,
110//!     mut reader: R,
111//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
112//!     futures::stream::poll_fn(move |cx| {
113//!         loop {
114//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
115//!                 Ok(b) if b.is_empty() => break,
116//!                 Ok(b) => b,
117//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
118//!             };
119//!             let read = b.len();
120//!             let decoded = match decoder.decode(b) {
121//!                 Ok(decoded) => decoded,
122//!                 Err(e) => return Poll::Ready(Some(Err(e))),
123//!             };
124//!             Pin::new(&mut reader).consume(decoded);
125//!             if decoded != read {
126//!                 break;
127//!             }
128//!         }
129//!
130//!         Poll::Ready(decoder.flush().transpose())
131//!     })
132//! }
133//! ```
134//!
135
136use crate::StructMode;
137use crate::reader::binary_array::{
138    BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
139};
140use std::borrow::Cow;
141use std::io::BufRead;
142use std::sync::Arc;
143
144use chrono::Utc;
145use serde_core::Serialize;
146
147use arrow_array::timezone::Tz;
148use arrow_array::types::*;
149use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
150use arrow_data::ArrayData;
151use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
152pub use schema::*;
153
154use crate::reader::boolean_array::BooleanArrayDecoder;
155use crate::reader::decimal_array::DecimalArrayDecoder;
156use crate::reader::list_array::ListArrayDecoder;
157use crate::reader::map_array::MapArrayDecoder;
158use crate::reader::null_array::NullArrayDecoder;
159use crate::reader::primitive_array::PrimitiveArrayDecoder;
160use crate::reader::string_array::StringArrayDecoder;
161use crate::reader::string_view_array::StringViewArrayDecoder;
162use crate::reader::struct_array::StructArrayDecoder;
163use crate::reader::tape::{Tape, TapeDecoder};
164use crate::reader::timestamp_array::TimestampArrayDecoder;
165
166mod binary_array;
167mod boolean_array;
168mod decimal_array;
169mod list_array;
170mod map_array;
171mod null_array;
172mod primitive_array;
173mod schema;
174mod serializer;
175mod string_array;
176mod string_view_array;
177mod struct_array;
178mod tape;
179mod timestamp_array;
180
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows decoded per emitted `RecordBatch` (default 1024)
    batch_size: usize,
    // Coerce JSON bool/number values to strings for Utf8/LargeUtf8 columns
    coerce_primitive: bool,
    // Error on JSON columns not present in the schema instead of ignoring them
    strict_mode: bool,
    // True when built via `new_with_field`: the root of the JSON data is a
    // single value of the field's type rather than an object
    is_field: bool,
    // Whether structs are decoded from JSON objects, lists, or both
    struct_mode: StructMode,

    // Target schema for the decoded batches
    schema: SchemaRef,
}
191
192impl ReaderBuilder {
193    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
194    ///
195    /// This could be obtained using [`infer_json_schema`] if not known
196    ///
197    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
198    /// In this case, an error is returned when a column is missing from `schema`.
199    ///
200    /// [`infer_json_schema`]: crate::reader::infer_json_schema
201    pub fn new(schema: SchemaRef) -> Self {
202        Self {
203            batch_size: 1024,
204            coerce_primitive: false,
205            strict_mode: false,
206            is_field: false,
207            struct_mode: Default::default(),
208            schema,
209        }
210    }
211
212    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
213    ///
214    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
215    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
216    ///
217    /// ```
218    /// # use std::sync::Arc;
219    /// # use arrow_array::cast::AsArray;
220    /// # use arrow_array::types::Int32Type;
221    /// # use arrow_json::ReaderBuilder;
222    /// # use arrow_schema::{DataType, Field};
223    /// // Root of JSON schema is a numeric type
224    /// let data = "1\n2\n3\n";
225    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
226    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
227    /// let b = reader.next().unwrap().unwrap();
228    /// let values = b.column(0).as_primitive::<Int32Type>().values();
229    /// assert_eq!(values, &[1, 2, 3]);
230    ///
231    /// // Root of JSON schema is a list type
232    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
233    /// let field = Field::new_list("int", field.clone(), true);
234    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
235    /// let b = reader.next().unwrap().unwrap();
236    /// let list = b.column(0).as_list::<i32>();
237    ///
238    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
239    /// let list_values = list.values().as_primitive::<Int32Type>();
240    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
241    /// ```
242    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
243        Self {
244            batch_size: 1024,
245            coerce_primitive: false,
246            strict_mode: false,
247            is_field: true,
248            struct_mode: Default::default(),
249            schema: Arc::new(Schema::new([field.into()])),
250        }
251    }
252
253    /// Sets the batch size in rows to read
254    pub fn with_batch_size(self, batch_size: usize) -> Self {
255        Self { batch_size, ..self }
256    }
257
258    /// Sets if the decoder should coerce primitive values (bool and number) into string
259    /// when the Schema's column is Utf8 or LargeUtf8.
260    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
261        Self {
262            coerce_primitive,
263            ..self
264        }
265    }
266
267    /// Sets if the decoder should return an error if it encounters a column not
268    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
269    /// `strict_mode` is effectively `true`. It is required for all fields of
270    /// the struct to be in the list: without field names, there is no way to
271    /// determine which field is missing.
272    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
273        Self {
274            strict_mode,
275            ..self
276        }
277    }
278
279    /// Set the [`StructMode`] for the reader, which determines whether structs
280    /// can be decoded from JSON as objects or lists. For more details refer to
281    /// the enum documentation. Default is to use `ObjectOnly`.
282    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
283        Self {
284            struct_mode,
285            ..self
286        }
287    }
288
289    /// Create a [`Reader`] with the provided [`BufRead`]
290    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
291        Ok(Reader {
292            reader,
293            decoder: self.build_decoder()?,
294        })
295    }
296
297    /// Create a [`Decoder`]
298    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
299        let (data_type, nullable) = if self.is_field {
300            let field = &self.schema.fields[0];
301            let data_type = Cow::Borrowed(field.data_type());
302            (data_type, field.is_nullable())
303        } else {
304            let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
305            (data_type, false)
306        };
307
308        let ctx = DecoderContext {
309            coerce_primitive: self.coerce_primitive,
310            strict_mode: self.strict_mode,
311            struct_mode: self.struct_mode,
312        };
313        let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
314
315        let num_fields = self.schema.flattened_fields().len();
316
317        Ok(Decoder {
318            decoder,
319            is_field: self.is_field,
320            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
321            batch_size: self.batch_size,
322            schema: self.schema,
323        })
324    }
325}
326
/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
///
/// Lines consisting solely of ASCII whitespace are ignored
pub struct Reader<R> {
    // Underlying buffered byte source
    reader: R,
    // Push-based decoder that performs the actual parsing
    decoder: Decoder,
}
334
335impl<R> std::fmt::Debug for Reader<R> {
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        f.debug_struct("Reader")
338            .field("decoder", &self.decoder)
339            .finish()
340    }
341}
342
343impl<R: BufRead> Reader<R> {
344    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
345    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
346        loop {
347            let buf = self.reader.fill_buf()?;
348            if buf.is_empty() {
349                break;
350            }
351            let read = buf.len();
352
353            let decoded = self.decoder.decode(buf)?;
354            self.reader.consume(decoded);
355            if decoded != read {
356                break;
357            }
358        }
359        self.decoder.flush()
360    }
361}
362
363impl<R: BufRead> Iterator for Reader<R> {
364    type Item = Result<RecordBatch, ArrowError>;
365
366    fn next(&mut self) -> Option<Self::Item> {
367        self.read().transpose()
368    }
369}
370
371impl<R: BufRead> RecordBatchReader for Reader<R> {
372    fn schema(&self) -> SchemaRef {
373        self.decoder.schema.clone()
374    }
375}
376
/// A low-level interface for reading JSON data from a byte stream
///
/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
///
/// The push-based interface facilitates integration with sources that yield arbitrarily
/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
/// object storage
///
/// ```
/// # use std::io::BufRead;
/// # use arrow_array::RecordBatch;
/// # use arrow_json::reader::{Decoder, ReaderBuilder};
/// # use arrow_schema::{ArrowError, SchemaRef};
/// #
/// fn read_from_json<R: BufRead>(
///     mut reader: R,
///     schema: SchemaRef,
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
///     let mut next = move || {
///         loop {
///             // Decoder is agnostic that buf doesn't contain whole records
///             let buf = reader.fill_buf()?;
///             if buf.is_empty() {
///                 break; // Input exhausted
///             }
///             let read = buf.len();
///             let decoded = decoder.decode(buf)?;
///
///             // Consume the number of bytes read
///             reader.consume(decoded);
///             if decoded != read {
///                 break; // Read batch size
///             }
///         }
///         decoder.flush()
///     };
///     Ok(std::iter::from_fn(move || next().transpose()))
/// }
/// ```
pub struct Decoder {
    // Buffers raw JSON bytes into a tape representation until flushed
    tape_decoder: TapeDecoder,
    // Decoder for the root data type of the schema
    decoder: Box<dyn ArrayDecoder>,
    // Maximum number of rows decoded per flush
    batch_size: usize,
    // True if the root of the JSON data is a single value rather than an object
    is_field: bool,
    // Schema of the emitted batches
    schema: SchemaRef,
}
424
425impl std::fmt::Debug for Decoder {
426    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
427        f.debug_struct("Decoder")
428            .field("schema", &self.schema)
429            .field("batch_size", &self.batch_size)
430            .finish()
431    }
432}
433
impl Decoder {
    /// Read JSON objects from `buf`, returning the number of bytes read
    ///
    /// This method returns once `batch_size` objects have been parsed since the
    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
    /// should be included in the next call to [`Self::decode`]
    ///
    /// There is no requirement that `buf` contains a whole number of records, facilitating
    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.tape_decoder.decode(buf)
    }

    /// Serialize `rows` to this [`Decoder`]
    ///
    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
    /// [`RecordBatch`].
    ///
    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
    /// especially where the schema is known at compile-time, however, this provides a mechanism
    /// to get something up and running quickly
    ///
    /// It can be used with [`serde_json::Value`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use serde_json::{Value, json};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Float32Type;
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
    ///
    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    ///
    /// decoder.serialize(&json).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 2);
    /// assert_eq!(batch.num_columns(), 1);
    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
    /// assert_eq!(values, &[2.3, 5.7])
    /// ```
    ///
    /// Or with arbitrary [`Serialize`] types
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use serde::Serialize;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::{Float32Type, Int32Type};
    /// #
    /// #[derive(Serialize)]
    /// struct MyStruct {
    ///     int32: i32,
    ///     float: f32,
    /// }
    ///
    /// let schema = Schema::new(vec![
    ///     Field::new("int32", DataType::Int32, false),
    ///     Field::new("float", DataType::Float32, false),
    /// ]);
    ///
    /// let rows = vec![
    ///     MyStruct{ int32: 0, float: 3. },
    ///     MyStruct{ int32: 4, float: 67.53 },
    /// ];
    ///
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    /// decoder.serialize(&rows).unwrap();
    ///
    /// let batch = decoder.flush().unwrap().unwrap();
    ///
    /// // Expect batch containing two columns
    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
    /// assert_eq!(int32.values(), &[0, 4]);
    ///
    /// let float = batch.column(1).as_primitive::<Float32Type>();
    /// assert_eq!(float.values(), &[3., 67.53]);
    /// ```
    ///
    /// Or even complex nested types
    ///
    /// ```
    /// # use std::collections::BTreeMap;
    /// # use std::sync::Arc;
    /// # use arrow_array::StructArray;
    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Fields, Schema};
    /// # use serde::Serialize;
    /// #
    /// #[derive(Serialize)]
    /// struct MyStruct {
    ///     int32: i32,
    ///     list: Vec<f64>,
    ///     nested: Vec<Option<Nested>>,
    /// }
    ///
    /// impl MyStruct {
    ///     /// Returns the [`Fields`] for [`MyStruct`]
    ///     fn fields() -> Fields {
    ///         let nested = DataType::Struct(Nested::fields());
    ///         Fields::from([
    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
    ///             Arc::new(Field::new_list(
    ///                 "list",
    ///                 Field::new("element", DataType::Float64, false),
    ///                 false,
    ///             )),
    ///             Arc::new(Field::new_list(
    ///                 "nested",
    ///                 Field::new("element", nested, true),
    ///                 true,
    ///             )),
    ///         ])
    ///     }
    /// }
    ///
    /// #[derive(Serialize)]
    /// struct Nested {
    ///     map: BTreeMap<String, Vec<String>>
    /// }
    ///
    /// impl Nested {
    ///     /// Returns the [`Fields`] for [`Nested`]
    ///     fn fields() -> Fields {
    ///         let element = Field::new("element", DataType::Utf8, false);
    ///         Fields::from([
    ///             Arc::new(Field::new_map(
    ///                 "map",
    ///                 "entries",
    ///                 Field::new("key", DataType::Utf8, false),
    ///                 Field::new_list("value", element, false),
    ///                 false, // sorted
    ///                 false, // nullable
    ///             ))
    ///         ])
    ///     }
    /// }
    ///
    /// let data = vec![
    ///     MyStruct {
    ///         int32: 34,
    ///         list: vec![1., 2., 34.],
    ///         nested: vec![
    ///             None,
    ///             Some(Nested {
    ///                 map: vec![
    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
    ///                     ("key2".to_string(), vec!["baz".to_string()])
    ///                 ].into_iter().collect()
    ///             })
    ///         ]
    ///     },
    ///     MyStruct {
    ///         int32: 56,
    ///         list: vec![],
    ///         nested: vec![]
    ///     },
    ///     MyStruct {
    ///         int32: 24,
    ///         list: vec![-1., 245.],
    ///         nested: vec![None]
    ///     }
    /// ];
    ///
    /// let schema = Schema::new(MyStruct::fields());
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    /// decoder.serialize(&data).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 3);
    /// assert_eq!(batch.num_columns(), 3);
    ///
    /// // Convert to StructArray to format
    /// let s = StructArray::from(batch);
    /// let options = FormatOptions::default().with_null("null");
    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
    ///
    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
    /// ```
    ///
    /// Note: this ignores any batch size setting, and always decodes all rows
    ///
    /// [serde]: https://docs.rs/serde/latest/serde/
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

    /// True if the decoder is currently part way through decoding a record.
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

    /// The number of unflushed records, including the partially decoded record (if any).
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Flushes the currently buffered data to a [`RecordBatch`]
    ///
    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
    ///
    /// Note: This will return an error if called part way through decoding a record,
    /// i.e. [`Self::has_partial_record`] is true.
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // First offset is null sentinel
        // Walk the tape once, recording the offset at which each row starts
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        let decoded = self.decoder.decode(&tape, &pos)?;
        self.tape_decoder.clear();

        let batch = match self.is_field {
            // Root is a single value: the decoded data becomes the sole column
            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
            // Root is an object: the decoded StructArray's children become the
            // batch's columns
            false => {
                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}
677
// Internal abstraction implemented by each per-[`DataType`] decoder
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
682
/// Context for decoder creation, containing configuration.
///
/// This context is passed through the decoder creation process and contains
/// all the configuration needed to create decoders recursively.
///
/// It is populated from the user-facing settings on [`ReaderBuilder`].
pub struct DecoderContext {
    /// Whether to coerce primitives (bool and number) to strings
    coerce_primitive: bool,
    /// Whether to validate struct fields strictly, erroring on unknown columns
    strict_mode: bool,
    /// How to decode struct fields (from JSON objects and/or lists)
    struct_mode: StructMode,
}
695
696impl DecoderContext {
697    /// Returns whether to coerce primitive types (e.g., number to string)
698    pub fn coerce_primitive(&self) -> bool {
699        self.coerce_primitive
700    }
701
702    /// Returns whether to validate struct fields strictly
703    pub fn strict_mode(&self) -> bool {
704        self.strict_mode
705    }
706
707    /// Returns how to decode struct fields
708    pub fn struct_mode(&self) -> StructMode {
709        self.struct_mode
710    }
711
712    /// Create a decoder for a type.
713    ///
714    /// This is the standard way to create child decoders from within a decoder
715    /// implementation.
716    fn make_decoder(
717        &self,
718        data_type: &DataType,
719        is_nullable: bool,
720    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
721        make_decoder(self, data_type, is_nullable)
722    }
723}
724
// Expands to a boxed `PrimitiveArrayDecoder` for the given primitive type and
// `DataType`, wrapped in `Ok` for direct use as a `make_decoder` match arm
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
730
// Creates the `ArrayDecoder` for a given `DataType`, recursing via `ctx` for
// nested types. The `downcast_integer!` macro expands its first arm into a
// `primitive_decoder!` arm per integer type; the remaining arms cover the other
// supported types, and anything else yields `ArrowError::NotYetImplemented`.
fn make_decoder(
    ctx: &DecoderContext,
    data_type: &DataType,
    is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    let coerce_primitive = ctx.coerce_primitive();
    downcast_integer! {
        *data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
    }
}
800
801#[cfg(test)]
802mod tests {
803    use serde_json::json;
804    use std::fs::File;
805    use std::io::{BufReader, Cursor, Seek};
806
807    use arrow_array::cast::AsArray;
808    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
809    use arrow_buffer::{ArrowNativeType, Buffer};
810    use arrow_cast::display::{ArrayFormatter, FormatOptions};
811    use arrow_data::ArrayDataBuilder;
812    use arrow_schema::{Field, Fields};
813
814    use super::*;
815
816    fn do_read(
817        buf: &str,
818        batch_size: usize,
819        coerce_primitive: bool,
820        strict_mode: bool,
821        schema: SchemaRef,
822    ) -> Vec<RecordBatch> {
823        let mut unbuffered = vec![];
824
825        // Test with different batch sizes to test for boundary conditions
826        for batch_size in [1, 3, 100, batch_size] {
827            unbuffered = ReaderBuilder::new(schema.clone())
828                .with_batch_size(batch_size)
829                .with_coerce_primitive(coerce_primitive)
830                .build(Cursor::new(buf.as_bytes()))
831                .unwrap()
832                .collect::<Result<Vec<_>, _>>()
833                .unwrap();
834
835            for b in unbuffered.iter().take(unbuffered.len() - 1) {
836                assert_eq!(b.num_rows(), batch_size)
837            }
838
839            // Test with different buffer sizes to test for boundary conditions
840            for b in [1, 3, 5] {
841                let buffered = ReaderBuilder::new(schema.clone())
842                    .with_batch_size(batch_size)
843                    .with_coerce_primitive(coerce_primitive)
844                    .with_strict_mode(strict_mode)
845                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
846                    .unwrap()
847                    .collect::<Result<Vec<_>, _>>()
848                    .unwrap();
849                assert_eq!(unbuffered, buffered);
850            }
851        }
852
853        unbuffered
854    }
855
    #[test]
    fn test_basic() {
        // Rows mix field order, omit fields, include an explicit null, and use
        // exponent notation ("2E0", "4e0") plus a quoted number ("5")
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Exercise the push-based Decoder API: feed all bytes, then flush
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // All 221 input bytes are consumed (decode returns bytes read)
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // Flushing drains every buffered record
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        // Now exercise the pull-based Reader path via do_read
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": all numeric spellings parse to Int64; rows 5-6 are null
        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": fully populated, including the quoted "5"
        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        // "c": booleans only in the first two rows
        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        // "d": numbers decode directly as Date32 day values
        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        // "e": present only in row 2
        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
922
923    #[test]
924    fn test_string() {
925        let buf = r#"
926        {"a": "1", "b": "2"}
927        {"a": "hello", "b": "shoo"}
928        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
929
930        {"b": null}
931        {"b": "", "a": null}
932
933        "#;
934        let schema = Arc::new(Schema::new(vec![
935            Field::new("a", DataType::Utf8, true),
936            Field::new("b", DataType::LargeUtf8, true),
937        ]));
938
939        let batches = do_read(buf, 1024, false, false, schema);
940        assert_eq!(batches.len(), 1);
941
942        let col1 = batches[0].column(0).as_string::<i32>();
943        assert_eq!(col1.null_count(), 2);
944        assert_eq!(col1.value(0), "1");
945        assert_eq!(col1.value(1), "hello");
946        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
947        assert!(col1.is_null(3));
948        assert!(col1.is_null(4));
949
950        let col2 = batches[0].column(1).as_string::<i64>();
951        assert_eq!(col2.null_count(), 1);
952        assert_eq!(col2.value(0), "2");
953        assert_eq!(col2.value(1), "shoo");
954        assert_eq!(col2.value(2), "\t😁foo");
955        assert!(col2.is_null(3));
956        assert_eq!(col2.value(4), "");
957    }
958
959    #[test]
960    fn test_long_string_view_allocation() {
961        // The JSON input contains field "a" with different string lengths.
962        // According to the implementation in the decoder:
963        // - For a string, capacity is only increased if its length > 12 bytes.
964        // Therefore, for:
965        // Row 1: "short" (5 bytes) -> capacity += 0
966        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
967        // Row 3: "hello" (5 bytes) -> capacity += 0
968        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
969        // Expected total capacity = 24 + 17 = 41
970        let expected_capacity: usize = 41;
971
972        let buf = r#"
973        {"a": "short", "b": "dummy"}
974        {"a": "this is definitely long", "b": "dummy"}
975        {"a": "hello", "b": "dummy"}
976        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
977        "#;
978
979        let schema = Arc::new(Schema::new(vec![
980            Field::new("a", DataType::Utf8View, true),
981            Field::new("b", DataType::LargeUtf8, true),
982        ]));
983
984        let batches = do_read(buf, 1024, false, false, schema);
985        assert_eq!(batches.len(), 1, "Expected one record batch");
986
987        // Get the first column ("a") as a StringViewArray.
988        let col_a = batches[0].column(0);
989        let string_view_array = col_a
990            .as_any()
991            .downcast_ref::<StringViewArray>()
992            .expect("Column should be a StringViewArray");
993
994        // Retrieve the underlying data buffer from the array.
995        // The builder pre-allocates capacity based on the sum of lengths for long strings.
996        let data_buffer = string_view_array.to_data().buffers()[0].len();
997
998        // Check that the allocated capacity is at least what we expected.
999        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
1000        assert!(
1001            data_buffer >= expected_capacity,
1002            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
1003        );
1004
1005        // Additionally, verify that the decoded values are correct.
1006        assert_eq!(string_view_array.value(0), "short");
1007        assert_eq!(string_view_array.value(1), "this is definitely long");
1008        assert_eq!(string_view_array.value(2), "hello");
1009        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
1010    }
1011
1012    /// Test the memory capacity allocation logic when converting numeric types to strings.
1013    #[test]
1014    fn test_numeric_view_allocation() {
1015        // For numeric types, the expected capacity calculation is as follows:
1016        // Row 1: 123456789  -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
1017        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
1018        //                        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
1019        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
1020        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
1021        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
1022        let expected_capacity: usize = 33;
1023
1024        let buf = r#"
1025    {"n": 123456789}
1026    {"n": 1000000000000}
1027    {"n": 3.1415}
1028    {"n": 2.718281828459045}
1029    "#;
1030
1031        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));
1032
1033        let batches = do_read(buf, 1024, true, false, schema);
1034        assert_eq!(batches.len(), 1, "Expected one record batch");
1035
1036        let col_n = batches[0].column(0);
1037        let string_view_array = col_n
1038            .as_any()
1039            .downcast_ref::<StringViewArray>()
1040            .expect("Column should be a StringViewArray");
1041
1042        // Check that the underlying data buffer capacity is at least the expected value.
1043        let data_buffer = string_view_array.to_data().buffers()[0].len();
1044        assert!(
1045            data_buffer >= expected_capacity,
1046            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
1047        );
1048
1049        // Verify that the converted string values are correct.
1050        // Note: The format of the number converted to a string should match the actual implementation.
1051        assert_eq!(string_view_array.value(0), "123456789");
1052        assert_eq!(string_view_array.value(1), "1000000000000");
1053        assert_eq!(string_view_array.value(2), "3.1415");
1054        assert_eq!(string_view_array.value(3), "2.718281828459045");
1055    }
1056
    // NOTE(review): fn name typo — "uft8view" should read "utf8view"; left
    // unchanged to avoid breaking test-name filters.
    #[test]
    fn test_string_with_uft8view() {
        // Same input as test_string, but column "a" decodes as Utf8View
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // \u escapes (incl. the surrogate pair) decode to "😀asfgÿ"
        let col1 = batches[0].column(0).as_string_view();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert_eq!(col1.data_type(), &DataType::Utf8View);

        // "b" stays LargeUtf8: one null plus an empty (non-null) string
        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
1093
1094    #[test]
1095    fn test_complex() {
1096        let buf = r#"
1097           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1098           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1099           {"list": null, "nested": {"a": null}}
1100        "#;
1101
1102        let schema = Arc::new(Schema::new(vec![
1103            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1104            Field::new_struct(
1105                "nested",
1106                vec![
1107                    Field::new("a", DataType::Int32, true),
1108                    Field::new("b", DataType::Int32, true),
1109                ],
1110                true,
1111            ),
1112            Field::new_struct(
1113                "nested_list",
1114                vec![Field::new_list(
1115                    "list2",
1116                    Field::new_struct(
1117                        "element",
1118                        vec![Field::new("c", DataType::Int32, false)],
1119                        false,
1120                    ),
1121                    true,
1122                )],
1123                true,
1124            ),
1125        ]));
1126
1127        let batches = do_read(buf, 1024, false, false, schema);
1128        assert_eq!(batches.len(), 1);
1129
1130        let list = batches[0].column(0).as_list::<i32>();
1131        assert_eq!(list.len(), 3);
1132        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1133        assert_eq!(list.null_count(), 1);
1134        assert!(list.is_null(2));
1135        let list_values = list.values().as_primitive::<Int32Type>();
1136        assert_eq!(list_values.values(), &[5, 6]);
1137
1138        let nested = batches[0].column(1).as_struct();
1139        let a = nested.column(0).as_primitive::<Int32Type>();
1140        assert_eq!(list.null_count(), 1);
1141        assert_eq!(a.values(), &[1, 7, 0]);
1142        assert!(list.is_null(2));
1143
1144        let b = nested.column(1).as_primitive::<Int32Type>();
1145        assert_eq!(b.null_count(), 2);
1146        assert_eq!(b.len(), 3);
1147        assert_eq!(b.value(0), 2);
1148        assert!(b.is_null(1));
1149        assert!(b.is_null(2));
1150
1151        let nested_list = batches[0].column(2).as_struct();
1152        assert_eq!(nested_list.len(), 3);
1153        assert_eq!(nested_list.null_count(), 1);
1154        assert!(nested_list.is_null(2));
1155
1156        let list2 = nested_list.column(0).as_list::<i32>();
1157        assert_eq!(list2.len(), 3);
1158        assert_eq!(list2.null_count(), 1);
1159        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1160        assert!(list2.is_null(2));
1161
1162        let list2_values = list2.values().as_struct();
1163
1164        let c = list2_values.column(0).as_primitive::<Int32Type>();
1165        assert_eq!(c.values(), &[3, 4]);
1166    }
1167
    #[test]
    fn test_projection() {
        // The schema projects a subset of the JSON: "list" is omitted
        // entirely and only some sub-fields are selected; unprojected fields
        // ("nested.b", "list2.element.c") must be ignored by the decoder
        let buf = r#"
           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // Only the projected "a" sub-field survives under "nested"
        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" appears only in the first list2 element -> second value is null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1223
    #[test]
    fn test_map() {
        // Map<Utf8, List<Utf8>> with repeated keys across rows, an empty
        // list value, and a null list value ("c": null)
        let buf = r#"
           {"map": {"a": ["foo", null]}}
           {"map": {"a": [null], "b": []}}
           {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // 1, 2 and 2 entries in the three rows respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        // Keys in encounter order across all rows
        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The "c" entry's value is a null list (entry index 3)
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // Render each row to text to confirm entry ordering is preserved
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1266
1267    #[test]
1268    fn test_not_coercing_primitive_into_string_without_flag() {
1269        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1270
1271        let buf = r#"{"a": 1}"#;
1272        let err = ReaderBuilder::new(schema.clone())
1273            .with_batch_size(1024)
1274            .build(Cursor::new(buf.as_bytes()))
1275            .unwrap()
1276            .read()
1277            .unwrap_err();
1278
1279        assert_eq!(
1280            err.to_string(),
1281            "Json error: whilst decoding field 'a': expected string got 1"
1282        );
1283
1284        let buf = r#"{"a": true}"#;
1285        let err = ReaderBuilder::new(schema)
1286            .with_batch_size(1024)
1287            .build(Cursor::new(buf.as_bytes()))
1288            .unwrap()
1289            .read()
1290            .unwrap_err();
1291
1292        assert_eq!(
1293            err.to_string(),
1294            "Json error: whilst decoding field 'a': expected string got true"
1295        );
1296    }
1297
    #[test]
    fn test_coercing_primitive_into_string() {
        // With coerce_primitive set, numbers and booleans destined for Utf8
        // columns are stored as their original JSON source text
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "2E0" and "2.0" keep their source spelling — no normalization
        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // Already-quoted "5" and coerced numbers coexist in the same column
        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        // Booleans coerce to "true"/"false"; absent fields stay null
        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1346
    /// Shared body for the decimal tests. All callers pass a scale-2 type, so
    /// the stored integers are the source values scaled by 100 — e.g. "1"
    /// becomes 100, "123.456" truncates to 12345.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        // Mix of bare numbers and quoted decimal strings
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": quoted "2.0452" and "11034.2" parse; rows 5-6 are null
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        // "b": fully populated, including the quoted "5"
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        // "c": only the first two rows supply a value
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1396
    /// Runs [`test_decimal`] across all four decimal widths (scale 2 each).
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1404
    /// Shared body for the timestamp tests, instantiated per [`TimeUnit`].
    ///
    /// Columns "a"/"b"/"c" are timezone-less; "d" carries "+08:00". Bare JSON
    /// numbers pass through unscaled, while strings are parsed as timestamps
    /// and scaled to `T::UNIT`.
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Factor converting the nanosecond-based expected values to T::UNIT
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Row 3 repeats the "b" key: the later string value wins over 1337.
        // Bare numbers (40, 1234) are taken verbatim, without unit scaling.
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        // Zone-less strings in "c" decode to the same instant as their "Z"
        // counterparts in "d" (compare rows 3 and 5); offsets are honoured
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // "d" has tz +08:00: zone-less strings are 8h (28800s) earlier in
        // epoch terms than the same string read as UTC (compare rows 1 and 3)
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1486
    /// Runs [`test_timestamp`] for every timestamp unit.
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1494
    /// Shared body for the time tests: 12-hour ("09:26:56.123 AM", "6:00 pm")
    /// and 24-hour string formats, plus bare numbers taken verbatim.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Extract the TimeUnit from the concrete Time32/Time64 type
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Factor converting the nanosecond-based expected values to `unit`
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Row 3 repeats the "b" key: "6:00 pm" (18:00) wins over 1337.
        // Bare numbers (40, 1234) are taken verbatim, without unit scaling.
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1566
    /// Runs [`test_time`] for every Time32/Time64 unit.
    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }
1574
1575    fn test_duration<T: ArrowTemporalType>() {
1576        let buf = r#"
1577        {"a": 1, "b": "2"}
1578        {"a": 3, "b": null}
1579        "#;
1580
1581        let schema = Arc::new(Schema::new(vec![
1582            Field::new("a", T::DATA_TYPE, true),
1583            Field::new("b", T::DATA_TYPE, true),
1584        ]));
1585
1586        let batches = do_read(buf, 1024, true, false, schema);
1587        assert_eq!(batches.len(), 1);
1588
1589        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1590        assert_eq!(col_a.null_count(), 0);
1591        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1592
1593        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1594        assert_eq!(col2.null_count(), 1);
1595        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1596    }
1597
    /// Runs [`test_duration`] for every duration unit.
    #[test]
    fn test_durations() {
        test_duration::<DurationNanosecondType>();
        test_duration::<DurationMicrosecondType>();
        test_duration::<DurationMillisecondType>();
        test_duration::<DurationSecondType>();
    }
1605
    /// Delta-Lake-style checkpoint record: the entire "add" struct (including
    /// its map field) is absent from the input and must decode as a null
    /// struct value.
    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Format the single row to verify "add" rendered as null
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1643
1644    #[test]
1645    fn struct_nullability() {
1646        let do_test = |child: DataType| {
1647            // Test correctly enforced nullability
1648            let non_null = r#"{"foo": {}}"#;
1649            let schema = Arc::new(Schema::new(vec![Field::new_struct(
1650                "foo",
1651                vec![Field::new("bar", child, false)],
1652                true,
1653            )]));
1654            let mut reader = ReaderBuilder::new(schema.clone())
1655                .build(Cursor::new(non_null.as_bytes()))
1656                .unwrap();
1657            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1658
1659            let null = r#"{"foo": {bar: null}}"#;
1660            let mut reader = ReaderBuilder::new(schema.clone())
1661                .build(Cursor::new(null.as_bytes()))
1662                .unwrap();
1663            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1664
1665            // Test nulls in nullable parent can mask nulls in non-nullable child
1666            let null = r#"{"foo": null}"#;
1667            let mut reader = ReaderBuilder::new(schema)
1668                .build(Cursor::new(null.as_bytes()))
1669                .unwrap();
1670            let batch = reader.next().unwrap().unwrap();
1671            assert_eq!(batch.num_columns(), 1);
1672            let foo = batch.column(0).as_struct();
1673            assert_eq!(foo.len(), 1);
1674            assert!(foo.is_null(0));
1675            assert_eq!(foo.num_columns(), 1);
1676
1677            let bar = foo.column(0);
1678            assert_eq!(bar.len(), 1);
1679            // Non-nullable child can still contain null as masked by parent
1680            assert!(bar.is_null(0));
1681        };
1682
1683        do_test(DataType::Boolean);
1684        do_test(DataType::Int32);
1685        do_test(DataType::Utf8);
1686        do_test(DataType::Decimal128(2, 1));
1687        do_test(DataType::Timestamp(
1688            TimeUnit::Microsecond,
1689            Some("+00:00".into()),
1690        ));
1691    }
1692
1693    #[test]
1694    fn test_truncation() {
1695        let buf = r#"
1696        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1697        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1698        {"i64": -9223372036854775808, "u64": 0 }
1699        {"i64": "-9223372036854775808", "u64": 0 }
1700        "#;
1701
1702        let schema = Arc::new(Schema::new(vec![
1703            Field::new("i64", DataType::Int64, true),
1704            Field::new("u64", DataType::UInt64, true),
1705        ]));
1706
1707        let batches = do_read(buf, 1024, true, false, schema);
1708        assert_eq!(batches.len(), 1);
1709
1710        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1711        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1712
1713        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1714        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1715    }
1716
1717    #[test]
1718    fn test_timestamp_truncation() {
1719        let buf = r#"
1720        {"time": 9223372036854775807 }
1721        {"time": -9223372036854775808 }
1722        {"time": 9e5 }
1723        "#;
1724
1725        let schema = Arc::new(Schema::new(vec![Field::new(
1726            "time",
1727            DataType::Timestamp(TimeUnit::Nanosecond, None),
1728            true,
1729        )]));
1730
1731        let batches = do_read(buf, 1024, true, false, schema);
1732        assert_eq!(batches.len(), 1);
1733
1734        let i64 = batches[0]
1735            .column(0)
1736            .as_primitive::<TimestampNanosecondType>();
1737        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1738    }
1739
1740    #[test]
1741    fn test_strict_mode_no_missing_columns_in_schema() {
1742        let buf = r#"
1743        {"a": 1, "b": "2", "c": true}
1744        {"a": 2E0, "b": "4", "c": false}
1745        "#;
1746
1747        let schema = Arc::new(Schema::new(vec![
1748            Field::new("a", DataType::Int16, false),
1749            Field::new("b", DataType::Utf8, false),
1750            Field::new("c", DataType::Boolean, false),
1751        ]));
1752
1753        let batches = do_read(buf, 1024, true, true, schema);
1754        assert_eq!(batches.len(), 1);
1755
1756        let buf = r#"
1757        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1758        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1759        "#;
1760
1761        let schema = Arc::new(Schema::new(vec![
1762            Field::new("a", DataType::Int16, false),
1763            Field::new("b", DataType::Utf8, false),
1764            Field::new_struct(
1765                "c",
1766                vec![
1767                    Field::new("a", DataType::Boolean, false),
1768                    Field::new("b", DataType::Int16, false),
1769                ],
1770                false,
1771            ),
1772        ]));
1773
1774        let batches = do_read(buf, 1024, true, true, schema);
1775        assert_eq!(batches.len(), 1);
1776    }
1777
1778    #[test]
1779    fn test_strict_mode_missing_columns_in_schema() {
1780        let buf = r#"
1781        {"a": 1, "b": "2", "c": true}
1782        {"a": 2E0, "b": "4", "c": false}
1783        "#;
1784
1785        let schema = Arc::new(Schema::new(vec![
1786            Field::new("a", DataType::Int16, true),
1787            Field::new("c", DataType::Boolean, true),
1788        ]));
1789
1790        let err = ReaderBuilder::new(schema)
1791            .with_batch_size(1024)
1792            .with_strict_mode(true)
1793            .build(Cursor::new(buf.as_bytes()))
1794            .unwrap()
1795            .read()
1796            .unwrap_err();
1797
1798        assert_eq!(
1799            err.to_string(),
1800            "Json error: column 'b' missing from schema"
1801        );
1802
1803        let buf = r#"
1804        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1805        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1806        "#;
1807
1808        let schema = Arc::new(Schema::new(vec![
1809            Field::new("a", DataType::Int16, false),
1810            Field::new("b", DataType::Utf8, false),
1811            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
1812        ]));
1813
1814        let err = ReaderBuilder::new(schema)
1815            .with_batch_size(1024)
1816            .with_strict_mode(true)
1817            .build(Cursor::new(buf.as_bytes()))
1818            .unwrap()
1819            .read()
1820            .unwrap_err();
1821
1822        assert_eq!(
1823            err.to_string(),
1824            "Json error: whilst decoding field 'c': column 'b' missing from schema"
1825        );
1826    }
1827
1828    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1829        let file = File::open(path).unwrap();
1830        let mut reader = BufReader::new(file);
1831        let schema = schema.unwrap_or_else(|| {
1832            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1833            reader.rewind().unwrap();
1834            schema
1835        });
1836        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1837        builder.build(reader).unwrap()
1838    }
1839
    #[test]
    fn test_json_basic() {
        // Reads test/data/basic.json with an inferred schema and spot-checks
        // the inferred column types, their positions, and decoded values.
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // `column_with_name` yields (column index, field)
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values in each typed column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1878
1879    #[test]
1880    fn test_json_empty_projection() {
1881        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1882        let batch = reader.next().unwrap().unwrap();
1883
1884        assert_eq!(0, batch.num_columns());
1885        assert_eq!(12, batch.num_rows());
1886    }
1887
    #[test]
    fn test_json_basic_with_nulls() {
        // Reads test/data/basic_nulls.json with an inferred schema and checks
        // validity bitmaps: each column has nulls at known row positions.
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity (null) positions per column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1927
    #[test]
    fn test_json_basic_schema() {
        // An explicitly supplied schema overrides inference: "b" is decoded
        // as Float32 here, where inference would have produced Float64.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        // The reader reports exactly the schema it was given
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values, including the Float32 narrowing
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
1963
1964    #[test]
1965    fn test_json_basic_schema_projection() {
1966        let schema = Schema::new(vec![
1967            Field::new("a", DataType::Int64, true),
1968            Field::new("c", DataType::Boolean, false),
1969        ]);
1970
1971        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1972        let batch = reader.next().unwrap().unwrap();
1973
1974        assert_eq!(2, batch.num_columns());
1975        assert_eq!(2, batch.schema().fields().len());
1976        assert_eq!(12, batch.num_rows());
1977
1978        assert_eq!(batch.schema().as_ref(), &schema);
1979
1980        let a = schema.column_with_name("a").unwrap();
1981        assert_eq!(0, a.0);
1982        assert_eq!(&DataType::Int64, a.1.data_type());
1983        let c = schema.column_with_name("c").unwrap();
1984        assert_eq!(1, c.0);
1985        assert_eq!(&DataType::Boolean, c.1.data_type());
1986    }
1987
    #[test]
    fn test_json_arrays() {
        // Reads test/data/arrays.json with an inferred schema: "b" and "c"
        // are inferred as lists; checks flattened child values and nulls.
        let mut reader = read_file("test/data/arrays.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
            b.1.data_type()
        );
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
            c.1.data_type()
        );
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        assert_eq!(1627668684594000000, aa.value(2));
        // Inspect "b"'s flattened child values (9 floats across all rows)
        let bb = batch.column(b.0).as_list::<i32>();
        let bb = bb.values().as_primitive::<Float64Type>();
        assert_eq!(9, bb.len());
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-6.1, bb.value(5));
        assert!(!bb.is_valid(7));

        // Inspect "c"'s flattened child values (6 booleans across all rows)
        let cc = batch
            .column(c.0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let cc = cc.values().as_boolean();
        assert_eq!(6, cc.len());
        assert!(!cc.value(0));
        assert!(!cc.value(4));
        assert!(!cc.is_valid(5));
    }
2035
2036    #[test]
2037    fn test_empty_json_arrays() {
2038        let json_content = r#"
2039            {"items": []}
2040            {"items": null}
2041            {}
2042            "#;
2043
2044        let schema = Arc::new(Schema::new(vec![Field::new(
2045            "items",
2046            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2047            true,
2048        )]));
2049
2050        let batches = do_read(json_content, 1024, false, false, schema);
2051        assert_eq!(batches.len(), 1);
2052
2053        let col1 = batches[0].column(0).as_list::<i32>();
2054        assert_eq!(col1.null_count(), 2);
2055        assert!(col1.value(0).is_empty());
2056        assert_eq!(col1.value(0).data_type(), &DataType::Null);
2057        assert!(col1.is_null(1));
2058        assert!(col1.is_null(2));
2059    }
2060
2061    #[test]
2062    fn test_nested_empty_json_arrays() {
2063        let json_content = r#"
2064            {"items": [[],[]]}
2065            {"items": [[null, null],[null]]}
2066            "#;
2067
2068        let schema = Arc::new(Schema::new(vec![Field::new(
2069            "items",
2070            DataType::List(FieldRef::new(Field::new_list_field(
2071                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2072                true,
2073            ))),
2074            true,
2075        )]));
2076
2077        let batches = do_read(json_content, 1024, false, false, schema);
2078        assert_eq!(batches.len(), 1);
2079
2080        let col1 = batches[0].column(0).as_list::<i32>();
2081        assert_eq!(col1.null_count(), 0);
2082        assert_eq!(col1.value(0).len(), 2);
2083        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2084        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2085
2086        assert_eq!(col1.value(1).len(), 2);
2087        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2088        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2089    }
2090
    #[test]
    fn test_nested_list_json_arrays() {
        // Decodes a list-of-structs column (with a nested struct child) and
        // compares against expected ArrayData built by hand, including the
        // offset buffer and every level's null bitmap.
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output
        // 7 flattened struct elements total: 2 + 1 + 3 + 0 + 0 + 1
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // bitmaps are little-endian: bit i of the byte is element i
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // offsets delimit each row's slice of the 7 struct elements;
        // rows 3 and 4 (null and []) are empty ranges
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2184
2185    #[test]
2186    fn test_skip_empty_lines() {
2187        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2188        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2189        let json_content = "
2190        {\"a\": 1}
2191        {\"a\": 2}
2192        {\"a\": 3}";
2193        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2194        let batch = reader.next().unwrap().unwrap();
2195
2196        assert_eq!(1, batch.num_columns());
2197        assert_eq!(3, batch.num_rows());
2198
2199        let schema = reader.schema();
2200        let c = schema.column_with_name("a").unwrap();
2201        assert_eq!(&DataType::Int64, c.1.data_type());
2202    }
2203
2204    #[test]
2205    fn test_with_multiple_batches() {
2206        let file = File::open("test/data/basic_nulls.json").unwrap();
2207        let mut reader = BufReader::new(file);
2208        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2209        reader.rewind().unwrap();
2210
2211        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2212        let mut reader = builder.build(reader).unwrap();
2213
2214        let mut num_records = Vec::new();
2215        while let Some(rb) = reader.next().transpose().unwrap() {
2216            num_records.push(rb.num_rows());
2217        }
2218
2219        assert_eq!(vec![5, 5, 2], num_records);
2220    }
2221
2222    #[test]
2223    fn test_timestamp_from_json_seconds() {
2224        let schema = Schema::new(vec![Field::new(
2225            "a",
2226            DataType::Timestamp(TimeUnit::Second, None),
2227            true,
2228        )]);
2229
2230        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2231        let batch = reader.next().unwrap().unwrap();
2232
2233        assert_eq!(1, batch.num_columns());
2234        assert_eq!(12, batch.num_rows());
2235
2236        let schema = reader.schema();
2237        let batch_schema = batch.schema();
2238        assert_eq!(schema, batch_schema);
2239
2240        let a = schema.column_with_name("a").unwrap();
2241        assert_eq!(
2242            &DataType::Timestamp(TimeUnit::Second, None),
2243            a.1.data_type()
2244        );
2245
2246        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2247        assert!(aa.is_valid(0));
2248        assert!(!aa.is_valid(1));
2249        assert!(!aa.is_valid(2));
2250        assert_eq!(1, aa.value(0));
2251        assert_eq!(1, aa.value(3));
2252        assert_eq!(5, aa.value(7));
2253    }
2254
2255    #[test]
2256    fn test_timestamp_from_json_milliseconds() {
2257        let schema = Schema::new(vec![Field::new(
2258            "a",
2259            DataType::Timestamp(TimeUnit::Millisecond, None),
2260            true,
2261        )]);
2262
2263        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2264        let batch = reader.next().unwrap().unwrap();
2265
2266        assert_eq!(1, batch.num_columns());
2267        assert_eq!(12, batch.num_rows());
2268
2269        let schema = reader.schema();
2270        let batch_schema = batch.schema();
2271        assert_eq!(schema, batch_schema);
2272
2273        let a = schema.column_with_name("a").unwrap();
2274        assert_eq!(
2275            &DataType::Timestamp(TimeUnit::Millisecond, None),
2276            a.1.data_type()
2277        );
2278
2279        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2280        assert!(aa.is_valid(0));
2281        assert!(!aa.is_valid(1));
2282        assert!(!aa.is_valid(2));
2283        assert_eq!(1, aa.value(0));
2284        assert_eq!(1, aa.value(3));
2285        assert_eq!(5, aa.value(7));
2286    }
2287
2288    #[test]
2289    fn test_date_from_json_milliseconds() {
2290        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2291
2292        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2293        let batch = reader.next().unwrap().unwrap();
2294
2295        assert_eq!(1, batch.num_columns());
2296        assert_eq!(12, batch.num_rows());
2297
2298        let schema = reader.schema();
2299        let batch_schema = batch.schema();
2300        assert_eq!(schema, batch_schema);
2301
2302        let a = schema.column_with_name("a").unwrap();
2303        assert_eq!(&DataType::Date64, a.1.data_type());
2304
2305        let aa = batch.column(a.0).as_primitive::<Date64Type>();
2306        assert!(aa.is_valid(0));
2307        assert!(!aa.is_valid(1));
2308        assert!(!aa.is_valid(2));
2309        assert_eq!(1, aa.value(0));
2310        assert_eq!(1, aa.value(3));
2311        assert_eq!(5, aa.value(7));
2312    }
2313
2314    #[test]
2315    fn test_time_from_json_nanoseconds() {
2316        let schema = Schema::new(vec![Field::new(
2317            "a",
2318            DataType::Time64(TimeUnit::Nanosecond),
2319            true,
2320        )]);
2321
2322        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2323        let batch = reader.next().unwrap().unwrap();
2324
2325        assert_eq!(1, batch.num_columns());
2326        assert_eq!(12, batch.num_rows());
2327
2328        let schema = reader.schema();
2329        let batch_schema = batch.schema();
2330        assert_eq!(schema, batch_schema);
2331
2332        let a = schema.column_with_name("a").unwrap();
2333        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2334
2335        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2336        assert!(aa.is_valid(0));
2337        assert!(!aa.is_valid(1));
2338        assert!(!aa.is_valid(2));
2339        assert_eq!(1, aa.value(0));
2340        assert_eq!(1, aa.value(3));
2341        assert_eq!(5, aa.value(7));
2342    }
2343
2344    #[test]
2345    fn test_json_iterator() {
2346        let file = File::open("test/data/basic.json").unwrap();
2347        let mut reader = BufReader::new(file);
2348        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2349        reader.rewind().unwrap();
2350
2351        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2352        let reader = builder.build(reader).unwrap();
2353        let schema = reader.schema();
2354        let (col_a_index, _) = schema.column_with_name("a").unwrap();
2355
2356        let mut sum_num_rows = 0;
2357        let mut num_batches = 0;
2358        let mut sum_a = 0;
2359        for batch in reader {
2360            let batch = batch.unwrap();
2361            assert_eq!(8, batch.num_columns());
2362            sum_num_rows += batch.num_rows();
2363            num_batches += 1;
2364            let batch_schema = batch.schema();
2365            assert_eq!(schema, batch_schema);
2366            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2367            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2368        }
2369        assert_eq!(12, sum_num_rows);
2370        assert_eq!(3, num_batches);
2371        assert_eq!(100000000000011, sum_a);
2372    }
2373
    #[test]
    fn test_decoder_error() {
        // Checks two error paths: (1) a truncated document leaves a partial
        // row buffered even after a failed flush; (2) type mismatches report
        // the offending field path and the unexpected JSON fragment.
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // Feed a truncated document: the tape decoder must retain the
        // partial row across a failed flush rather than dropping it.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode `s` against the struct schema and return the
        // resulting error message.
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        // Each non-object value for struct field "a" reports the field name
        // and echoes the offending JSON fragment.
        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Nested errors chain the field path: 'a' then 'child'.
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2472
2473    #[test]
2474    fn test_serialize_timestamp() {
2475        let json = vec![
2476            json!({"timestamp": 1681319393}),
2477            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2478        ];
2479        let schema = Schema::new(vec![Field::new(
2480            "timestamp",
2481            DataType::Timestamp(TimeUnit::Second, None),
2482            true,
2483        )]);
2484        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2485            .build_decoder()
2486            .unwrap();
2487        decoder.serialize(&json).unwrap();
2488        let batch = decoder.flush().unwrap().unwrap();
2489        assert_eq!(batch.num_rows(), 2);
2490        assert_eq!(batch.num_columns(), 1);
2491        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2492        assert_eq!(values.values(), &[1681319393, -7200]);
2493    }
2494
2495    #[test]
2496    fn test_serialize_decimal() {
2497        let json = vec![
2498            json!({"decimal": 1.234}),
2499            json!({"decimal": "1.234"}),
2500            json!({"decimal": 1234}),
2501            json!({"decimal": "1234"}),
2502        ];
2503        let schema = Schema::new(vec![Field::new(
2504            "decimal",
2505            DataType::Decimal128(10, 3),
2506            true,
2507        )]);
2508        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2509            .build_decoder()
2510            .unwrap();
2511        decoder.serialize(&json).unwrap();
2512        let batch = decoder.flush().unwrap().unwrap();
2513        assert_eq!(batch.num_rows(), 4);
2514        assert_eq!(batch.num_columns(), 1);
2515        let values = batch.column(0).as_primitive::<Decimal128Type>();
2516        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2517    }
2518
2519    #[test]
2520    fn test_serde_field() {
2521        let field = Field::new("int", DataType::Int32, true);
2522        let mut decoder = ReaderBuilder::new_with_field(field)
2523            .build_decoder()
2524            .unwrap();
2525        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2526        let b = decoder.flush().unwrap().unwrap();
2527        let values = b.column(0).as_primitive::<Int32Type>().values();
2528        assert_eq!(values, &[1, 2, 3, 4]);
2529    }
2530
2531    #[test]
2532    fn test_serde_large_numbers() {
2533        let field = Field::new("int", DataType::Int64, true);
2534        let mut decoder = ReaderBuilder::new_with_field(field)
2535            .build_decoder()
2536            .unwrap();
2537
2538        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2539        let b = decoder.flush().unwrap().unwrap();
2540        let values = b.column(0).as_primitive::<Int64Type>().values();
2541        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2542
2543        let field = Field::new(
2544            "int",
2545            DataType::Timestamp(TimeUnit::Microsecond, None),
2546            true,
2547        );
2548        let mut decoder = ReaderBuilder::new_with_field(field)
2549            .build_decoder()
2550            .unwrap();
2551
2552        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2553        let b = decoder.flush().unwrap().unwrap();
2554        let values = b
2555            .column(0)
2556            .as_primitive::<TimestampMicrosecondType>()
2557            .values();
2558        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2559    }
2560
2561    #[test]
2562    fn test_coercing_primitive_into_string_decoder() {
2563        let buf = &format!(
2564            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2565            (i32::MAX as i64 + 10),
2566            i64::MAX - 10
2567        );
2568        let schema = Schema::new(vec![
2569            Field::new("a", DataType::Float64, true),
2570            Field::new("b", DataType::Utf8, true),
2571            Field::new("c", DataType::Utf8, true),
2572        ]);
2573        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2574        let schema_ref = Arc::new(schema);
2575
2576        // read record batches
2577        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2578        let mut decoder = reader.build_decoder().unwrap();
2579        decoder.serialize(json_array.as_slice()).unwrap();
2580        let batch = decoder.flush().unwrap().unwrap();
2581        assert_eq!(
2582            batch,
2583            RecordBatch::try_new(
2584                schema_ref,
2585                vec![
2586                    Arc::new(Float64Array::from(vec![
2587                        1.0,
2588                        2.0,
2589                        (i32::MAX as i64 + 10) as f64,
2590                        (i64::MAX - 10) as f64
2591                    ])),
2592                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2593                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2594                ]
2595            )
2596            .unwrap()
2597        );
2598    }
2599
2600    // Parse the given `row` in `struct_mode` as a type given by fields.
2601    //
2602    // If as_struct == true, wrap the fields in a Struct field with name "r".
2603    // If as_struct == false, wrap the fields in a Schema.
2604    fn _parse_structs(
2605        row: &str,
2606        struct_mode: StructMode,
2607        fields: Fields,
2608        as_struct: bool,
2609    ) -> Result<RecordBatch, ArrowError> {
2610        let builder = if as_struct {
2611            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2612        } else {
2613            ReaderBuilder::new(Arc::new(Schema::new(fields)))
2614        };
2615        builder
2616            .with_struct_mode(struct_mode)
2617            .build(Cursor::new(row.as_bytes()))
2618            .unwrap()
2619            .next()
2620            .unwrap()
2621    }
2622
2623    #[test]
2624    fn test_struct_decoding_list_length() {
2625        use arrow_array::array;
2626
2627        let row = "[1, 2]";
2628
2629        let mut fields = vec![Field::new("a", DataType::Int32, true)];
2630        let too_few_fields = Fields::from(fields.clone());
2631        fields.push(Field::new("b", DataType::Int32, true));
2632        let correct_fields = Fields::from(fields.clone());
2633        fields.push(Field::new("c", DataType::Int32, true));
2634        let too_many_fields = Fields::from(fields.clone());
2635
2636        let parse = |fields: Fields, as_struct: bool| {
2637            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
2638        };
2639
2640        let expected_row = StructArray::new(
2641            correct_fields.clone(),
2642            vec![
2643                Arc::new(array::Int32Array::from(vec![1])),
2644                Arc::new(array::Int32Array::from(vec![2])),
2645            ],
2646            None,
2647        );
2648        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);
2649
2650        assert_eq!(
2651            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
2652            "Json error: found extra columns for 1 fields".to_string()
2653        );
2654        assert_eq!(
2655            parse(too_few_fields, false).unwrap_err().to_string(),
2656            "Json error: found extra columns for 1 fields".to_string()
2657        );
2658        assert_eq!(
2659            parse(correct_fields.clone(), true).unwrap(),
2660            RecordBatch::try_new(
2661                Arc::new(Schema::new(vec![row_field])),
2662                vec![Arc::new(expected_row.clone())]
2663            )
2664            .unwrap()
2665        );
2666        assert_eq!(
2667            parse(correct_fields, false).unwrap(),
2668            RecordBatch::from(expected_row)
2669        );
2670        assert_eq!(
2671            parse(too_many_fields.clone(), true)
2672                .unwrap_err()
2673                .to_string(),
2674            "Json error: found 2 columns for 3 fields".to_string()
2675        );
2676        assert_eq!(
2677            parse(too_many_fields, false).unwrap_err().to_string(),
2678            "Json error: found 2 columns for 3 fields".to_string()
2679        );
2680    }
2681
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical record in three spellings: pure object form,
        // pure list form, and a mix (object at the top, list inside).
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        // Target: struct "a" containing a list field "b" and a map field "c".
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected column data: b = [[1, 2]], c = [{"d": 3}].
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly: accepts the object form, rejects any list-shaped
        // struct at any nesting level, citing the offending field.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly: the mirror image — accepts the list form, rejects
        // object-shaped structs.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        // NOTE(review): the expected text lacks a comma between "[1, 2]" and
        // "\"c\"" — presumably an artifact of how the tape renders objects in
        // error messages; the assertion pins that exact rendering.
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2765
2766    // Test cases:
2767    // [] -> RecordBatch row with no entries.  Schema = [('a', Int32)] -> Error
2768    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
2769    // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
2770    // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
2771    #[test]
2772    fn test_struct_decoding_empty_list() {
2773        let int_field = Field::new("a", DataType::Int32, true);
2774        let struct_field = Field::new(
2775            "r",
2776            DataType::Struct(Fields::from(vec![int_field.clone()])),
2777            true,
2778        );
2779
2780        let parse = |row: &str, as_struct: bool, field: Field| {
2781            _parse_structs(
2782                row,
2783                StructMode::ListOnly,
2784                Fields::from(vec![field]),
2785                as_struct,
2786            )
2787        };
2788
2789        // Missing fields
2790        assert_eq!(
2791            parse("[]", true, struct_field.clone())
2792                .unwrap_err()
2793                .to_string(),
2794            "Json error: found 0 columns for 1 fields".to_owned()
2795        );
2796        assert_eq!(
2797            parse("[]", false, int_field.clone())
2798                .unwrap_err()
2799                .to_string(),
2800            "Json error: found 0 columns for 1 fields".to_owned()
2801        );
2802        assert_eq!(
2803            parse("[]", false, struct_field.clone())
2804                .unwrap_err()
2805                .to_string(),
2806            "Json error: found 0 columns for 1 fields".to_owned()
2807        );
2808        assert_eq!(
2809            parse("[[]]", false, struct_field.clone())
2810                .unwrap_err()
2811                .to_string(),
2812            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2813        );
2814    }
2815
2816    #[test]
2817    fn test_decode_list_struct_with_wrong_types() {
2818        let int_field = Field::new("a", DataType::Int32, true);
2819        let struct_field = Field::new(
2820            "r",
2821            DataType::Struct(Fields::from(vec![int_field.clone()])),
2822            true,
2823        );
2824
2825        let parse = |row: &str, as_struct: bool, field: Field| {
2826            _parse_structs(
2827                row,
2828                StructMode::ListOnly,
2829                Fields::from(vec![field]),
2830                as_struct,
2831            )
2832        };
2833
2834        // Wrong values
2835        assert_eq!(
2836            parse(r#"[["a"]]"#, false, struct_field.clone())
2837                .unwrap_err()
2838                .to_string(),
2839            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2840        );
2841        assert_eq!(
2842            parse(r#"[["a"]]"#, true, struct_field.clone())
2843                .unwrap_err()
2844                .to_string(),
2845            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2846        );
2847        assert_eq!(
2848            parse(r#"["a"]"#, true, int_field.clone())
2849                .unwrap_err()
2850                .to_string(),
2851            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2852        );
2853        assert_eq!(
2854            parse(r#"["a"]"#, false, int_field.clone())
2855                .unwrap_err()
2856                .to_string(),
2857            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2858        );
2859    }
2860}