// arrow_json/reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! JSON reader
19//!
20//! This JSON reader allows JSON records to be read into the Arrow memory
21//! model. Records are loaded in batches and are then converted from the record-oriented
22//! representation to the columnar arrow data model.
23//!
24//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
25//! parsing of sequences of one or more arbitrarily formatted JSON values, including
26//! but not limited to newline-delimited JSON.
27//!
28//! # Basic Usage
29//!
30//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
31//!
32//! ```
33//! # use arrow_schema::*;
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use std::sync::Arc;
37//!
38//! let schema = Arc::new(Schema::new(vec![
39//!     Field::new("a", DataType::Float64, false),
40//!     Field::new("b", DataType::Float64, false),
41//!     Field::new("c", DataType::Boolean, true),
42//! ]));
43//!
44//! let file = File::open("test/data/basic.json").unwrap();
45//!
46//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
47//! let batch = json.next().unwrap().unwrap();
48//! ```
49//!
50//! # Async Usage
51//!
52//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
53//! and is designed to be agnostic to the various different kinds of async IO primitives found
54//! within the Rust ecosystem.
55//!
56//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
57//!
58//! ```
59//! # use std::task::{Poll, ready};
60//! # use bytes::{Buf, Bytes};
61//! # use arrow_schema::ArrowError;
62//! # use futures::stream::{Stream, StreamExt};
63//! # use arrow_array::RecordBatch;
64//! # use arrow_json::reader::Decoder;
65//! #
66//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
67//!     mut decoder: Decoder,
68//!     mut input: S,
69//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
70//!     let mut buffered = Bytes::new();
71//!     futures::stream::poll_fn(move |cx| {
72//!         loop {
73//!             if buffered.is_empty() {
74//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
75//!                     Some(b) => b,
76//!                     None => break,
77//!                 };
78//!             }
79//!             let decoded = match decoder.decode(buffered.as_ref()) {
80//!                 Ok(decoded) => decoded,
81//!                 Err(e) => return Poll::Ready(Some(Err(e))),
82//!             };
83//!             let read = buffered.len();
84//!             buffered.advance(decoded);
85//!             if decoded != read {
86//!                 break
87//!             }
88//!         }
89//!
90//!         Poll::Ready(decoder.flush().transpose())
91//!     })
92//! }
93//!
94//! ```
95//!
96//! In a similar vein, it can also be used with tokio-based IO primitives
97//!
98//! ```
99//! # use std::sync::Arc;
100//! # use arrow_schema::{DataType, Field, Schema};
101//! # use std::pin::Pin;
102//! # use std::task::{Poll, ready};
103//! # use futures::{Stream, TryStreamExt};
104//! # use tokio::io::AsyncBufRead;
105//! # use arrow_array::RecordBatch;
106//! # use arrow_json::reader::Decoder;
107//! # use arrow_schema::ArrowError;
108//! fn decode_stream<R: AsyncBufRead + Unpin>(
109//!     mut decoder: Decoder,
110//!     mut reader: R,
111//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
112//!     futures::stream::poll_fn(move |cx| {
113//!         loop {
114//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
115//!                 Ok(b) if b.is_empty() => break,
116//!                 Ok(b) => b,
117//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
118//!             };
119//!             let read = b.len();
120//!             let decoded = match decoder.decode(b) {
121//!                 Ok(decoded) => decoded,
122//!                 Err(e) => return Poll::Ready(Some(Err(e))),
123//!             };
124//!             Pin::new(&mut reader).consume(decoded);
125//!             if decoded != read {
126//!                 break;
127//!             }
128//!         }
129//!
130//!         Poll::Ready(decoder.flush().transpose())
131//!     })
132//! }
133//! ```
134//!
135
136use std::borrow::Cow;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use arrow_array::cast::AsArray;
141use arrow_array::timezone::Tz;
142use arrow_array::types::*;
143use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader, downcast_integer};
144use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
145use chrono::Utc;
146use serde_core::Serialize;
147
148use crate::StructMode;
149use crate::reader::binary_array::{
150    BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
151};
152use crate::reader::boolean_array::BooleanArrayDecoder;
153use crate::reader::decimal_array::DecimalArrayDecoder;
154use crate::reader::list_array::{
155    FixedSizeListArrayDecoder, ListArrayDecoder, ListViewArrayDecoder,
156};
157use crate::reader::map_array::MapArrayDecoder;
158use crate::reader::null_array::NullArrayDecoder;
159use crate::reader::primitive_array::PrimitiveArrayDecoder;
160use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
161use crate::reader::string_array::StringArrayDecoder;
162use crate::reader::string_view_array::StringViewArrayDecoder;
163use crate::reader::struct_array::StructArrayDecoder;
164use crate::reader::tape::{Tape, TapeDecoder};
165use crate::reader::timestamp_array::TimestampArrayDecoder;
166
167pub use schema::*;
168pub use value_iter::ValueIter;
169
170mod binary_array;
171mod boolean_array;
172mod decimal_array;
173mod list_array;
174mod map_array;
175mod null_array;
176mod primitive_array;
177mod run_end_array;
178mod schema;
179mod serializer;
180mod string_array;
181mod string_view_array;
182mod struct_array;
183mod tape;
184mod timestamp_array;
185mod value_iter;
186
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows decoded per RecordBatch (default 1024, see `new`)
    batch_size: usize,
    // Coerce JSON bools/numbers to strings when the column is Utf8/LargeUtf8
    // (see `with_coerce_primitive`)
    coerce_primitive: bool,
    // Error on JSON columns not present in `schema` (see `with_strict_mode`)
    strict_mode: bool,
    // Decode unparseable values as NULL instead of erroring
    // (see `with_ignore_type_conflicts`)
    ignore_type_conflicts: bool,
    // True when built via `new_with_field`: the JSON root is a single value of
    // the field's type rather than an object
    is_field: bool,
    // Whether structs are decoded from JSON objects or lists (see `with_struct_mode`)
    struct_mode: StructMode,

    // Target schema of the decoded batches
    schema: SchemaRef,
}
198
199impl ReaderBuilder {
200    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
201    ///
202    /// This could be obtained using [`infer_json_schema`] if not known
203    ///
204    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
205    /// In this case, an error is returned when a column is missing from `schema`.
206    ///
207    /// [`infer_json_schema`]: crate::reader::infer_json_schema
208    pub fn new(schema: SchemaRef) -> Self {
209        Self {
210            batch_size: 1024,
211            coerce_primitive: false,
212            strict_mode: false,
213            ignore_type_conflicts: false,
214            is_field: false,
215            struct_mode: Default::default(),
216            schema,
217        }
218    }
219
220    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
221    ///
222    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
223    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
224    ///
225    /// ```
226    /// # use std::sync::Arc;
227    /// # use arrow_array::cast::AsArray;
228    /// # use arrow_array::types::Int32Type;
229    /// # use arrow_json::ReaderBuilder;
230    /// # use arrow_schema::{DataType, Field};
231    /// // Root of JSON schema is a numeric type
232    /// let data = "1\n2\n3\n";
233    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
234    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
235    /// let b = reader.next().unwrap().unwrap();
236    /// let values = b.column(0).as_primitive::<Int32Type>().values();
237    /// assert_eq!(values, &[1, 2, 3]);
238    ///
239    /// // Root of JSON schema is a list type
240    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
241    /// let field = Field::new_list("int", field.clone(), true);
242    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
243    /// let b = reader.next().unwrap().unwrap();
244    /// let list = b.column(0).as_list::<i32>();
245    ///
246    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
247    /// let list_values = list.values().as_primitive::<Int32Type>();
248    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
249    /// ```
250    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
251        Self {
252            batch_size: 1024,
253            coerce_primitive: false,
254            strict_mode: false,
255            ignore_type_conflicts: false,
256            is_field: true,
257            struct_mode: Default::default(),
258            schema: Arc::new(Schema::new([field.into()])),
259        }
260    }
261
262    /// Sets the batch size in rows to read
263    pub fn with_batch_size(self, batch_size: usize) -> Self {
264        Self { batch_size, ..self }
265    }
266
267    /// Sets if the decoder should coerce primitive values (bool and number) into string
268    /// when the Schema's column is Utf8 or LargeUtf8.
269    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
270        Self {
271            coerce_primitive,
272            ..self
273        }
274    }
275
276    /// Sets if the decoder should return an error if it encounters a column not
277    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
278    /// `strict_mode` is effectively `true`. It is required for all fields of
279    /// the struct to be in the list: without field names, there is no way to
280    /// determine which field is missing.
281    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
282        Self {
283            strict_mode,
284            ..self
285        }
286    }
287
288    /// Set the [`StructMode`] for the reader, which determines whether structs
289    /// can be decoded from JSON as objects or lists. For more details refer to
290    /// the enum documentation. Default is to use `ObjectOnly`.
291    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
292        Self {
293            struct_mode,
294            ..self
295        }
296    }
297
298    /// Sets whether the decoder should produce NULL instead of returning an error if it encounters
299    /// value that can not be parsed into the specified column type.
300    ///
301    /// For example, if the type is declared to be a nullable array of `DataType::Int32` but the
302    /// reader encounters a string value `"foo"` and the value `ignore_type_conflicts` is:
303    ///
304    /// * `false` (the default): The reader will return an error.
305    ///
306    /// * `true`: The reader will fill in NULL value for that array element.
307    ///
308    /// NOTE: An inferred NULL due to a type conflict will still produce parsing errors for
309    /// non-nullable fields, the same as any other NULL or missing value.
310    pub fn with_ignore_type_conflicts(self, ignore_type_conflicts: bool) -> Self {
311        Self {
312            ignore_type_conflicts,
313            ..self
314        }
315    }
316
317    /// Create a [`Reader`] with the provided [`BufRead`]
318    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
319        Ok(Reader {
320            reader,
321            decoder: self.build_decoder()?,
322        })
323    }
324
325    /// Create a [`Decoder`]
326    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
327        let (data_type, nullable) = if self.is_field {
328            let field = &self.schema.fields[0];
329            let data_type = Cow::Borrowed(field.data_type());
330            (data_type, field.is_nullable())
331        } else {
332            let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
333            (data_type, false)
334        };
335
336        let ctx = DecoderContext {
337            coerce_primitive: self.coerce_primitive,
338            strict_mode: self.strict_mode,
339            struct_mode: self.struct_mode,
340            ignore_type_conflicts: self.ignore_type_conflicts,
341        };
342        let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
343
344        let num_fields = self.schema.flattened_fields().len();
345
346        Ok(Decoder {
347            decoder,
348            is_field: self.is_field,
349            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
350            batch_size: self.batch_size,
351            schema: self.schema,
352        })
353    }
354}
355
/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
///
/// Lines consisting solely of ASCII whitespace are ignored
pub struct Reader<R> {
    // Source of JSON bytes, pulled via `BufRead::fill_buf` in `Reader::read`
    reader: R,
    // Push-based decoder fed from `reader`
    decoder: Decoder,
}
363
364impl<R> std::fmt::Debug for Reader<R> {
365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366        f.debug_struct("Reader")
367            .field("decoder", &self.decoder)
368            .finish()
369    }
370}
371
372impl<R: BufRead> Reader<R> {
373    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
374    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
375        loop {
376            let buf = self.reader.fill_buf()?;
377            if buf.is_empty() {
378                break;
379            }
380            let read = buf.len();
381
382            let decoded = self.decoder.decode(buf)?;
383            self.reader.consume(decoded);
384            if decoded != read {
385                break;
386            }
387        }
388        self.decoder.flush()
389    }
390}
391
392impl<R: BufRead> Iterator for Reader<R> {
393    type Item = Result<RecordBatch, ArrowError>;
394
395    fn next(&mut self) -> Option<Self::Item> {
396        self.read().transpose()
397    }
398}
399
400impl<R: BufRead> RecordBatchReader for Reader<R> {
401    fn schema(&self) -> SchemaRef {
402        self.decoder.schema.clone()
403    }
404}
405
/// A low-level interface for reading JSON data from a byte stream
///
/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
///
/// The push-based interface facilitates integration with sources that yield arbitrarily
/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
/// object storage
///
/// ```
/// # use std::io::BufRead;
/// # use arrow_array::RecordBatch;
/// # use arrow_json::reader::{Decoder, ReaderBuilder};
/// # use arrow_schema::{ArrowError, SchemaRef};
/// #
/// fn read_from_json<R: BufRead>(
///     mut reader: R,
///     schema: SchemaRef,
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
///     let mut next = move || {
///         loop {
///             // Decoder is agnostic that buf doesn't contain whole records
///             let buf = reader.fill_buf()?;
///             if buf.is_empty() {
///                 break; // Input exhausted
///             }
///             let read = buf.len();
///             let decoded = decoder.decode(buf)?;
///
///             // Consume the number of bytes read
///             reader.consume(decoded);
///             if decoded != read {
///                 break; // Read batch size
///             }
///         }
///         decoder.flush()
///     };
///     Ok(std::iter::from_fn(move || next().transpose()))
/// }
/// ```
pub struct Decoder {
    // Parses raw JSON bytes into an intermediate tape representation
    tape_decoder: TapeDecoder,
    // Converts buffered tape values into an arrow array on `flush`
    decoder: Box<dyn ArrayDecoder>,
    // Maximum number of rows buffered before `decode` returns early
    batch_size: usize,
    // True when the JSON root is a single value of a field's type rather than
    // an object, i.e. built via `ReaderBuilder::new_with_field`
    is_field: bool,
    // Schema of the produced `RecordBatch`es
    schema: SchemaRef,
}
453
454impl std::fmt::Debug for Decoder {
455    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
456        f.debug_struct("Decoder")
457            .field("schema", &self.schema)
458            .field("batch_size", &self.batch_size)
459            .finish()
460    }
461}
462
463impl Decoder {
464    /// Read JSON objects from `buf`, returning the number of bytes read
465    ///
466    /// This method returns once `batch_size` objects have been parsed since the
467    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
468    /// should be included in the next call to [`Self::decode`]
469    ///
470    /// There is no requirement that `buf` contains a whole number of records, facilitating
471    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
472    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
473        self.tape_decoder.decode(buf)
474    }
475
476    /// Serialize `rows` to this [`Decoder`]
477    ///
478    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
479    /// [`RecordBatch`].
480    ///
481    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
482    /// especially where the schema is known at compile-time, however, this provides a mechanism
483    /// to get something up and running quickly
484    ///
485    /// It can be used with [`serde_json::Value`]
486    ///
487    /// ```
488    /// # use std::sync::Arc;
489    /// # use serde_json::{Value, json};
490    /// # use arrow_array::cast::AsArray;
491    /// # use arrow_array::types::Float32Type;
492    /// # use arrow_json::ReaderBuilder;
493    /// # use arrow_schema::{DataType, Field, Schema};
494    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
495    ///
496    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
497    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
498    ///
499    /// decoder.serialize(&json).unwrap();
500    /// let batch = decoder.flush().unwrap().unwrap();
501    /// assert_eq!(batch.num_rows(), 2);
502    /// assert_eq!(batch.num_columns(), 1);
503    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
504    /// assert_eq!(values, &[2.3, 5.7])
505    /// ```
506    ///
507    /// Or with arbitrary [`Serialize`] types
508    ///
509    /// ```
510    /// # use std::sync::Arc;
511    /// # use arrow_json::ReaderBuilder;
512    /// # use arrow_schema::{DataType, Field, Schema};
513    /// # use serde::Serialize;
514    /// # use arrow_array::cast::AsArray;
515    /// # use arrow_array::types::{Float32Type, Int32Type};
516    /// #
517    /// #[derive(Serialize)]
518    /// struct MyStruct {
519    ///     int32: i32,
520    ///     float: f32,
521    /// }
522    ///
523    /// let schema = Schema::new(vec![
524    ///     Field::new("int32", DataType::Int32, false),
525    ///     Field::new("float", DataType::Float32, false),
526    /// ]);
527    ///
528    /// let rows = vec![
529    ///     MyStruct{ int32: 0, float: 3. },
530    ///     MyStruct{ int32: 4, float: 67.53 },
531    /// ];
532    ///
533    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
534    /// decoder.serialize(&rows).unwrap();
535    ///
536    /// let batch = decoder.flush().unwrap().unwrap();
537    ///
538    /// // Expect batch containing two columns
539    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
540    /// assert_eq!(int32.values(), &[0, 4]);
541    ///
542    /// let float = batch.column(1).as_primitive::<Float32Type>();
543    /// assert_eq!(float.values(), &[3., 67.53]);
544    /// ```
545    ///
546    /// Or even complex nested types
547    ///
548    /// ```
549    /// # use std::collections::BTreeMap;
550    /// # use std::sync::Arc;
551    /// # use arrow_array::StructArray;
552    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
553    /// # use arrow_json::ReaderBuilder;
554    /// # use arrow_schema::{DataType, Field, Fields, Schema};
555    /// # use serde::Serialize;
556    /// #
557    /// #[derive(Serialize)]
558    /// struct MyStruct {
559    ///     int32: i32,
560    ///     list: Vec<f64>,
561    ///     nested: Vec<Option<Nested>>,
562    /// }
563    ///
564    /// impl MyStruct {
565    ///     /// Returns the [`Fields`] for [`MyStruct`]
566    ///     fn fields() -> Fields {
567    ///         let nested = DataType::Struct(Nested::fields());
568    ///         Fields::from([
569    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
570    ///             Arc::new(Field::new_list(
571    ///                 "list",
572    ///                 Field::new("element", DataType::Float64, false),
573    ///                 false,
574    ///             )),
575    ///             Arc::new(Field::new_list(
576    ///                 "nested",
577    ///                 Field::new("element", nested, true),
578    ///                 true,
579    ///             )),
580    ///         ])
581    ///     }
582    /// }
583    ///
584    /// #[derive(Serialize)]
585    /// struct Nested {
586    ///     map: BTreeMap<String, Vec<String>>
587    /// }
588    ///
589    /// impl Nested {
590    ///     /// Returns the [`Fields`] for [`Nested`]
591    ///     fn fields() -> Fields {
592    ///         let element = Field::new("element", DataType::Utf8, false);
593    ///         Fields::from([
594    ///             Arc::new(Field::new_map(
595    ///                 "map",
596    ///                 "entries",
597    ///                 Field::new("key", DataType::Utf8, false),
598    ///                 Field::new_list("value", element, false),
599    ///                 false, // sorted
600    ///                 false, // nullable
601    ///             ))
602    ///         ])
603    ///     }
604    /// }
605    ///
606    /// let data = vec![
607    ///     MyStruct {
608    ///         int32: 34,
609    ///         list: vec![1., 2., 34.],
610    ///         nested: vec![
611    ///             None,
612    ///             Some(Nested {
613    ///                 map: vec![
614    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
615    ///                     ("key2".to_string(), vec!["baz".to_string()])
616    ///                 ].into_iter().collect()
617    ///             })
618    ///         ]
619    ///     },
620    ///     MyStruct {
621    ///         int32: 56,
622    ///         list: vec![],
623    ///         nested: vec![]
624    ///     },
625    ///     MyStruct {
626    ///         int32: 24,
627    ///         list: vec![-1., 245.],
628    ///         nested: vec![None]
629    ///     }
630    /// ];
631    ///
632    /// let schema = Schema::new(MyStruct::fields());
633    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
634    /// decoder.serialize(&data).unwrap();
635    /// let batch = decoder.flush().unwrap().unwrap();
636    /// assert_eq!(batch.num_rows(), 3);
637    /// assert_eq!(batch.num_columns(), 3);
638    ///
639    /// // Convert to StructArray to format
640    /// let s = StructArray::from(batch);
641    /// let options = FormatOptions::default().with_null("null");
642    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
643    ///
644    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
645    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
646    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
647    /// ```
648    ///
649    /// Note: this ignores any batch size setting, and always decodes all rows
650    ///
651    /// [serde]: https://docs.rs/serde/latest/serde/
652    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
653        self.tape_decoder.serialize(rows)
654    }
655
656    /// True if the decoder is currently part way through decoding a record.
657    pub fn has_partial_record(&self) -> bool {
658        self.tape_decoder.has_partial_row()
659    }
660
661    /// The number of unflushed records, including the partially decoded record (if any).
662    pub fn len(&self) -> usize {
663        self.tape_decoder.num_buffered_rows()
664    }
665
666    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
667    pub fn is_empty(&self) -> bool {
668        self.len() == 0
669    }
670
671    /// Flushes the currently buffered data to a [`RecordBatch`]
672    ///
673    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
674    ///
675    /// Note: This will return an error if called part way through decoding a record,
676    /// i.e. [`Self::has_partial_record`] is true.
677    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
678        let tape = self.tape_decoder.finish()?;
679
680        if tape.num_rows() == 0 {
681            return Ok(None);
682        }
683
684        // First offset is null sentinel
685        let mut next_object = 1;
686        let pos: Vec<_> = (0..tape.num_rows())
687            .map(|_| {
688                let next = tape.next(next_object, "row").unwrap();
689                std::mem::replace(&mut next_object, next)
690            })
691            .collect();
692
693        let decoded = self.decoder.decode(&tape, &pos)?;
694        self.tape_decoder.clear();
695
696        let batch = match self.is_field {
697            true => RecordBatch::try_new(self.schema.clone(), vec![decoded])?,
698            false => {
699                RecordBatch::from(decoded.as_struct().clone()).with_schema(self.schema.clone())?
700            }
701        };
702
703        Ok(Some(batch))
704    }
705}
706
/// Converts values buffered in a [`Tape`] into an arrow [`ArrayRef`]
///
/// `Send` so decoders can be moved across threads along with the [`Decoder`]
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError>;
}
711
/// Context for decoder creation, containing configuration.
///
/// This context is passed through the decoder creation process and contains
/// all the configuration needed to create decoders recursively.
///
/// Constructed from the builder flags in [`ReaderBuilder::build_decoder`].
pub struct DecoderContext {
    /// Whether to coerce primitives to strings
    coerce_primitive: bool,
    /// Whether to validate struct fields strictly
    strict_mode: bool,
    /// How to decode struct fields
    struct_mode: StructMode,
    /// Whether to treat columns with incompatible types as missing (i.e. NULL)
    ignore_type_conflicts: bool,
}
726
727impl DecoderContext {
728    /// Returns whether to coerce primitive types (e.g., number to string)
729    pub fn coerce_primitive(&self) -> bool {
730        self.coerce_primitive
731    }
732
733    /// Returns whether to validate struct fields strictly
734    pub fn strict_mode(&self) -> bool {
735        self.strict_mode
736    }
737
738    /// Returns how to decode struct fields
739    pub fn struct_mode(&self) -> StructMode {
740        self.struct_mode
741    }
742
743    /// Returns whether to treat columns with incompatible types as missing (i.e. NULL)
744    pub fn ignore_type_conflicts(&self) -> bool {
745        self.ignore_type_conflicts
746    }
747
748    /// Create a decoder for a type.
749    ///
750    /// This is the standard way to create child decoders from within a decoder
751    /// implementation.
752    fn make_decoder(
753        &self,
754        data_type: &DataType,
755        is_nullable: bool,
756    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
757        make_decoder(self, data_type, is_nullable)
758    }
759}
760
761fn make_decoder(
762    ctx: &DecoderContext,
763    data_type: &DataType,
764    is_nullable: bool,
765) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
766    macro_rules! primitive_decoder {
767        ($t:ty, $data_type:expr) => {
768            Ok(Box::new(PrimitiveArrayDecoder::<$t>::new(ctx, $data_type)))
769        };
770    }
771    macro_rules! timestamp_decoder {
772        ($t:ty, $data_type:expr, $tz:expr) => {{
773            Ok(Box::new(TimestampArrayDecoder::<$t, _>::new(
774                ctx, $data_type, $tz,
775            )))
776        }};
777    }
778    macro_rules! decimal_decoder {
779        ($t:ty, $p:expr, $s:expr) => {
780            Ok(Box::new(DecimalArrayDecoder::<$t>::new(ctx, $p, $s)))
781        };
782    }
783
784    downcast_integer! {
785        *data_type => (primitive_decoder, data_type),
786        DataType::Null => Ok(Box::new(NullArrayDecoder::new(ctx))),
787        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
788        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
789        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
790        DataType::Timestamp(TimeUnit::Second, None) => {
791            timestamp_decoder!(TimestampSecondType, data_type, Utc)
792        },
793        DataType::Timestamp(TimeUnit::Millisecond, None) => {
794            timestamp_decoder!(TimestampMillisecondType, data_type, Utc)
795        },
796        DataType::Timestamp(TimeUnit::Microsecond, None) => {
797            timestamp_decoder!(TimestampMicrosecondType, data_type, Utc)
798        },
799        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
800            timestamp_decoder!(TimestampNanosecondType, data_type, Utc)
801        },
802        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
803            let tz: Tz = tz.parse()?;
804            timestamp_decoder!(TimestampSecondType, data_type, tz)
805        },
806        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
807            let tz: Tz = tz.parse()?;
808            timestamp_decoder!(TimestampMillisecondType, data_type, tz)
809        },
810        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
811            let tz: Tz = tz.parse()?;
812            timestamp_decoder!(TimestampMicrosecondType, data_type, tz)
813        },
814        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
815            let tz: Tz = tz.parse()?;
816            timestamp_decoder!(TimestampNanosecondType, data_type, tz)
817        },
818        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
819        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
820        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
821        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
822        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
823        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
824        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
825        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
826        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
827        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
828        DataType::Decimal32(p, s) => decimal_decoder!(Decimal32Type, p, s),
829        DataType::Decimal64(p, s) => decimal_decoder!(Decimal64Type, p, s),
830        DataType::Decimal128(p, s) => decimal_decoder!(Decimal128Type, p, s),
831        DataType::Decimal256(p, s) => decimal_decoder!(Decimal256Type, p, s),
832        DataType::Boolean => Ok(Box::new(BooleanArrayDecoder::new(ctx))),
833        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(ctx))),
834        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(ctx))),
835        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(ctx))),
836        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
837        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
838        DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
839        DataType::LargeListView(_) => Ok(Box::new(ListViewArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
840        DataType::FixedSizeList(_, _) => Ok(Box::new(FixedSizeListArrayDecoder::new(ctx, data_type, is_nullable)?)),
841        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
842        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
843        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
844        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
845        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
846        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
847        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
848            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
849            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
850            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
851            d => unreachable!("unsupported run end index type: {d}"),
852        },
853        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
854    }
855}
856
857#[cfg(test)]
858mod tests {
859    use arrow_array::cast::AsArray;
860    use arrow_array::{
861        Array, BooleanArray, Float64Array, GenericListViewArray, Int32Array, ListArray, MapArray,
862        NullArray, OffsetSizeTrait, StringArray, StringViewArray, StructArray,
863    };
864    use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
865    use arrow_cast::display::{ArrayFormatter, FormatOptions};
866    use arrow_schema::{Field, Fields};
867    use serde_json::json;
868    use std::fs::File;
869    use std::io::{BufReader, Cursor, Seek};
870
871    use super::*;
872
873    fn do_read(
874        buf: &str,
875        batch_size: usize,
876        coerce_primitive: bool,
877        strict_mode: bool,
878        schema: SchemaRef,
879    ) -> Vec<RecordBatch> {
880        let mut unbuffered = vec![];
881
882        // Test with different batch sizes to test for boundary conditions
883        for batch_size in [1, 3, 100, batch_size] {
884            unbuffered = ReaderBuilder::new(schema.clone())
885                .with_batch_size(batch_size)
886                .with_coerce_primitive(coerce_primitive)
887                .build(Cursor::new(buf.as_bytes()))
888                .unwrap()
889                .collect::<Result<Vec<_>, _>>()
890                .unwrap();
891
892            for b in unbuffered.iter().take(unbuffered.len() - 1) {
893                assert_eq!(b.num_rows(), batch_size)
894            }
895
896            // Test with different buffer sizes to test for boundary conditions
897            for b in [1, 3, 5] {
898                let buffered = ReaderBuilder::new(schema.clone())
899                    .with_batch_size(batch_size)
900                    .with_coerce_primitive(coerce_primitive)
901                    .with_strict_mode(strict_mode)
902                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
903                    .unwrap()
904                    .collect::<Result<Vec<_>, _>>()
905                    .unwrap();
906                assert_eq!(unbuffered, buffered);
907            }
908        }
909
910        unbuffered
911    }
912
    #[test]
    fn test_basic() {
        // Six records mixing integers, floats in scientific notation (2E0,
        // 4e0), a quoted number ("5"), booleans, nulls and missing fields
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Exercise the push-based Decoder API directly: decode() returns 221
        // here (presumably the number of bytes consumed — confirm against the
        // Decoder contract), then flush() yields all six buffered records
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        // Now read via the pull-based Reader and validate column contents
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": rows 5 and 6 are missing/null; 2E0 and 2.0 decode as integer 2
        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": fully populated; the quoted "5" and 4e0 decode as integers
        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        // "c": only the first two records supply a boolean
        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        // "d": Date32 built from the raw integer values
        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        // "e": Date64 appears only in record 2
        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
979
980    #[test]
981    fn test_string() {
982        let buf = r#"
983        {"a": "1", "b": "2"}
984        {"a": "hello", "b": "shoo"}
985        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
986
987        {"b": null}
988        {"b": "", "a": null}
989
990        "#;
991        let schema = Arc::new(Schema::new(vec![
992            Field::new("a", DataType::Utf8, true),
993            Field::new("b", DataType::LargeUtf8, true),
994        ]));
995
996        let batches = do_read(buf, 1024, false, false, schema);
997        assert_eq!(batches.len(), 1);
998
999        let col1 = batches[0].column(0).as_string::<i32>();
1000        assert_eq!(col1.null_count(), 2);
1001        assert_eq!(col1.value(0), "1");
1002        assert_eq!(col1.value(1), "hello");
1003        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1004        assert!(col1.is_null(3));
1005        assert!(col1.is_null(4));
1006
1007        let col2 = batches[0].column(1).as_string::<i64>();
1008        assert_eq!(col2.null_count(), 1);
1009        assert_eq!(col2.value(0), "2");
1010        assert_eq!(col2.value(1), "shoo");
1011        assert_eq!(col2.value(2), "\t😁foo");
1012        assert!(col2.is_null(3));
1013        assert_eq!(col2.value(4), "");
1014    }
1015
    #[test]
    fn test_long_string_view_allocation() {
        // The JSON input contains field "a" with different string lengths.
        // According to the implementation in the decoder:
        // - For a string, capacity is only increased if its length > 12 bytes.
        // Therefore, for:
        // Row 1: "short" (5 bytes) -> capacity += 0
        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
        // Row 3: "hello" (5 bytes) -> capacity += 0
        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
        // Expected total capacity = 24 + 17 = 41
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        // Column "a" uses Utf8View so that the decoder's view-buffer
        // pre-allocation path is exercised; "b" is an ordinary string column
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        // Get the first column ("a") as a StringViewArray.
        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Retrieve the underlying data buffer from the array.
        // The builder pre-allocates capacity based on the sum of lengths for long strings.
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        // Check that the allocated capacity is at least what we expected.
        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Additionally, verify that the decoded values are correct.
        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
1068
    /// Test the memory capacity allocation logic when converting numeric types to strings.
    #[test]
    fn test_numeric_view_allocation() {
        // For numeric types, the expected capacity calculation is as follows:
        // Row 1: 123456789  -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
        //                        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
        let expected_capacity: usize = 33;

        let buf = r#"
    {"n": 123456789}
    {"n": 1000000000000}
    {"n": 3.1415}
    {"n": 2.718281828459045}
    "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive = true: numbers are stored as their JSON text
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Check that the underlying data buffer capacity is at least the expected value.
        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Verify that the converted string values are correct.
        // Note: The format of the number converted to a string should match the actual implementation.
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1113
1114    #[test]
1115    fn test_string_with_uft8view() {
1116        let buf = r#"
1117        {"a": "1", "b": "2"}
1118        {"a": "hello", "b": "shoo"}
1119        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1120
1121        {"b": null}
1122        {"b": "", "a": null}
1123
1124        "#;
1125        let schema = Arc::new(Schema::new(vec![
1126            Field::new("a", DataType::Utf8View, true),
1127            Field::new("b", DataType::LargeUtf8, true),
1128        ]));
1129
1130        let batches = do_read(buf, 1024, false, false, schema);
1131        assert_eq!(batches.len(), 1);
1132
1133        let col1 = batches[0].column(0).as_string_view();
1134        assert_eq!(col1.null_count(), 2);
1135        assert_eq!(col1.value(0), "1");
1136        assert_eq!(col1.value(1), "hello");
1137        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1138        assert!(col1.is_null(3));
1139        assert!(col1.is_null(4));
1140        assert_eq!(col1.data_type(), &DataType::Utf8View);
1141
1142        let col2 = batches[0].column(1).as_string::<i64>();
1143        assert_eq!(col2.null_count(), 1);
1144        assert_eq!(col2.value(0), "2");
1145        assert_eq!(col2.value(1), "shoo");
1146        assert_eq!(col2.value(2), "\t😁foo");
1147        assert!(col2.is_null(3));
1148        assert_eq!(col2.value(4), "");
1149    }
1150
    #[test]
    fn test_complex() {
        // Nested structures: a list of ints, a struct, and a struct holding
        // a list of structs; record 3 omits "nested_list" entirely
        let buf = r#"
           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
           {"list": null, "nested": {"a": null}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("a", DataType::Int32, true),
                    Field::new("b", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("c", DataType::Int32, false)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "list": empty list, [5, 6], then an explicit null
        let list = batches[0].column(0).as_list::<i32>();
        assert_eq!(list.len(), 3);
        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
        assert_eq!(list.null_count(), 1);
        assert!(list.is_null(2));
        let list_values = list.values().as_primitive::<Int32Type>();
        assert_eq!(list_values.values(), &[5, 6]);

        // "nested.a": populated in all rows, null in the last
        let nested = batches[0].column(1).as_struct();
        let a = nested.column(0).as_primitive::<Int32Type>();
        // NOTE(review): the two `list` assertions below duplicate checks made
        // above and look like a copy-paste; this section otherwise inspects
        // `nested` — confirm the intended target before changing them
        assert_eq!(list.null_count(), 1);
        assert_eq!(a.values(), &[1, 7, 0]);
        assert!(list.is_null(2));

        // "nested.b": only record 1 supplies a value
        let b = nested.column(1).as_primitive::<Int32Type>();
        assert_eq!(b.null_count(), 2);
        assert_eq!(b.len(), 3);
        assert_eq!(b.value(0), 2);
        assert!(b.is_null(1));
        assert!(b.is_null(2));

        // "nested_list": absent from record 3, hence a null struct entry
        let nested_list = batches[0].column(2).as_struct();
        assert_eq!(nested_list.len(), 3);
        assert_eq!(nested_list.null_count(), 1);
        assert!(nested_list.is_null(2));

        // "nested_list.list2": two elements, then empty, then null
        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.len(), 3);
        assert_eq!(list2.null_count(), 1);
        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
        assert!(list2.is_null(2));

        let list2_values = list2.values().as_struct();

        // The "c" values of the two structs in record 1's list2
        let c = list2_values.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[3, 4]);
    }
1224
    #[test]
    fn test_projection() {
        // The schema projects a subset of the JSON: "list" is dropped,
        // "nested" keeps only "a", and list2 elements keep only "d"
        let buf = r#"
           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // Only the projected "a" child survives in "nested"
        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        // "nested_list" keeps only its "list2" child
        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        // Record 1 contributes two list2 elements, record 2 none
        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" is absent from the second list2 element, hence null there
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1280
    #[test]
    fn test_map() {
        // Map<Utf8, List<Utf8>> input with null list elements, an empty
        // list, and a null map value
        let buf = r#"
           {"map": {"a": ["foo", null]}}
           {"map": {"a": [null], "b": []}}
           {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // Entry counts per row: 1, 2, 2 -> offsets [0, 1, 3, 5]
        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        // Keys appear in their JSON order within each record
        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        // Flattened list contents; the "c" entry's value list is null
        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // Render through ArrayFormatter to validate end-to-end display
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1323
1324    #[test]
1325    fn test_not_coercing_primitive_into_string_without_flag() {
1326        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1327
1328        let buf = r#"{"a": 1}"#;
1329        let err = ReaderBuilder::new(schema.clone())
1330            .with_batch_size(1024)
1331            .build(Cursor::new(buf.as_bytes()))
1332            .unwrap()
1333            .read()
1334            .unwrap_err();
1335
1336        assert_eq!(
1337            err.to_string(),
1338            "Json error: whilst decoding field 'a': expected string got 1"
1339        );
1340
1341        let buf = r#"{"a": true}"#;
1342        let err = ReaderBuilder::new(schema)
1343            .with_batch_size(1024)
1344            .build(Cursor::new(buf.as_bytes()))
1345            .unwrap()
1346            .read()
1347            .unwrap_err();
1348
1349        assert_eq!(
1350            err.to_string(),
1351            "Json error: whilst decoding field 'a': expected string got true"
1352        );
1353    }
1354
1355    #[test]
1356    fn test_coercing_primitive_into_string() {
1357        let buf = r#"
1358        {"a": 1, "b": 2, "c": true}
1359        {"a": 2E0, "b": 4, "c": false}
1360
1361        {"b": 6, "a": 2.0}
1362        {"b": "5", "a": 2}
1363        {"b": 4e0}
1364        {"b": 7, "a": null}
1365        "#;
1366
1367        let schema = Arc::new(Schema::new(vec![
1368            Field::new("a", DataType::Utf8, true),
1369            Field::new("b", DataType::Utf8, true),
1370            Field::new("c", DataType::Utf8, true),
1371        ]));
1372
1373        let batches = do_read(buf, 1024, true, false, schema);
1374        assert_eq!(batches.len(), 1);
1375
1376        let col1 = batches[0].column(0).as_string::<i32>();
1377        assert_eq!(col1.null_count(), 2);
1378        assert_eq!(col1.value(0), "1");
1379        assert_eq!(col1.value(1), "2E0");
1380        assert_eq!(col1.value(2), "2.0");
1381        assert_eq!(col1.value(3), "2");
1382        assert!(col1.is_null(4));
1383        assert!(col1.is_null(5));
1384
1385        let col2 = batches[0].column(1).as_string::<i32>();
1386        assert_eq!(col2.null_count(), 0);
1387        assert_eq!(col2.value(0), "2");
1388        assert_eq!(col2.value(1), "4");
1389        assert_eq!(col2.value(2), "6");
1390        assert_eq!(col2.value(3), "5");
1391        assert_eq!(col2.value(4), "4e0");
1392        assert_eq!(col2.value(5), "7");
1393
1394        let col3 = batches[0].column(2).as_string::<i32>();
1395        assert_eq!(col3.null_count(), 4);
1396        assert_eq!(col3.value(0), "true");
1397        assert_eq!(col3.value(1), "false");
1398        assert!(col3.is_null(2));
1399        assert!(col3.is_null(3));
1400        assert!(col3.is_null(4));
1401        assert!(col3.is_null(5));
1402    }
1403
    /// Shared body for the decimal tests: decodes plain numbers, quoted
    /// numbers and nulls into three columns of the given decimal `data_type`.
    /// Callers pass scale 2, so e.g. 38.30 is stored as the integer 3830.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": quoted decimals parse too; note 2.0452 truncates to 204 and
        // 123.456 (in "c" below) to 12345 at scale 2
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        // "b": fully populated from integers and quoted integers
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        // "c": only the first two records supply a value
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1453
    #[test]
    fn test_decimals() {
        // Run the shared decimal test across every decimal width
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1461
    /// Shared body for the timestamp tests: decodes RFC 3339 strings (with
    /// and without offsets/fractions), bare numbers, and nulls into columns
    /// of unit `T::UNIT`; column "d" additionally carries a "+08:00" zone.
    fn test_timestamp<T: ArrowTimestampType>() {
        // NOTE(review): record 3 repeats the "b" key — the asserted values
        // show the later string value wins; the first `"b": 1337` looks like
        // it was meant to be "a" (confirm intent)
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are written in nanoseconds and scaled down
        // to the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        // "a": only the first two records supply a value
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": RFC 3339 strings parse to epoch offsets; bare numbers (40,
        // 1234) pass through as raw values in the target unit
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        // "c": zone-less strings are interpreted as UTC; space-separated
        // date/time and explicit offsets are also accepted
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // "d": the column's "+08:00" zone applies to zone-less strings,
        // while explicit offsets ("Z", "-05:00") take precedence
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1543
    #[test]
    fn test_timestamps() {
        // Run the shared timestamp test for every time unit
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1551
    /// Shared body for the time-of-day tests: decodes 12-hour and 24-hour
    /// time strings, bare numbers, and nulls into Time32/Time64 columns.
    fn test_time<T: ArrowTemporalType>() {
        // NOTE(review): record 3 repeats the "b" key — the asserted values
        // show the later "6:00 pm" wins; the first `"b": 1337` looks like it
        // was meant to be "a" (confirm intent)
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Expected values below are written as nanoseconds since midnight
        // and scaled down to the unit under test
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": only the first two records supply a value
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": AM/PM and 24-hour strings parse; bare numbers (40, 1234)
        // pass through as raw values in the target unit
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        // "c": fully populated; fractional seconds down to microseconds
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1623
    #[test]
    fn test_times() {
        // Run the shared time test over 32-bit and 64-bit time types
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }
1631
1632    fn test_duration<T: ArrowTemporalType>() {
1633        let buf = r#"
1634        {"a": 1, "b": "2"}
1635        {"a": 3, "b": null}
1636        "#;
1637
1638        let schema = Arc::new(Schema::new(vec![
1639            Field::new("a", T::DATA_TYPE, true),
1640            Field::new("b", T::DATA_TYPE, true),
1641        ]));
1642
1643        let batches = do_read(buf, 1024, true, false, schema);
1644        assert_eq!(batches.len(), 1);
1645
1646        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1647        assert_eq!(col_a.null_count(), 0);
1648        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1649
1650        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1651        assert_eq!(col2.null_count(), 1);
1652        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1653    }
1654
    #[test]
    fn test_durations() {
        // Exercise duration decoding at every supported resolution
        test_duration::<DurationNanosecondType>();
        test_duration::<DurationMicrosecondType>();
        test_duration::<DurationMillisecondType>();
        test_duration::<DurationSecondType>();
    }
1662
    #[test]
    fn test_delta_checkpoint() {
        // Input modelled on a Delta-Lake-style checkpoint record: only the
        // "protocol" struct is present; the "add" struct (holding a map of
        // partition values) is absent and must materialize as a null struct.
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Render the single row as text and verify the missing "add" struct
        // formats as null
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1700
1701    #[test]
1702    fn struct_nullability() {
1703        let do_test = |child: DataType| {
1704            // Test correctly enforced nullability
1705            let non_null = r#"{"foo": {}}"#;
1706            let schema = Arc::new(Schema::new(vec![Field::new_struct(
1707                "foo",
1708                vec![Field::new("bar", child, false)],
1709                true,
1710            )]));
1711            let mut reader = ReaderBuilder::new(schema.clone())
1712                .build(Cursor::new(non_null.as_bytes()))
1713                .unwrap();
1714            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1715
1716            let null = r#"{"foo": {bar: null}}"#;
1717            let mut reader = ReaderBuilder::new(schema.clone())
1718                .build(Cursor::new(null.as_bytes()))
1719                .unwrap();
1720            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1721
1722            // Test nulls in nullable parent can mask nulls in non-nullable child
1723            let null = r#"{"foo": null}"#;
1724            let mut reader = ReaderBuilder::new(schema)
1725                .build(Cursor::new(null.as_bytes()))
1726                .unwrap();
1727            let batch = reader.next().unwrap().unwrap();
1728            assert_eq!(batch.num_columns(), 1);
1729            let foo = batch.column(0).as_struct();
1730            assert_eq!(foo.len(), 1);
1731            assert!(foo.is_null(0));
1732            assert_eq!(foo.num_columns(), 1);
1733
1734            let bar = foo.column(0);
1735            assert_eq!(bar.len(), 1);
1736            // Non-nullable child can still contain null as masked by parent
1737            assert!(bar.is_null(0));
1738        };
1739
1740        do_test(DataType::Boolean);
1741        do_test(DataType::Int32);
1742        do_test(DataType::Utf8);
1743        do_test(DataType::Decimal128(2, 1));
1744        do_test(DataType::Timestamp(
1745            TimeUnit::Microsecond,
1746            Some("+00:00".into()),
1747        ));
1748    }
1749
1750    #[test]
1751    fn test_truncation() {
1752        let buf = r#"
1753        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1754        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1755        {"i64": -9223372036854775808, "u64": 0 }
1756        {"i64": "-9223372036854775808", "u64": 0 }
1757        "#;
1758
1759        let schema = Arc::new(Schema::new(vec![
1760            Field::new("i64", DataType::Int64, true),
1761            Field::new("u64", DataType::UInt64, true),
1762        ]));
1763
1764        let batches = do_read(buf, 1024, true, false, schema);
1765        assert_eq!(batches.len(), 1);
1766
1767        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1768        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1769
1770        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1771        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1772    }
1773
1774    #[test]
1775    fn test_timestamp_truncation() {
1776        let buf = r#"
1777        {"time": 9223372036854775807 }
1778        {"time": -9223372036854775808 }
1779        {"time": 9e5 }
1780        "#;
1781
1782        let schema = Arc::new(Schema::new(vec![Field::new(
1783            "time",
1784            DataType::Timestamp(TimeUnit::Nanosecond, None),
1785            true,
1786        )]));
1787
1788        let batches = do_read(buf, 1024, true, false, schema);
1789        assert_eq!(batches.len(), 1);
1790
1791        let i64 = batches[0]
1792            .column(0)
1793            .as_primitive::<TimestampNanosecondType>();
1794        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1795    }
1796
    #[test]
    fn test_strict_mode_no_missing_columns_in_schema() {
        // In strict mode decoding succeeds when every JSON key is covered by
        // the schema — first checked with flat columns...
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);

        // ...then with a nested struct column whose children are also fully
        // covered by the schema
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct(
                "c",
                vec![
                    Field::new("a", DataType::Boolean, false),
                    Field::new("b", DataType::Int16, false),
                ],
                false,
            ),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);
    }
1834
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        // In strict mode a JSON key not covered by the schema is an error;
        // first with a missing top-level column...
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // Schema deliberately omits column "b"
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        // ...then with a missing child of a struct column; the error is
        // prefixed with the path to the offending field
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        // Struct "c" deliberately omits its child "b"
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1884
1885    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1886        let file = File::open(path).unwrap();
1887        let mut reader = BufReader::new(file);
1888        let schema = schema.unwrap_or_else(|| {
1889            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1890            reader.rewind().unwrap();
1891            schema
1892        });
1893        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1894        builder.build(reader).unwrap()
1895    }
1896
    #[test]
    fn test_json_basic() {
        // Infer a schema from test/data/basic.json and spot-check the
        // resulting column types and a handful of values
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred types: a -> Int64, b -> Float64, c -> Boolean, d -> Utf8
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values per column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1935
1936    #[test]
1937    fn test_json_empty_projection() {
1938        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1939        let batch = reader.next().unwrap().unwrap();
1940
1941        assert_eq!(0, batch.num_columns());
1942        assert_eq!(12, batch.num_rows());
1943    }
1944
    #[test]
    fn test_json_basic_with_nulls() {
        // Infer a schema from a file containing nulls and verify the
        // per-column validity bitmaps
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Nulls must not change the inferred column types
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity at the start, middle, and last row
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1984
    #[test]
    fn test_json_basic_schema() {
        // An explicit schema overrides inference; note "b" is declared
        // Float32 here (inference would produce Float64 — see
        // test_json_basic)
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        // The reader must report exactly the schema it was given
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check values, including the largest "a" in the file
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
2020
    #[test]
    fn test_json_basic_schema_projection() {
        // Supplying a schema with a subset of the file's keys acts as a
        // projection: only "a" and "c" are decoded
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("c", DataType::Boolean, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(2, batch.num_columns());
        assert_eq!(2, batch.schema().fields().len());
        assert_eq!(12, batch.num_rows());

        assert_eq!(batch.schema().as_ref(), &schema);

        // Projected columns keep the order given in the schema
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(1, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
    }
2044
2045    #[test]
2046    fn test_json_arrays() {
2047        let mut reader = read_file("test/data/arrays.json", None);
2048        let batch = reader.next().unwrap().unwrap();
2049
2050        assert_eq!(4, batch.num_columns());
2051        assert_eq!(3, batch.num_rows());
2052
2053        let schema = batch.schema();
2054
2055        let a = schema.column_with_name("a").unwrap();
2056        assert_eq!(&DataType::Int64, a.1.data_type());
2057        let b = schema.column_with_name("b").unwrap();
2058        assert_eq!(
2059            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
2060            b.1.data_type()
2061        );
2062        let c = schema.column_with_name("c").unwrap();
2063        assert_eq!(
2064            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
2065            c.1.data_type()
2066        );
2067        let d = schema.column_with_name("d").unwrap();
2068        assert_eq!(&DataType::Utf8, d.1.data_type());
2069
2070        let aa = batch.column(a.0).as_primitive::<Int64Type>();
2071        assert_eq!(1, aa.value(0));
2072        assert_eq!(-10, aa.value(1));
2073        assert_eq!(1627668684594000000, aa.value(2));
2074        let bb = batch.column(b.0).as_list::<i32>();
2075        let bb = bb.values().as_primitive::<Float64Type>();
2076        assert_eq!(9, bb.len());
2077        assert_eq!(2.0, bb.value(0));
2078        assert_eq!(-6.1, bb.value(5));
2079        assert!(!bb.is_valid(7));
2080
2081        let cc = batch
2082            .column(c.0)
2083            .as_any()
2084            .downcast_ref::<ListArray>()
2085            .unwrap();
2086        let cc = cc.values().as_boolean();
2087        assert_eq!(6, cc.len());
2088        assert!(!cc.value(0));
2089        assert!(!cc.value(4));
2090        assert!(!cc.is_valid(5));
2091    }
2092
    #[test]
    fn test_empty_json_arrays() {
        // Empty list vs explicit null vs absent key: only the first yields a
        // valid (zero-length) list; the latter two decode as null entries
        let json_content = r#"
            {"items": []}
            {"items": null}
            {}
            "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.value(0).is_empty());
        assert_eq!(col1.value(0).data_type(), &DataType::Null);
        assert!(col1.is_null(1));
        assert!(col1.is_null(2));
    }
2117
    #[test]
    fn test_nested_empty_json_arrays() {
        // Lists of lists where the inner lists are empty or contain only
        // nulls; the outer entries themselves are all valid
        let json_content = r#"
            {"items": [[],[]]}
            {"items": [[null, null],[null]]}
            "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(
                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
                true,
            ))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 0);
        // Row 0: two empty inner lists
        assert_eq!(col1.value(0).len(), 2);
        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());

        // Row 1: inner lists of lengths 2 and 1 (all-null elements)
        assert_eq!(col1.value(1).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
    }
2147
    #[test]
    fn test_nested_list_json_arrays() {
        // Decode a list-of-structs column (with a nested struct child) and
        // compare against a hand-built expected ListArray, piece by piece.
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        // Rows cover: multiple structs, null nested struct, null field, null
        // row, empty list, and a list containing a null struct
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output
        // Flattened child values: 7 structs total across all rows
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        let c = StructArray::new(
            vec![Field::new("d", DataType::Utf8, true)].into(),
            vec![Arc::new(d.clone()) as ArrayRef],
            // Struct "c" is null for the `"c": null` entry and for the
            // trailing null struct
            Some(NullBuffer::from(vec![
                true, true, false, true, true, true, false,
            ])),
        );
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        let a = StructArray::new(
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()].into(),
            vec![
                Arc::new(b.clone()) as ArrayRef,
                Arc::new(c.clone()) as ArrayRef,
            ],
            // Only the final `[null]` element is a null struct
            Some(NullBuffer::from(vec![
                true, true, true, true, true, true, false,
            ])),
        );
        // Offsets: rows of length 2, 1, 3, 0 (null), 0 (empty), 1
        let a_list = ListArray::new(
            Arc::new(a_struct_field.clone()),
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3, 6, 6, 6, 7])),
            Arc::new(a),
            Some(NullBuffer::from(vec![true, true, true, false, true, true])),
        );

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = &a_list;
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.as_struct(), &c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        // Finally compare the whole arrays in one shot
        assert_eq!(read, expected);
    }
2243
    /// Shared body for [`test_read_list_view`]: decodes newline-delimited
    /// JSON into a list-view column whose offset width is `O` (i32 for
    /// `ListView`, i64 for `LargeListView`), then checks offsets, sizes,
    /// validity, and element values row by row.
    fn assert_read_list_view<O: OffsetSizeTrait>() {
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
        let schema = Arc::new(Schema::new(vec![Field::new("lv", data_type, true)]));

        // Rows cover: plain list, list with null element, null row,
        // single-element list, empty list
        let buf = r#"
        {"lv": [1, 2, 3]}
        {"lv": [4, null]}
        {"lv": null}
        {"lv": [6]}
        {"lv": []}
        "#;

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        let col = batch.column(0);
        let list_view = col
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        assert_eq!(list_view.len(), 5);

        // Check offsets and sizes
        let expected_offsets: Vec<O> = vec![0, 3, 5, 5, 6]
            .into_iter()
            .map(|v| O::usize_as(v))
            .collect();
        let expected_sizes: Vec<O> = vec![3, 2, 0, 1, 0]
            .into_iter()
            .map(|v| O::usize_as(v))
            .collect();
        assert_eq!(list_view.value_offsets(), &expected_offsets);
        assert_eq!(list_view.value_sizes(), &expected_sizes);

        // Row 0: [1, 2, 3]
        assert!(list_view.is_valid(0));
        let vals = list_view.value(0);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.values(), &[1, 2, 3]);

        // Row 1: [4, null]
        assert!(list_view.is_valid(1));
        let vals = list_view.value(1);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.len(), 2);
        assert_eq!(ints.value(0), 4);
        assert!(ints.is_null(1));

        // Row 2: null
        assert!(list_view.is_null(2));

        // Row 3: [6]
        assert!(list_view.is_valid(3));
        let vals = list_view.value(3);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.values(), &[6]);

        // Row 4: []
        assert!(list_view.is_valid(4));
        let vals = list_view.value(4);
        assert_eq!(vals.len(), 0);
    }
2308
    #[test]
    fn test_read_list_view() {
        // Cover both 32-bit and 64-bit offset widths
        assert_read_list_view::<i32>();
        assert_read_list_view::<i64>();
    }
2314
2315    #[test]
2316    fn test_fixed_size_list() {
2317        let buf = r#"
2318        {"a": [1, 2, 3]}
2319        {"a": [4, 5, 6]}
2320        {"a": [7, 8, 9]}
2321        "#;
2322
2323        let field = Field::new_list_field(DataType::Int32, true);
2324        let schema = Arc::new(Schema::new(vec![Field::new(
2325            "a",
2326            DataType::FixedSizeList(Arc::new(field), 3),
2327            false,
2328        )]));
2329
2330        let batches = do_read(buf, 1024, false, false, schema);
2331        assert_eq!(batches.len(), 1);
2332
2333        let col = batches[0].column(0).as_fixed_size_list();
2334        assert_eq!(col.len(), 3);
2335        assert_eq!(col.value_length(), 3);
2336
2337        let values = col.values().as_primitive::<Int32Type>();
2338        assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
2339    }
2340
    #[test]
    fn test_fixed_size_list_nullable() {
        // A null row and a null element inside an otherwise valid row
        let buf = r#"
        {"a": [1, 2]}
        {"a": null}
        {"a": [3, null]}
        "#;

        let field = Field::new_list_field(DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::FixedSizeList(Arc::new(field), 2),
            true,
        )]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0).as_fixed_size_list();
        assert_eq!(col.len(), 3);
        assert!(col.is_valid(0));
        assert!(col.is_null(1));
        assert!(col.is_valid(2));

        // Each row occupies a fixed stride of 2 child slots, including the
        // null row (slots 2-3), so row 2's values sit at indices 4-5
        let values = col.values().as_primitive::<Int32Type>();
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), 2);
        assert_eq!(values.value(4), 3);
        assert!(values.is_null(5));
    }
2371
2372    #[test]
2373    fn test_fixed_size_list_zero_size_non_nullable() {
2374        let buf = r#"
2375        {"a": []}
2376        {"a": []}
2377        {"a": []}
2378        "#;
2379
2380        let field = Field::new_list_field(DataType::Int32, true);
2381        let schema = Arc::new(Schema::new(vec![Field::new(
2382            "a",
2383            DataType::FixedSizeList(Arc::new(field), 0),
2384            false,
2385        )]));
2386
2387        let batches = do_read(buf, 1024, false, false, schema);
2388        assert_eq!(batches.len(), 1);
2389
2390        let col = batches[0].column(0).as_fixed_size_list();
2391        assert_eq!(col.len(), 3);
2392        assert_eq!(col.value_length(), 0);
2393
2394        let values = col.values().as_primitive::<Int32Type>();
2395        assert!(values.values().is_empty());
2396    }
2397
    #[test]
    fn test_fixed_size_list_wrong_size() {
        // A 3-element list decoded into a FixedSizeList(2) column must fail
        let buf = r#"{"a": [1, 2, 3]}"#;

        let field = Field::new_list_field(DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::FixedSizeList(Arc::new(field), 2),
            false,
        )]));

        let err = ReaderBuilder::new(schema)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
            .unwrap_err();

        // Only check the error mentions the size mismatch, not its full text
        assert!(err.to_string().contains("expected 2 but got 3"), "{}", err);
    }
2418
    #[test]
    fn test_fixed_size_list_nested() {
        // FixedSizeList(2) of FixedSizeList(2): 2 outer rows flatten to 4
        // inner lists and 8 leaf values
        let buf = r#"
        {"a": [[1, 2], [3, 4]]}
        {"a": [[5, 6], [7, 8]]}
        "#;

        let inner_field = Field::new_list_field(DataType::Int32, true);
        let inner_type = DataType::FixedSizeList(Arc::new(inner_field), 2);
        let outer_field = Arc::new(Field::new_list_field(inner_type.clone(), true));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::FixedSizeList(outer_field, 2),
            false,
        )]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0).as_fixed_size_list();
        assert_eq!(col.len(), 2);
        assert_eq!(col.value_length(), 2);

        let inner = col.values().as_fixed_size_list();
        assert_eq!(inner.len(), 4);
        assert_eq!(inner.value_length(), 2);

        // Leaf values are stored contiguously across all rows
        let values = inner.values().as_primitive::<Int32Type>();
        assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8]);
    }
2449
2450    #[test]
2451    fn test_fixed_size_list_ignore_type_conflicts() {
2452        let field = Field::new("item", DataType::Int32, true);
2453        let schema = Arc::new(Schema::new(vec![Field::new(
2454            "a",
2455            DataType::FixedSizeList(Arc::new(field), 2),
2456            true,
2457        )]));
2458
2459        let json = vec![
2460            json!({"a": [1, 2]}),
2461            json!({"a": "not a list"}),
2462            json!({"a": 42}),
2463            json!({"a": [6, 7]}),
2464        ];
2465
2466        let mut decoder = ReaderBuilder::new(schema)
2467            .with_ignore_type_conflicts(true)
2468            .build_decoder()
2469            .unwrap();
2470        decoder.serialize(&json).unwrap();
2471        let batch = decoder.flush().unwrap().unwrap();
2472
2473        let col = batch.column(0).as_fixed_size_list();
2474        assert_eq!(col.len(), 4);
2475        assert!(col.is_valid(0));
2476        assert!(col.is_null(1)); // string -> null
2477        assert!(col.is_null(2)); // number -> null
2478        assert!(col.is_valid(3));
2479
2480        let values = col.values().as_primitive::<Int32Type>();
2481        assert_eq!(values.value(0), 1);
2482        assert_eq!(values.value(1), 2);
2483        assert_eq!(values.value(6), 6);
2484        assert_eq!(values.value(7), 7);
2485    }
2486
2487    #[test]
2488    fn test_skip_empty_lines() {
2489        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2490        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2491        let json_content = "
2492        {\"a\": 1}
2493        {\"a\": 2}
2494        {\"a\": 3}";
2495        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2496        let batch = reader.next().unwrap().unwrap();
2497
2498        assert_eq!(1, batch.num_columns());
2499        assert_eq!(3, batch.num_rows());
2500
2501        let schema = reader.schema();
2502        let c = schema.column_with_name("a").unwrap();
2503        assert_eq!(&DataType::Int64, c.1.data_type());
2504    }
2505
2506    #[test]
2507    fn test_with_multiple_batches() {
2508        let file = File::open("test/data/basic_nulls.json").unwrap();
2509        let mut reader = BufReader::new(file);
2510        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2511        reader.rewind().unwrap();
2512
2513        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2514        let mut reader = builder.build(reader).unwrap();
2515
2516        let mut num_records = Vec::new();
2517        while let Some(rb) = reader.next().transpose().unwrap() {
2518            num_records.push(rb.num_rows());
2519        }
2520
2521        assert_eq!(vec![5, 5, 2], num_records);
2522    }
2523
2524    #[test]
2525    fn test_timestamp_from_json_seconds() {
2526        let schema = Schema::new(vec![Field::new(
2527            "a",
2528            DataType::Timestamp(TimeUnit::Second, None),
2529            true,
2530        )]);
2531
2532        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2533        let batch = reader.next().unwrap().unwrap();
2534
2535        assert_eq!(1, batch.num_columns());
2536        assert_eq!(12, batch.num_rows());
2537
2538        let schema = reader.schema();
2539        let batch_schema = batch.schema();
2540        assert_eq!(schema, batch_schema);
2541
2542        let a = schema.column_with_name("a").unwrap();
2543        assert_eq!(
2544            &DataType::Timestamp(TimeUnit::Second, None),
2545            a.1.data_type()
2546        );
2547
2548        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2549        assert!(aa.is_valid(0));
2550        assert!(!aa.is_valid(1));
2551        assert!(!aa.is_valid(2));
2552        assert_eq!(1, aa.value(0));
2553        assert_eq!(1, aa.value(3));
2554        assert_eq!(5, aa.value(7));
2555    }
2556
2557    #[test]
2558    fn test_timestamp_from_json_milliseconds() {
2559        let schema = Schema::new(vec![Field::new(
2560            "a",
2561            DataType::Timestamp(TimeUnit::Millisecond, None),
2562            true,
2563        )]);
2564
2565        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2566        let batch = reader.next().unwrap().unwrap();
2567
2568        assert_eq!(1, batch.num_columns());
2569        assert_eq!(12, batch.num_rows());
2570
2571        let schema = reader.schema();
2572        let batch_schema = batch.schema();
2573        assert_eq!(schema, batch_schema);
2574
2575        let a = schema.column_with_name("a").unwrap();
2576        assert_eq!(
2577            &DataType::Timestamp(TimeUnit::Millisecond, None),
2578            a.1.data_type()
2579        );
2580
2581        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2582        assert!(aa.is_valid(0));
2583        assert!(!aa.is_valid(1));
2584        assert!(!aa.is_valid(2));
2585        assert_eq!(1, aa.value(0));
2586        assert_eq!(1, aa.value(3));
2587        assert_eq!(5, aa.value(7));
2588    }
2589
2590    #[test]
2591    fn test_date_from_json_milliseconds() {
2592        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2593
2594        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2595        let batch = reader.next().unwrap().unwrap();
2596
2597        assert_eq!(1, batch.num_columns());
2598        assert_eq!(12, batch.num_rows());
2599
2600        let schema = reader.schema();
2601        let batch_schema = batch.schema();
2602        assert_eq!(schema, batch_schema);
2603
2604        let a = schema.column_with_name("a").unwrap();
2605        assert_eq!(&DataType::Date64, a.1.data_type());
2606
2607        let aa = batch.column(a.0).as_primitive::<Date64Type>();
2608        assert!(aa.is_valid(0));
2609        assert!(!aa.is_valid(1));
2610        assert!(!aa.is_valid(2));
2611        assert_eq!(1, aa.value(0));
2612        assert_eq!(1, aa.value(3));
2613        assert_eq!(5, aa.value(7));
2614    }
2615
2616    #[test]
2617    fn test_time_from_json_nanoseconds() {
2618        let schema = Schema::new(vec![Field::new(
2619            "a",
2620            DataType::Time64(TimeUnit::Nanosecond),
2621            true,
2622        )]);
2623
2624        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2625        let batch = reader.next().unwrap().unwrap();
2626
2627        assert_eq!(1, batch.num_columns());
2628        assert_eq!(12, batch.num_rows());
2629
2630        let schema = reader.schema();
2631        let batch_schema = batch.schema();
2632        assert_eq!(schema, batch_schema);
2633
2634        let a = schema.column_with_name("a").unwrap();
2635        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2636
2637        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2638        assert!(aa.is_valid(0));
2639        assert!(!aa.is_valid(1));
2640        assert!(!aa.is_valid(2));
2641        assert_eq!(1, aa.value(0));
2642        assert_eq!(1, aa.value(3));
2643        assert_eq!(5, aa.value(7));
2644    }
2645
2646    #[test]
2647    fn test_json_iterator() {
2648        let file = File::open("test/data/basic.json").unwrap();
2649        let mut reader = BufReader::new(file);
2650        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2651        reader.rewind().unwrap();
2652
2653        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2654        let reader = builder.build(reader).unwrap();
2655        let schema = reader.schema();
2656        let (col_a_index, _) = schema.column_with_name("a").unwrap();
2657
2658        let mut sum_num_rows = 0;
2659        let mut num_batches = 0;
2660        let mut sum_a = 0;
2661        for batch in reader {
2662            let batch = batch.unwrap();
2663            assert_eq!(8, batch.num_columns());
2664            sum_num_rows += batch.num_rows();
2665            num_batches += 1;
2666            let batch_schema = batch.schema();
2667            assert_eq!(schema, batch_schema);
2668            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2669            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2670        }
2671        assert_eq!(12, sum_num_rows);
2672        assert_eq!(3, num_batches);
2673        assert_eq!(100000000000011, sum_a);
2674    }
2675
2676    #[test]
2677    fn test_decoder_error() {
2678        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2679            "a",
2680            vec![Field::new("child", DataType::Int32, false)],
2681            true,
2682        )]));
2683
2684        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
2685        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
2686        assert!(decoder.tape_decoder.has_partial_row());
2687        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2688        let _ = decoder.flush().unwrap_err();
2689        assert!(decoder.tape_decoder.has_partial_row());
2690        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2691
2692        let parse_err = |s: &str| {
2693            ReaderBuilder::new(schema.clone())
2694                .build(Cursor::new(s.as_bytes()))
2695                .unwrap()
2696                .next()
2697                .unwrap()
2698                .unwrap_err()
2699                .to_string()
2700        };
2701
2702        let err = parse_err(r#"{"a": 123}"#);
2703        assert_eq!(
2704            err,
2705            "Json error: whilst decoding field 'a': expected { got 123"
2706        );
2707
2708        let err = parse_err(r#"{"a": ["bar"]}"#);
2709        assert_eq!(
2710            err,
2711            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
2712        );
2713
2714        let err = parse_err(r#"{"a": []}"#);
2715        assert_eq!(
2716            err,
2717            "Json error: whilst decoding field 'a': expected { got []"
2718        );
2719
2720        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
2721        assert_eq!(
2722            err,
2723            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
2724        );
2725
2726        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
2727        assert_eq!(
2728            err,
2729            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
2730        );
2731
2732        let err = parse_err(r#"{"a": true}"#);
2733        assert_eq!(
2734            err,
2735            "Json error: whilst decoding field 'a': expected { got true"
2736        );
2737
2738        let err = parse_err(r#"{"a": false}"#);
2739        assert_eq!(
2740            err,
2741            "Json error: whilst decoding field 'a': expected { got false"
2742        );
2743
2744        let err = parse_err(r#"{"a": "foo"}"#);
2745        assert_eq!(
2746            err,
2747            "Json error: whilst decoding field 'a': expected { got \"foo\""
2748        );
2749
2750        let err = parse_err(r#"{"a": {"child": false}}"#);
2751        assert_eq!(
2752            err,
2753            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
2754        );
2755
2756        let err = parse_err(r#"{"a": {"child": []}}"#);
2757        assert_eq!(
2758            err,
2759            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
2760        );
2761
2762        let err = parse_err(r#"{"a": {"child": [123]}}"#);
2763        assert_eq!(
2764            err,
2765            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
2766        );
2767
2768        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
2769        assert_eq!(
2770            err,
2771            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
2772        );
2773    }
2774
2775    #[test]
2776    fn test_serialize_timestamp() {
2777        let json = vec![
2778            json!({"timestamp": 1681319393}),
2779            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2780        ];
2781        let schema = Schema::new(vec![Field::new(
2782            "timestamp",
2783            DataType::Timestamp(TimeUnit::Second, None),
2784            true,
2785        )]);
2786        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2787            .build_decoder()
2788            .unwrap();
2789        decoder.serialize(&json).unwrap();
2790        let batch = decoder.flush().unwrap().unwrap();
2791        assert_eq!(batch.num_rows(), 2);
2792        assert_eq!(batch.num_columns(), 1);
2793        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2794        assert_eq!(values.values(), &[1681319393, -7200]);
2795    }
2796
2797    #[test]
2798    fn test_serialize_decimal() {
2799        let json = vec![
2800            json!({"decimal": 1.234}),
2801            json!({"decimal": "1.234"}),
2802            json!({"decimal": 1234}),
2803            json!({"decimal": "1234"}),
2804        ];
2805        let schema = Schema::new(vec![Field::new(
2806            "decimal",
2807            DataType::Decimal128(10, 3),
2808            true,
2809        )]);
2810        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2811            .build_decoder()
2812            .unwrap();
2813        decoder.serialize(&json).unwrap();
2814        let batch = decoder.flush().unwrap().unwrap();
2815        assert_eq!(batch.num_rows(), 4);
2816        assert_eq!(batch.num_columns(), 1);
2817        let values = batch.column(0).as_primitive::<Decimal128Type>();
2818        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2819    }
2820
2821    #[test]
2822    fn test_serde_field() {
2823        let field = Field::new("int", DataType::Int32, true);
2824        let mut decoder = ReaderBuilder::new_with_field(field)
2825            .build_decoder()
2826            .unwrap();
2827        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2828        let b = decoder.flush().unwrap().unwrap();
2829        let values = b.column(0).as_primitive::<Int32Type>().values();
2830        assert_eq!(values, &[1, 2, 3, 4]);
2831    }
2832
2833    #[test]
2834    fn test_serde_large_numbers() {
2835        let field = Field::new("int", DataType::Int64, true);
2836        let mut decoder = ReaderBuilder::new_with_field(field)
2837            .build_decoder()
2838            .unwrap();
2839
2840        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2841        let b = decoder.flush().unwrap().unwrap();
2842        let values = b.column(0).as_primitive::<Int64Type>().values();
2843        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2844
2845        let field = Field::new(
2846            "int",
2847            DataType::Timestamp(TimeUnit::Microsecond, None),
2848            true,
2849        );
2850        let mut decoder = ReaderBuilder::new_with_field(field)
2851            .build_decoder()
2852            .unwrap();
2853
2854        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2855        let b = decoder.flush().unwrap().unwrap();
2856        let values = b
2857            .column(0)
2858            .as_primitive::<TimestampMicrosecondType>()
2859            .values();
2860        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2861    }
2862
2863    #[test]
2864    fn test_coercing_primitive_into_string_decoder() {
2865        let buf = &format!(
2866            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2867            (i32::MAX as i64 + 10),
2868            i64::MAX - 10
2869        );
2870        let schema = Schema::new(vec![
2871            Field::new("a", DataType::Float64, true),
2872            Field::new("b", DataType::Utf8, true),
2873            Field::new("c", DataType::Utf8, true),
2874        ]);
2875        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2876        let schema_ref = Arc::new(schema);
2877
2878        // read record batches
2879        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2880        let mut decoder = reader.build_decoder().unwrap();
2881        decoder.serialize(json_array.as_slice()).unwrap();
2882        let batch = decoder.flush().unwrap().unwrap();
2883        assert_eq!(
2884            batch,
2885            RecordBatch::try_new(
2886                schema_ref,
2887                vec![
2888                    Arc::new(Float64Array::from(vec![
2889                        1.0,
2890                        2.0,
2891                        (i32::MAX as i64 + 10) as f64,
2892                        (i64::MAX - 10) as f64
2893                    ])),
2894                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2895                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2896                ]
2897            )
2898            .unwrap()
2899        );
2900    }
2901
2902    // Parse the given `row` in `struct_mode` as a type given by fields.
2903    //
2904    // If as_struct == true, wrap the fields in a Struct field with name "r".
2905    // If as_struct == false, wrap the fields in a Schema.
2906    fn _parse_structs(
2907        row: &str,
2908        struct_mode: StructMode,
2909        fields: Fields,
2910        as_struct: bool,
2911    ) -> Result<RecordBatch, ArrowError> {
2912        let builder = if as_struct {
2913            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2914        } else {
2915            ReaderBuilder::new(Arc::new(Schema::new(fields)))
2916        };
2917        builder
2918            .with_struct_mode(struct_mode)
2919            .build(Cursor::new(row.as_bytes()))
2920            .unwrap()
2921            .next()
2922            .unwrap()
2923    }
2924
    #[test]
    // In `StructMode::ListOnly`, the number of positional values in a row
    // must match the schema field count exactly: surplus values report
    // "found extra columns", missing values report a column/field mismatch.
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        let row = "[1, 2]";

        // Build one-, two- and three-field schemas for the two-value row.
        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        // The expected decoding of "[1, 2]" under the correctly-sized schema.
        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        // One field, two values: errors in both nesting modes.
        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        // Two fields, two values: decodes successfully, nested or top-level.
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        // Three fields, two values: errors in both nesting modes.
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2983
    #[test]
    // The same nested value (a list plus a map inside a struct) must decode
    // from object syntax in ObjectOnly mode and from positional-list syntax
    // in ListOnly mode, while the opposite syntax errors in each mode.
    fn test_struct_decoding() {
        use arrow_array::builder;

        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        // Object syntax at the top level, list syntax for the struct value.
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected value for field "b": the list [1, 2].
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected value for field "c": the single-entry map {"d": 3}.
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly: object syntax decodes; list syntax errors at whichever
        // level the list appears.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly: list syntax decodes; object syntax errors. NOTE: the
        // expected rendering of the offending object omits the comma between
        // its entries — that is the tape's rendering asserted here.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
3067
3068    // Test cases:
3069    // [] -> RecordBatch row with no entries.  Schema = [('a', Int32)] -> Error
3070    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
3071    // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
3072    // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
3073    #[test]
3074    fn test_struct_decoding_empty_list() {
3075        let int_field = Field::new("a", DataType::Int32, true);
3076        let struct_field = Field::new(
3077            "r",
3078            DataType::Struct(Fields::from(vec![int_field.clone()])),
3079            true,
3080        );
3081
3082        let parse = |row: &str, as_struct: bool, field: Field| {
3083            _parse_structs(
3084                row,
3085                StructMode::ListOnly,
3086                Fields::from(vec![field]),
3087                as_struct,
3088            )
3089        };
3090
3091        // Missing fields
3092        assert_eq!(
3093            parse("[]", true, struct_field.clone())
3094                .unwrap_err()
3095                .to_string(),
3096            "Json error: found 0 columns for 1 fields".to_owned()
3097        );
3098        assert_eq!(
3099            parse("[]", false, int_field.clone())
3100                .unwrap_err()
3101                .to_string(),
3102            "Json error: found 0 columns for 1 fields".to_owned()
3103        );
3104        assert_eq!(
3105            parse("[]", false, struct_field.clone())
3106                .unwrap_err()
3107                .to_string(),
3108            "Json error: found 0 columns for 1 fields".to_owned()
3109        );
3110        assert_eq!(
3111            parse("[[]]", false, struct_field.clone())
3112                .unwrap_err()
3113                .to_string(),
3114            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
3115        );
3116    }
3117
3118    #[test]
3119    fn test_decode_list_struct_with_wrong_types() {
3120        let int_field = Field::new("a", DataType::Int32, true);
3121        let struct_field = Field::new(
3122            "r",
3123            DataType::Struct(Fields::from(vec![int_field.clone()])),
3124            true,
3125        );
3126
3127        let parse = |row: &str, as_struct: bool, field: Field| {
3128            _parse_structs(
3129                row,
3130                StructMode::ListOnly,
3131                Fields::from(vec![field]),
3132                as_struct,
3133            )
3134        };
3135
3136        // Wrong values
3137        assert_eq!(
3138            parse(r#"[["a"]]"#, false, struct_field.clone())
3139                .unwrap_err()
3140                .to_string(),
3141            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
3142        );
3143        assert_eq!(
3144            parse(r#"[["a"]]"#, true, struct_field.clone())
3145                .unwrap_err()
3146                .to_string(),
3147            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
3148        );
3149        assert_eq!(
3150            parse(r#"["a"]"#, true, int_field.clone())
3151                .unwrap_err()
3152                .to_string(),
3153            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
3154        );
3155        assert_eq!(
3156            parse(r#"["a"]"#, false, int_field.clone())
3157                .unwrap_err()
3158                .to_string(),
3159            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
3160        );
3161    }
3162
3163    #[test]
3164    fn test_type_conflict_nulls() {
3165        let schema = Schema::new(vec![
3166            Field::new("null", DataType::Null, true),
3167            Field::new("bool", DataType::Boolean, true),
3168            Field::new("primitive", DataType::Int32, true),
3169            Field::new("numeric", DataType::Decimal128(10, 3), true),
3170            Field::new("string", DataType::Utf8, true),
3171            Field::new("string_view", DataType::Utf8View, true),
3172            Field::new(
3173                "timestamp",
3174                DataType::Timestamp(TimeUnit::Second, None),
3175                true,
3176            ),
3177            Field::new(
3178                "array",
3179                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3180                true,
3181            ),
3182            Field::new(
3183                "map",
3184                DataType::Map(
3185                    Arc::new(Field::new(
3186                        "entries",
3187                        DataType::Struct(Fields::from(vec![
3188                            Field::new("keys", DataType::Utf8, false),
3189                            Field::new("values", DataType::Utf8, true),
3190                        ])),
3191                        false, // not nullable
3192                    )),
3193                    false, // not sorted
3194                ),
3195                true, // nullable
3196            ),
3197            Field::new(
3198                "struct",
3199                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3200                true,
3201            ),
3202        ]);
3203
3204        // A compatible value for each schema field above, in schema order
3205        let json_values = vec![
3206            json!(null),
3207            json!(true),
3208            json!(42),
3209            json!(1.234),
3210            json!("hi"),
3211            json!("ho"),
3212            json!("1970-01-01T00:00:00+02:00"),
3213            json!([1, "ho", 3]),
3214            json!({"k": "value"}),
3215            json!({"a": 1}),
3216        ];
3217
3218        // Create a set of JSON rows that rotates each value past every field
3219        let json: Vec<_> = (0..json_values.len())
3220            .map(|i| {
3221                let pairs = json_values[i..]
3222                    .iter()
3223                    .chain(json_values[..i].iter())
3224                    .zip(&schema.fields)
3225                    .map(|(v, f)| (f.name().to_string(), v.clone()))
3226                    .collect();
3227                serde_json::Value::Object(pairs)
3228            })
3229            .collect();
3230        let mut decoder = ReaderBuilder::new(Arc::new(schema))
3231            .with_ignore_type_conflicts(true)
3232            .with_coerce_primitive(true)
3233            .build_decoder()
3234            .unwrap();
3235        decoder.serialize(&json).unwrap();
3236        let batch = decoder.flush().unwrap().unwrap();
3237        assert_eq!(batch.num_rows(), 10);
3238        assert_eq!(batch.num_columns(), 10);
3239
3240        // NOTE: NullArray doesn't materialize any values (they're all NULL by definition)
3241        let _ = batch
3242            .column(0)
3243            .as_any()
3244            .downcast_ref::<NullArray>()
3245            .unwrap();
3246
3247        assert!(
3248            batch
3249                .column(1)
3250                .as_any()
3251                .downcast_ref::<BooleanArray>()
3252                .unwrap()
3253                .iter()
3254                .eq([
3255                    Some(true),
3256                    None,
3257                    None,
3258                    None,
3259                    None,
3260                    None,
3261                    None,
3262                    None,
3263                    None,
3264                    None
3265                ])
3266        );
3267
3268        assert!(batch.column(2).as_primitive::<Int32Type>().iter().eq([
3269            Some(42),
3270            Some(1),
3271            None,
3272            None,
3273            None,
3274            None,
3275            None,
3276            None,
3277            None,
3278            None
3279        ]));
3280
3281        assert!(batch.column(3).as_primitive::<Decimal128Type>().iter().eq([
3282            Some(1234),
3283            None,
3284            None,
3285            None,
3286            None,
3287            None,
3288            None,
3289            None,
3290            None,
3291            Some(42000)
3292        ]));
3293
3294        assert!(
3295            batch
3296                .column(4)
3297                .as_any()
3298                .downcast_ref::<StringArray>()
3299                .unwrap()
3300                .iter()
3301                .eq([
3302                    Some("hi"),
3303                    Some("ho"),
3304                    Some("1970-01-01T00:00:00+02:00"),
3305                    None,
3306                    None,
3307                    None,
3308                    None,
3309                    Some("true"),
3310                    Some("42"),
3311                    Some("1.234"),
3312                ])
3313        );
3314
3315        assert!(
3316            batch
3317                .column(5)
3318                .as_any()
3319                .downcast_ref::<StringViewArray>()
3320                .unwrap()
3321                .iter()
3322                .eq([
3323                    Some("ho"),
3324                    Some("1970-01-01T00:00:00+02:00"),
3325                    None,
3326                    None,
3327                    None,
3328                    None,
3329                    Some("true"),
3330                    Some("42"),
3331                    Some("1.234"),
3332                    Some("hi"),
3333                ])
3334        );
3335
3336        assert!(
3337            batch
3338                .column(6)
3339                .as_primitive::<TimestampSecondType>()
3340                .iter()
3341                .eq([
3342                    Some(-7200),
3343                    None,
3344                    None,
3345                    None,
3346                    None,
3347                    None,
3348                    Some(42),
3349                    None,
3350                    None,
3351                    None,
3352                ])
3353        );
3354
3355        let arrays = batch
3356            .column(7)
3357            .as_any()
3358            .downcast_ref::<ListArray>()
3359            .unwrap();
3360        assert_eq!(
3361            arrays.nulls(),
3362            Some(&NullBuffer::from(
3363                &[
3364                    true, false, false, false, false, false, false, false, false, false
3365                ][..]
3366            ))
3367        );
3368        assert_eq!(arrays.offsets()[1], 3);
3369        let array_values = arrays
3370            .values()
3371            .as_any()
3372            .downcast_ref::<Int32Array>()
3373            .unwrap();
3374        assert!(array_values.iter().eq([Some(1), None, Some(3)]));
3375
3376        let maps = batch.column(8).as_any().downcast_ref::<MapArray>().unwrap();
3377        assert_eq!(
3378            maps.nulls(),
3379            Some(&NullBuffer::from(
3380                // Both map and struct can parse
3381                &[
3382                    true, true, false, false, false, false, false, false, false, false
3383                ][..]
3384            ))
3385        );
3386        let map_keys = maps.keys().as_any().downcast_ref::<StringArray>().unwrap();
3387        assert!(map_keys.iter().eq([Some("k"), Some("a")]));
3388        let map_values = maps
3389            .values()
3390            .as_any()
3391            .downcast_ref::<StringArray>()
3392            .unwrap();
3393        assert!(map_values.iter().eq([Some("value"), Some("1")]));
3394
3395        let structs = batch
3396            .column(9)
3397            .as_any()
3398            .downcast_ref::<StructArray>()
3399            .unwrap();
3400        assert_eq!(
3401            structs.nulls(),
3402            Some(&NullBuffer::from(
3403                // Both map and struct can parse
3404                &[
3405                    true, false, false, false, false, false, false, false, false, true
3406                ][..]
3407            ))
3408        );
3409        let struct_fields = structs
3410            .column(0)
3411            .as_any()
3412            .downcast_ref::<Int32Array>()
3413            .unwrap();
3414        assert!(struct_fields.slice(0, 2).iter().eq([Some(1), None]));
3415    }
3416
3417    #[test]
3418    fn test_type_conflict_non_nullable() {
3419        let fields = [
3420            Field::new("bool", DataType::Boolean, false),
3421            Field::new("primitive", DataType::Int32, false),
3422            Field::new("numeric", DataType::Decimal128(10, 3), false),
3423            Field::new("string", DataType::Utf8, false),
3424            Field::new("string_view", DataType::Utf8View, false),
3425            Field::new(
3426                "timestamp",
3427                DataType::Timestamp(TimeUnit::Second, None),
3428                false,
3429            ),
3430            Field::new(
3431                "array",
3432                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3433                false,
3434            ),
3435            Field::new(
3436                "fixed_size_list",
3437                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 2),
3438                false,
3439            ),
3440            Field::new(
3441                "map",
3442                DataType::Map(
3443                    Arc::new(Field::new(
3444                        "entries",
3445                        DataType::Struct(Fields::from(vec![
3446                            Field::new("keys", DataType::Utf8, false),
3447                            Field::new("values", DataType::Utf8, true),
3448                        ])),
3449                        false, // not nullable
3450                    )),
3451                    false, // not sorted
3452                ),
3453                false, // not nullable
3454            ),
3455            Field::new(
3456                "struct",
3457                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3458                false,
3459            ),
3460        ];
3461
3462        // Every field above will have a type conflict with at least one of these values
3463        let json_values = vec![json!(true), json!({"a": 1})];
3464
3465        for field in fields {
3466            let mut decoder = ReaderBuilder::new_with_field(field)
3467                .with_ignore_type_conflicts(true)
3468                .build_decoder()
3469                .unwrap();
3470            decoder.serialize(&json_values).unwrap();
3471            decoder
3472                .flush()
3473                .expect_err("type conflict on non-nullable type");
3474        }
3475    }
3476
3477    #[test]
3478    fn test_ignore_type_conflicts_disabled() {
3479        let fields = [
3480            Field::new("null", DataType::Null, true),
3481            Field::new("bool", DataType::Boolean, true),
3482            Field::new("primitive", DataType::Int32, true),
3483            Field::new("numeric", DataType::Decimal128(10, 3), true),
3484            Field::new("string", DataType::Utf8, true),
3485            Field::new("string_view", DataType::Utf8View, true),
3486            Field::new(
3487                "timestamp",
3488                DataType::Timestamp(TimeUnit::Second, None),
3489                true,
3490            ),
3491            Field::new(
3492                "array",
3493                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3494                true,
3495            ),
3496            Field::new(
3497                "fixed_size_list",
3498                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 2),
3499                true,
3500            ),
3501            Field::new(
3502                "map",
3503                DataType::Map(
3504                    Arc::new(Field::new(
3505                        "entries",
3506                        DataType::Struct(Fields::from(vec![
3507                            Field::new("keys", DataType::Utf8, false),
3508                            Field::new("values", DataType::Utf8, true),
3509                        ])),
3510                        false, // not nullable
3511                    )),
3512                    false, // not sorted
3513                ),
3514                true, // not nullable
3515            ),
3516            Field::new(
3517                "struct",
3518                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3519                true,
3520            ),
3521        ];
3522
3523        // Every field above will have a type conflict with at least one of these values
3524        let json_values = vec![json!(true), json!({"a": 1})];
3525
3526        for field in fields {
3527            let mut decoder = ReaderBuilder::new_with_field(field)
3528                .build_decoder()
3529                .unwrap();
3530            decoder.serialize(&json_values).unwrap();
3531            decoder
3532                .flush()
3533                .expect_err("type conflict on non-nullable type");
3534        }
3535    }
3536
3537    #[test]
3538    fn test_read_run_end_encoded() {
3539        let buf = r#"
3540        {"a": "x"}
3541        {"a": "x"}
3542        {"a": "y"}
3543        {"a": "y"}
3544        {"a": "y"}
3545        "#;
3546
3547        let ree_type = DataType::RunEndEncoded(
3548            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3549            Arc::new(Field::new("values", DataType::Utf8, true)),
3550        );
3551        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3552        let batches = do_read(buf, 1024, false, false, schema);
3553        assert_eq!(batches.len(), 1);
3554
3555        let col = batches[0].column(0);
3556        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3557
3558        // 5 logical values compressed into 2 runs
3559        assert_eq!(run_array.len(), 5);
3560        assert_eq!(run_array.run_ends().values(), &[2, 5]);
3561
3562        let values = run_array.values().as_string::<i32>();
3563        assert_eq!(values.len(), 2);
3564        assert_eq!(values.value(0), "x");
3565        assert_eq!(values.value(1), "y");
3566    }
3567
3568    #[test]
3569    fn test_read_run_end_encoded_consecutive_nulls() {
3570        let buf = r#"
3571        {"a": "x"}
3572        {}
3573        {}
3574        {}
3575        {"a": "y"}
3576        "#;
3577
3578        let ree_type = DataType::RunEndEncoded(
3579            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3580            Arc::new(Field::new("values", DataType::Utf8, true)),
3581        );
3582        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3583        let batches = do_read(buf, 1024, false, false, schema);
3584        assert_eq!(batches.len(), 1);
3585
3586        let col = batches[0].column(0);
3587        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3588
3589        // 5 logical values: "x", null, null, null, "y" → 3 runs
3590        assert_eq!(run_array.len(), 5);
3591        assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);
3592
3593        let values = run_array.values().as_string::<i32>();
3594        assert_eq!(values.len(), 3);
3595        assert_eq!(values.value(0), "x");
3596        assert!(values.is_null(1));
3597        assert_eq!(values.value(2), "y");
3598    }
3599
3600    #[test]
3601    fn test_read_run_end_encoded_all_unique() {
3602        let buf = r#"
3603        {"a": 1}
3604        {"a": 2}
3605        {"a": 3}
3606        "#;
3607
3608        let ree_type = DataType::RunEndEncoded(
3609            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3610            Arc::new(Field::new("values", DataType::Int32, true)),
3611        );
3612        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3613        let batches = do_read(buf, 1024, false, false, schema);
3614        assert_eq!(batches.len(), 1);
3615
3616        let col = batches[0].column(0);
3617        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3618
3619        // No compression: 3 unique values → 3 runs
3620        assert_eq!(run_array.len(), 3);
3621        assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
3622    }
3623
3624    #[test]
3625    fn test_read_run_end_encoded_int16_run_ends() {
3626        let buf = r#"
3627        {"a": "x"}
3628        {"a": "x"}
3629        {"a": "y"}
3630        "#;
3631
3632        let ree_type = DataType::RunEndEncoded(
3633            Arc::new(Field::new("run_ends", DataType::Int16, false)),
3634            Arc::new(Field::new("values", DataType::Utf8, true)),
3635        );
3636        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3637        let batches = do_read(buf, 1024, false, false, schema);
3638        assert_eq!(batches.len(), 1);
3639
3640        let col = batches[0].column(0);
3641        let run_array = col.as_run::<arrow_array::types::Int16Type>();
3642
3643        assert_eq!(run_array.len(), 3);
3644        assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
3645    }
3646
3647    #[test]
3648    fn test_read_nested_run_end_encoded() {
3649        let buf = r#"
3650        {"a": "x"}
3651        {"a": "x"}
3652        {"a": "y"}
3653        "#;
3654
3655        // The outer REE compresses whole rows, while the inner REE compresses the
3656        // repeated string values produced by decoding those rows.
3657        let inner_type = DataType::RunEndEncoded(
3658            Arc::new(Field::new("run_ends", DataType::Int64, false)),
3659            Arc::new(Field::new("values", DataType::Utf8, true)),
3660        );
3661        let outer_type = DataType::RunEndEncoded(
3662            Arc::new(Field::new("run_ends", DataType::Int64, false)),
3663            Arc::new(Field::new("values", inner_type, true)),
3664        );
3665        let schema = Arc::new(Schema::new(vec![Field::new("a", outer_type, true)]));
3666        let batches = do_read(buf, 1024, false, false, schema);
3667        assert_eq!(batches.len(), 1);
3668
3669        let col = batches[0].column(0);
3670        let outer = col.as_run::<arrow_array::types::Int64Type>();
3671        // Three logical rows compress to two outer runs: ["x", "x"] and ["y"].
3672        assert_eq!(outer.len(), 3);
3673        assert_eq!(outer.run_ends().values(), &[2, 3]);
3674
3675        let nested = outer.values().as_run::<arrow_array::types::Int64Type>();
3676        // The physical values of the outer REE are themselves a two-element REE.
3677        assert_eq!(nested.len(), 2);
3678        assert_eq!(nested.run_ends().values(), &[1, 2]);
3679
3680        let nested_values = nested.values().as_string::<i32>();
3681        assert_eq!(nested_values.len(), 2);
3682        assert_eq!(nested_values.value(0), "x");
3683        assert_eq!(nested_values.value(1), "y");
3684    }
3685}