// arrow_json/reader/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! JSON reader
//!
//! This JSON reader allows JSON records to be read into the Arrow memory
//! model. Records are loaded in batches and are then converted from the record-oriented
//! representation to the columnar arrow data model.
//!
//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
//! parsing of sequences of one or more arbitrarily formatted JSON values, including
//! but not limited to newline-delimited JSON.
//!
//! # Basic Usage
//!
//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
//!
//! ```
//! # use arrow_schema::*;
//! # use std::fs::File;
//! # use std::io::BufReader;
//! # use std::sync::Arc;
//!
//! let schema = Arc::new(Schema::new(vec![
//!     Field::new("a", DataType::Float64, false),
//!     Field::new("b", DataType::Float64, false),
//!     Field::new("c", DataType::Boolean, true),
//! ]));
//!
//! let file = File::open("test/data/basic.json").unwrap();
//!
//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
//! let batch = json.next().unwrap().unwrap();
//! ```
//!
//! # Async Usage
//!
//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
//! and is designed to be agnostic to the various different kinds of async IO primitives found
//! within the Rust ecosystem.
//!
//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
//!
//! ```
//! # use std::task::{Poll, ready};
//! # use bytes::{Buf, Bytes};
//! # use arrow_schema::ArrowError;
//! # use futures::stream::{Stream, StreamExt};
//! # use arrow_array::RecordBatch;
//! # use arrow_json::reader::Decoder;
//! #
//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
//!     mut decoder: Decoder,
//!     mut input: S,
//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
//!     let mut buffered = Bytes::new();
//!     futures::stream::poll_fn(move |cx| {
//!         loop {
//!             if buffered.is_empty() {
//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
//!                     Some(b) => b,
//!                     None => break,
//!                 };
//!             }
//!             let decoded = match decoder.decode(buffered.as_ref()) {
//!                 Ok(decoded) => decoded,
//!                 Err(e) => return Poll::Ready(Some(Err(e))),
//!             };
//!             let read = buffered.len();
//!             buffered.advance(decoded);
//!             if decoded != read {
//!                 break
//!             }
//!         }
//!
//!         Poll::Ready(decoder.flush().transpose())
//!     })
//! }
//!
//! ```
//!
//! In a similar vein, it can also be used with tokio-based IO primitives
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_schema::{DataType, Field, Schema};
//! # use std::pin::Pin;
//! # use std::task::{Poll, ready};
//! # use futures::{Stream, TryStreamExt};
//! # use tokio::io::AsyncBufRead;
//! # use arrow_array::RecordBatch;
//! # use arrow_json::reader::Decoder;
//! # use arrow_schema::ArrowError;
//! fn decode_stream<R: AsyncBufRead + Unpin>(
//!     mut decoder: Decoder,
//!     mut reader: R,
//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
//!     futures::stream::poll_fn(move |cx| {
//!         loop {
//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
//!                 Ok(b) if b.is_empty() => break,
//!                 Ok(b) => b,
//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
//!             };
//!             let read = b.len();
//!             let decoded = match decoder.decode(b) {
//!                 Ok(decoded) => decoded,
//!                 Err(e) => return Poll::Ready(Some(Err(e))),
//!             };
//!             Pin::new(&mut reader).consume(decoded);
//!             if decoded != read {
//!                 break;
//!             }
//!         }
//!
//!         Poll::Ready(decoder.flush().transpose())
//!     })
//! }
//! ```
//!

136use std::borrow::Cow;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use arrow_array::cast::AsArray;
141use arrow_array::timezone::Tz;
142use arrow_array::types::*;
143use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader, downcast_integer};
144use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
145use chrono::Utc;
146use serde_core::Serialize;
147
148use crate::StructMode;
149use crate::reader::binary_array::{
150    BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
151};
152use crate::reader::boolean_array::BooleanArrayDecoder;
153use crate::reader::decimal_array::DecimalArrayDecoder;
154use crate::reader::list_array::{ListArrayDecoder, ListViewArrayDecoder};
155use crate::reader::map_array::MapArrayDecoder;
156use crate::reader::null_array::NullArrayDecoder;
157use crate::reader::primitive_array::PrimitiveArrayDecoder;
158use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
159use crate::reader::string_array::StringArrayDecoder;
160use crate::reader::string_view_array::StringViewArrayDecoder;
161use crate::reader::struct_array::StructArrayDecoder;
162use crate::reader::tape::{Tape, TapeDecoder};
163use crate::reader::timestamp_array::TimestampArrayDecoder;
164
165pub use schema::*;
166pub use value_iter::ValueIter;
167
168mod binary_array;
169mod boolean_array;
170mod decimal_array;
171mod list_array;
172mod map_array;
173mod null_array;
174mod primitive_array;
175mod run_end_array;
176mod schema;
177mod serializer;
178mod string_array;
179mod string_view_array;
180mod struct_array;
181mod tape;
182mod timestamp_array;
183mod value_iter;
184
185/// A builder for [`Reader`] and [`Decoder`]
186pub struct ReaderBuilder {
187    batch_size: usize,
188    coerce_primitive: bool,
189    strict_mode: bool,
190    ignore_type_conflicts: bool,
191    is_field: bool,
192    struct_mode: StructMode,
193
194    schema: SchemaRef,
195}
196
197impl ReaderBuilder {
198    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
199    ///
200    /// This could be obtained using [`infer_json_schema`] if not known
201    ///
202    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
203    /// In this case, an error is returned when a column is missing from `schema`.
204    ///
205    /// [`infer_json_schema`]: crate::reader::infer_json_schema
206    pub fn new(schema: SchemaRef) -> Self {
207        Self {
208            batch_size: 1024,
209            coerce_primitive: false,
210            strict_mode: false,
211            ignore_type_conflicts: false,
212            is_field: false,
213            struct_mode: Default::default(),
214            schema,
215        }
216    }
217
218    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
219    ///
220    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
221    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
222    ///
223    /// ```
224    /// # use std::sync::Arc;
225    /// # use arrow_array::cast::AsArray;
226    /// # use arrow_array::types::Int32Type;
227    /// # use arrow_json::ReaderBuilder;
228    /// # use arrow_schema::{DataType, Field};
229    /// // Root of JSON schema is a numeric type
230    /// let data = "1\n2\n3\n";
231    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
232    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
233    /// let b = reader.next().unwrap().unwrap();
234    /// let values = b.column(0).as_primitive::<Int32Type>().values();
235    /// assert_eq!(values, &[1, 2, 3]);
236    ///
237    /// // Root of JSON schema is a list type
238    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
239    /// let field = Field::new_list("int", field.clone(), true);
240    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
241    /// let b = reader.next().unwrap().unwrap();
242    /// let list = b.column(0).as_list::<i32>();
243    ///
244    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
245    /// let list_values = list.values().as_primitive::<Int32Type>();
246    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
247    /// ```
248    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
249        Self {
250            batch_size: 1024,
251            coerce_primitive: false,
252            strict_mode: false,
253            ignore_type_conflicts: false,
254            is_field: true,
255            struct_mode: Default::default(),
256            schema: Arc::new(Schema::new([field.into()])),
257        }
258    }
259
260    /// Sets the batch size in rows to read
261    pub fn with_batch_size(self, batch_size: usize) -> Self {
262        Self { batch_size, ..self }
263    }
264
265    /// Sets if the decoder should coerce primitive values (bool and number) into string
266    /// when the Schema's column is Utf8 or LargeUtf8.
267    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
268        Self {
269            coerce_primitive,
270            ..self
271        }
272    }
273
274    /// Sets if the decoder should return an error if it encounters a column not
275    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
276    /// `strict_mode` is effectively `true`. It is required for all fields of
277    /// the struct to be in the list: without field names, there is no way to
278    /// determine which field is missing.
279    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
280        Self {
281            strict_mode,
282            ..self
283        }
284    }
285
286    /// Set the [`StructMode`] for the reader, which determines whether structs
287    /// can be decoded from JSON as objects or lists. For more details refer to
288    /// the enum documentation. Default is to use `ObjectOnly`.
289    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
290        Self {
291            struct_mode,
292            ..self
293        }
294    }
295
296    /// Sets whether the decoder should produce NULL instead of returning an error if it encounters
297    /// value that can not be parsed into the specified column type.
298    ///
299    /// For example, if the type is declared to be a nullable array of `DataType::Int32` but the
300    /// reader encounters a string value `"foo"` and the value `ignore_type_conflicts` is:
301    ///
302    /// * `false` (the default): The reader will return an error.
303    ///
304    /// * `true`: The reader will fill in NULL value for that array element.
305    ///
306    /// NOTE: An inferred NULL due to a type conflict will still produce parsing errors for
307    /// non-nullable fields, the same as any other NULL or missing value.
308    pub fn with_ignore_type_conflicts(self, ignore_type_conflicts: bool) -> Self {
309        Self {
310            ignore_type_conflicts,
311            ..self
312        }
313    }
314
315    /// Create a [`Reader`] with the provided [`BufRead`]
316    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
317        Ok(Reader {
318            reader,
319            decoder: self.build_decoder()?,
320        })
321    }
322
323    /// Create a [`Decoder`]
324    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
325        let (data_type, nullable) = if self.is_field {
326            let field = &self.schema.fields[0];
327            let data_type = Cow::Borrowed(field.data_type());
328            (data_type, field.is_nullable())
329        } else {
330            let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
331            (data_type, false)
332        };
333
334        let ctx = DecoderContext {
335            coerce_primitive: self.coerce_primitive,
336            strict_mode: self.strict_mode,
337            struct_mode: self.struct_mode,
338            ignore_type_conflicts: self.ignore_type_conflicts,
339        };
340        let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
341
342        let num_fields = self.schema.flattened_fields().len();
343
344        Ok(Decoder {
345            decoder,
346            is_field: self.is_field,
347            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
348            batch_size: self.batch_size,
349            schema: self.schema,
350        })
351    }
352}
353
354/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
355///
356/// Lines consisting solely of ASCII whitespace are ignored
357pub struct Reader<R> {
358    reader: R,
359    decoder: Decoder,
360}
361
362impl<R> std::fmt::Debug for Reader<R> {
363    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364        f.debug_struct("Reader")
365            .field("decoder", &self.decoder)
366            .finish()
367    }
368}
369
370impl<R: BufRead> Reader<R> {
371    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
372    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
373        loop {
374            let buf = self.reader.fill_buf()?;
375            if buf.is_empty() {
376                break;
377            }
378            let read = buf.len();
379
380            let decoded = self.decoder.decode(buf)?;
381            self.reader.consume(decoded);
382            if decoded != read {
383                break;
384            }
385        }
386        self.decoder.flush()
387    }
388}
389
390impl<R: BufRead> Iterator for Reader<R> {
391    type Item = Result<RecordBatch, ArrowError>;
392
393    fn next(&mut self) -> Option<Self::Item> {
394        self.read().transpose()
395    }
396}
397
398impl<R: BufRead> RecordBatchReader for Reader<R> {
399    fn schema(&self) -> SchemaRef {
400        self.decoder.schema.clone()
401    }
402}
403
404/// A low-level interface for reading JSON data from a byte stream
405///
406/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
407///
408/// The push-based interface facilitates integration with sources that yield arbitrarily
409/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
410/// object storage
411///
412/// ```
413/// # use std::io::BufRead;
414/// # use arrow_array::RecordBatch;
415/// # use arrow_json::reader::{Decoder, ReaderBuilder};
416/// # use arrow_schema::{ArrowError, SchemaRef};
417/// #
418/// fn read_from_json<R: BufRead>(
419///     mut reader: R,
420///     schema: SchemaRef,
421/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
422///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
423///     let mut next = move || {
424///         loop {
425///             // Decoder is agnostic that buf doesn't contain whole records
426///             let buf = reader.fill_buf()?;
427///             if buf.is_empty() {
428///                 break; // Input exhausted
429///             }
430///             let read = buf.len();
431///             let decoded = decoder.decode(buf)?;
432///
433///             // Consume the number of bytes read
434///             reader.consume(decoded);
435///             if decoded != read {
436///                 break; // Read batch size
437///             }
438///         }
439///         decoder.flush()
440///     };
441///     Ok(std::iter::from_fn(move || next().transpose()))
442/// }
443/// ```
444pub struct Decoder {
445    tape_decoder: TapeDecoder,
446    decoder: Box<dyn ArrayDecoder>,
447    batch_size: usize,
448    is_field: bool,
449    schema: SchemaRef,
450}
451
452impl std::fmt::Debug for Decoder {
453    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454        f.debug_struct("Decoder")
455            .field("schema", &self.schema)
456            .field("batch_size", &self.batch_size)
457            .finish()
458    }
459}
460
461impl Decoder {
462    /// Read JSON objects from `buf`, returning the number of bytes read
463    ///
464    /// This method returns once `batch_size` objects have been parsed since the
465    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
466    /// should be included in the next call to [`Self::decode`]
467    ///
468    /// There is no requirement that `buf` contains a whole number of records, facilitating
469    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
470    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
471        self.tape_decoder.decode(buf)
472    }
473
474    /// Serialize `rows` to this [`Decoder`]
475    ///
476    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
477    /// [`RecordBatch`].
478    ///
479    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
480    /// especially where the schema is known at compile-time, however, this provides a mechanism
481    /// to get something up and running quickly
482    ///
483    /// It can be used with [`serde_json::Value`]
484    ///
485    /// ```
486    /// # use std::sync::Arc;
487    /// # use serde_json::{Value, json};
488    /// # use arrow_array::cast::AsArray;
489    /// # use arrow_array::types::Float32Type;
490    /// # use arrow_json::ReaderBuilder;
491    /// # use arrow_schema::{DataType, Field, Schema};
492    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
493    ///
494    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
495    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
496    ///
497    /// decoder.serialize(&json).unwrap();
498    /// let batch = decoder.flush().unwrap().unwrap();
499    /// assert_eq!(batch.num_rows(), 2);
500    /// assert_eq!(batch.num_columns(), 1);
501    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
502    /// assert_eq!(values, &[2.3, 5.7])
503    /// ```
504    ///
505    /// Or with arbitrary [`Serialize`] types
506    ///
507    /// ```
508    /// # use std::sync::Arc;
509    /// # use arrow_json::ReaderBuilder;
510    /// # use arrow_schema::{DataType, Field, Schema};
511    /// # use serde::Serialize;
512    /// # use arrow_array::cast::AsArray;
513    /// # use arrow_array::types::{Float32Type, Int32Type};
514    /// #
515    /// #[derive(Serialize)]
516    /// struct MyStruct {
517    ///     int32: i32,
518    ///     float: f32,
519    /// }
520    ///
521    /// let schema = Schema::new(vec![
522    ///     Field::new("int32", DataType::Int32, false),
523    ///     Field::new("float", DataType::Float32, false),
524    /// ]);
525    ///
526    /// let rows = vec![
527    ///     MyStruct{ int32: 0, float: 3. },
528    ///     MyStruct{ int32: 4, float: 67.53 },
529    /// ];
530    ///
531    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
532    /// decoder.serialize(&rows).unwrap();
533    ///
534    /// let batch = decoder.flush().unwrap().unwrap();
535    ///
536    /// // Expect batch containing two columns
537    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
538    /// assert_eq!(int32.values(), &[0, 4]);
539    ///
540    /// let float = batch.column(1).as_primitive::<Float32Type>();
541    /// assert_eq!(float.values(), &[3., 67.53]);
542    /// ```
543    ///
544    /// Or even complex nested types
545    ///
546    /// ```
547    /// # use std::collections::BTreeMap;
548    /// # use std::sync::Arc;
549    /// # use arrow_array::StructArray;
550    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
551    /// # use arrow_json::ReaderBuilder;
552    /// # use arrow_schema::{DataType, Field, Fields, Schema};
553    /// # use serde::Serialize;
554    /// #
555    /// #[derive(Serialize)]
556    /// struct MyStruct {
557    ///     int32: i32,
558    ///     list: Vec<f64>,
559    ///     nested: Vec<Option<Nested>>,
560    /// }
561    ///
562    /// impl MyStruct {
563    ///     /// Returns the [`Fields`] for [`MyStruct`]
564    ///     fn fields() -> Fields {
565    ///         let nested = DataType::Struct(Nested::fields());
566    ///         Fields::from([
567    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
568    ///             Arc::new(Field::new_list(
569    ///                 "list",
570    ///                 Field::new("element", DataType::Float64, false),
571    ///                 false,
572    ///             )),
573    ///             Arc::new(Field::new_list(
574    ///                 "nested",
575    ///                 Field::new("element", nested, true),
576    ///                 true,
577    ///             )),
578    ///         ])
579    ///     }
580    /// }
581    ///
582    /// #[derive(Serialize)]
583    /// struct Nested {
584    ///     map: BTreeMap<String, Vec<String>>
585    /// }
586    ///
587    /// impl Nested {
588    ///     /// Returns the [`Fields`] for [`Nested`]
589    ///     fn fields() -> Fields {
590    ///         let element = Field::new("element", DataType::Utf8, false);
591    ///         Fields::from([
592    ///             Arc::new(Field::new_map(
593    ///                 "map",
594    ///                 "entries",
595    ///                 Field::new("key", DataType::Utf8, false),
596    ///                 Field::new_list("value", element, false),
597    ///                 false, // sorted
598    ///                 false, // nullable
599    ///             ))
600    ///         ])
601    ///     }
602    /// }
603    ///
604    /// let data = vec![
605    ///     MyStruct {
606    ///         int32: 34,
607    ///         list: vec![1., 2., 34.],
608    ///         nested: vec![
609    ///             None,
610    ///             Some(Nested {
611    ///                 map: vec![
612    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
613    ///                     ("key2".to_string(), vec!["baz".to_string()])
614    ///                 ].into_iter().collect()
615    ///             })
616    ///         ]
617    ///     },
618    ///     MyStruct {
619    ///         int32: 56,
620    ///         list: vec![],
621    ///         nested: vec![]
622    ///     },
623    ///     MyStruct {
624    ///         int32: 24,
625    ///         list: vec![-1., 245.],
626    ///         nested: vec![None]
627    ///     }
628    /// ];
629    ///
630    /// let schema = Schema::new(MyStruct::fields());
631    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
632    /// decoder.serialize(&data).unwrap();
633    /// let batch = decoder.flush().unwrap().unwrap();
634    /// assert_eq!(batch.num_rows(), 3);
635    /// assert_eq!(batch.num_columns(), 3);
636    ///
637    /// // Convert to StructArray to format
638    /// let s = StructArray::from(batch);
639    /// let options = FormatOptions::default().with_null("null");
640    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
641    ///
642    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
643    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
644    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
645    /// ```
646    ///
647    /// Note: this ignores any batch size setting, and always decodes all rows
648    ///
649    /// [serde]: https://docs.rs/serde/latest/serde/
650    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
651        self.tape_decoder.serialize(rows)
652    }
653
654    /// True if the decoder is currently part way through decoding a record.
655    pub fn has_partial_record(&self) -> bool {
656        self.tape_decoder.has_partial_row()
657    }
658
659    /// The number of unflushed records, including the partially decoded record (if any).
660    pub fn len(&self) -> usize {
661        self.tape_decoder.num_buffered_rows()
662    }
663
664    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
665    pub fn is_empty(&self) -> bool {
666        self.len() == 0
667    }
668
669    /// Flushes the currently buffered data to a [`RecordBatch`]
670    ///
671    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
672    ///
673    /// Note: This will return an error if called part way through decoding a record,
674    /// i.e. [`Self::has_partial_record`] is true.
675    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
676        let tape = self.tape_decoder.finish()?;
677
678        if tape.num_rows() == 0 {
679            return Ok(None);
680        }
681
682        // First offset is null sentinel
683        let mut next_object = 1;
684        let pos: Vec<_> = (0..tape.num_rows())
685            .map(|_| {
686                let next = tape.next(next_object, "row").unwrap();
687                std::mem::replace(&mut next_object, next)
688            })
689            .collect();
690
691        let decoded = self.decoder.decode(&tape, &pos)?;
692        self.tape_decoder.clear();
693
694        let batch = match self.is_field {
695            true => RecordBatch::try_new(self.schema.clone(), vec![decoded])?,
696            false => {
697                RecordBatch::from(decoded.as_struct().clone()).with_schema(self.schema.clone())?
698            }
699        };
700
701        Ok(Some(batch))
702    }
703}
704
705trait ArrayDecoder: Send {
706    /// Decode elements from `tape` starting at the indexes contained in `pos`
707    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError>;
708}
709
710/// Context for decoder creation, containing configuration.
711///
712/// This context is passed through the decoder creation process and contains
713/// all the configuration needed to create decoders recursively.
714pub struct DecoderContext {
715    /// Whether to coerce primitives to strings
716    coerce_primitive: bool,
717    /// Whether to validate struct fields strictly
718    strict_mode: bool,
719    /// How to decode struct fields
720    struct_mode: StructMode,
721    /// Whether to treat columns with incompatible types as missing (i.e. NULL)
722    ignore_type_conflicts: bool,
723}
724
725impl DecoderContext {
726    /// Returns whether to coerce primitive types (e.g., number to string)
727    pub fn coerce_primitive(&self) -> bool {
728        self.coerce_primitive
729    }
730
731    /// Returns whether to validate struct fields strictly
732    pub fn strict_mode(&self) -> bool {
733        self.strict_mode
734    }
735
736    /// Returns how to decode struct fields
737    pub fn struct_mode(&self) -> StructMode {
738        self.struct_mode
739    }
740
741    /// Returns whether to treat columns with incompatible types as missing (i.e. NULL)
742    pub fn ignore_type_conflicts(&self) -> bool {
743        self.ignore_type_conflicts
744    }
745
746    /// Create a decoder for a type.
747    ///
748    /// This is the standard way to create child decoders from within a decoder
749    /// implementation.
750    fn make_decoder(
751        &self,
752        data_type: &DataType,
753        is_nullable: bool,
754    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
755        make_decoder(self, data_type, is_nullable)
756    }
757}
758
759fn make_decoder(
760    ctx: &DecoderContext,
761    data_type: &DataType,
762    is_nullable: bool,
763) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
764    macro_rules! primitive_decoder {
765        ($t:ty, $data_type:expr) => {
766            Ok(Box::new(PrimitiveArrayDecoder::<$t>::new(ctx, $data_type)))
767        };
768    }
769    macro_rules! timestamp_decoder {
770        ($t:ty, $data_type:expr, $tz:expr) => {{
771            Ok(Box::new(TimestampArrayDecoder::<$t, _>::new(
772                ctx, $data_type, $tz,
773            )))
774        }};
775    }
776    macro_rules! decimal_decoder {
777        ($t:ty, $p:expr, $s:expr) => {
778            Ok(Box::new(DecimalArrayDecoder::<$t>::new(ctx, $p, $s)))
779        };
780    }
781
782    downcast_integer! {
783        *data_type => (primitive_decoder, data_type),
784        DataType::Null => Ok(Box::new(NullArrayDecoder::new(ctx))),
785        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
786        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
787        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
788        DataType::Timestamp(TimeUnit::Second, None) => {
789            timestamp_decoder!(TimestampSecondType, data_type, Utc)
790        },
791        DataType::Timestamp(TimeUnit::Millisecond, None) => {
792            timestamp_decoder!(TimestampMillisecondType, data_type, Utc)
793        },
794        DataType::Timestamp(TimeUnit::Microsecond, None) => {
795            timestamp_decoder!(TimestampMicrosecondType, data_type, Utc)
796        },
797        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
798            timestamp_decoder!(TimestampNanosecondType, data_type, Utc)
799        },
800        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
801            let tz: Tz = tz.parse()?;
802            timestamp_decoder!(TimestampSecondType, data_type, tz)
803        },
804        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
805            let tz: Tz = tz.parse()?;
806            timestamp_decoder!(TimestampMillisecondType, data_type, tz)
807        },
808        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
809            let tz: Tz = tz.parse()?;
810            timestamp_decoder!(TimestampMicrosecondType, data_type, tz)
811        },
812        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
813            let tz: Tz = tz.parse()?;
814            timestamp_decoder!(TimestampNanosecondType, data_type, tz)
815        },
816        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
817        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
818        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
819        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
820        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
821        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
822        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
823        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
824        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
825        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
826        DataType::Decimal32(p, s) => decimal_decoder!(Decimal32Type, p, s),
827        DataType::Decimal64(p, s) => decimal_decoder!(Decimal64Type, p, s),
828        DataType::Decimal128(p, s) => decimal_decoder!(Decimal128Type, p, s),
829        DataType::Decimal256(p, s) => decimal_decoder!(Decimal256Type, p, s),
830        DataType::Boolean => Ok(Box::new(BooleanArrayDecoder::new(ctx))),
831        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(ctx))),
832        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(ctx))),
833        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(ctx))),
834        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
835        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
836        DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
837        DataType::LargeListView(_) => Ok(Box::new(ListViewArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
838        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
839        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
840        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
841        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
842        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
843        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
844        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
845            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
846            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
847            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
848            d => unreachable!("unsupported run end index type: {d}"),
849        },
850        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
851    }
852}
853
854#[cfg(test)]
855mod tests {
856    use arrow_array::cast::AsArray;
857    use arrow_array::{
858        Array, BooleanArray, Float64Array, GenericListViewArray, Int32Array, ListArray, MapArray,
859        NullArray, OffsetSizeTrait, StringArray, StringViewArray, StructArray, make_array,
860    };
861    use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer};
862    use arrow_cast::display::{ArrayFormatter, FormatOptions};
863    use arrow_data::ArrayDataBuilder;
864    use arrow_schema::{Field, Fields};
865    use serde_json::json;
866    use std::fs::File;
867    use std::io::{BufReader, Cursor, Seek};
868
869    use super::*;
870
871    fn do_read(
872        buf: &str,
873        batch_size: usize,
874        coerce_primitive: bool,
875        strict_mode: bool,
876        schema: SchemaRef,
877    ) -> Vec<RecordBatch> {
878        let mut unbuffered = vec![];
879
880        // Test with different batch sizes to test for boundary conditions
881        for batch_size in [1, 3, 100, batch_size] {
882            unbuffered = ReaderBuilder::new(schema.clone())
883                .with_batch_size(batch_size)
884                .with_coerce_primitive(coerce_primitive)
885                .build(Cursor::new(buf.as_bytes()))
886                .unwrap()
887                .collect::<Result<Vec<_>, _>>()
888                .unwrap();
889
890            for b in unbuffered.iter().take(unbuffered.len() - 1) {
891                assert_eq!(b.num_rows(), batch_size)
892            }
893
894            // Test with different buffer sizes to test for boundary conditions
895            for b in [1, 3, 5] {
896                let buffered = ReaderBuilder::new(schema.clone())
897                    .with_batch_size(batch_size)
898                    .with_coerce_primitive(coerce_primitive)
899                    .with_strict_mode(strict_mode)
900                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
901                    .unwrap()
902                    .collect::<Result<Vec<_>, _>>()
903                    .unwrap();
904                assert_eq!(unbuffered, buffered);
905            }
906        }
907
908        unbuffered
909    }
910
911    #[test]
912    fn test_basic() {
913        let buf = r#"
914        {"a": 1, "b": 2, "c": true, "d": 1}
915        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}
916
917        {"b": 6, "a": 2.0, "d": 45}
918        {"b": "5", "a": 2}
919        {"b": 4e0}
920        {"b": 7, "a": null}
921        "#;
922
923        let schema = Arc::new(Schema::new(vec![
924            Field::new("a", DataType::Int64, true),
925            Field::new("b", DataType::Int32, true),
926            Field::new("c", DataType::Boolean, true),
927            Field::new("d", DataType::Date32, true),
928            Field::new("e", DataType::Date64, true),
929        ]));
930
931        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
932        assert!(decoder.is_empty());
933        assert_eq!(decoder.len(), 0);
934        assert!(!decoder.has_partial_record());
935        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
936        assert!(!decoder.is_empty());
937        assert_eq!(decoder.len(), 6);
938        assert!(!decoder.has_partial_record());
939        let batch = decoder.flush().unwrap().unwrap();
940        assert_eq!(batch.num_rows(), 6);
941        assert!(decoder.is_empty());
942        assert_eq!(decoder.len(), 0);
943        assert!(!decoder.has_partial_record());
944
945        let batches = do_read(buf, 1024, false, false, schema);
946        assert_eq!(batches.len(), 1);
947
948        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
949        assert_eq!(col1.null_count(), 2);
950        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
951        assert!(col1.is_null(4));
952        assert!(col1.is_null(5));
953
954        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
955        assert_eq!(col2.null_count(), 0);
956        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);
957
958        let col3 = batches[0].column(2).as_boolean();
959        assert_eq!(col3.null_count(), 4);
960        assert!(col3.value(0));
961        assert!(!col3.is_null(0));
962        assert!(!col3.value(1));
963        assert!(!col3.is_null(1));
964
965        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
966        assert_eq!(col4.null_count(), 3);
967        assert!(col4.is_null(3));
968        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);
969
970        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
971        assert_eq!(col5.null_count(), 5);
972        assert!(col5.is_null(0));
973        assert!(col5.is_null(2));
974        assert!(col5.is_null(3));
975        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
976    }
977
978    #[test]
979    fn test_string() {
980        let buf = r#"
981        {"a": "1", "b": "2"}
982        {"a": "hello", "b": "shoo"}
983        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
984
985        {"b": null}
986        {"b": "", "a": null}
987
988        "#;
989        let schema = Arc::new(Schema::new(vec![
990            Field::new("a", DataType::Utf8, true),
991            Field::new("b", DataType::LargeUtf8, true),
992        ]));
993
994        let batches = do_read(buf, 1024, false, false, schema);
995        assert_eq!(batches.len(), 1);
996
997        let col1 = batches[0].column(0).as_string::<i32>();
998        assert_eq!(col1.null_count(), 2);
999        assert_eq!(col1.value(0), "1");
1000        assert_eq!(col1.value(1), "hello");
1001        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1002        assert!(col1.is_null(3));
1003        assert!(col1.is_null(4));
1004
1005        let col2 = batches[0].column(1).as_string::<i64>();
1006        assert_eq!(col2.null_count(), 1);
1007        assert_eq!(col2.value(0), "2");
1008        assert_eq!(col2.value(1), "shoo");
1009        assert_eq!(col2.value(2), "\t😁foo");
1010        assert!(col2.is_null(3));
1011        assert_eq!(col2.value(4), "");
1012    }
1013
1014    #[test]
1015    fn test_long_string_view_allocation() {
1016        // The JSON input contains field "a" with different string lengths.
1017        // According to the implementation in the decoder:
1018        // - For a string, capacity is only increased if its length > 12 bytes.
1019        // Therefore, for:
1020        // Row 1: "short" (5 bytes) -> capacity += 0
1021        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
1022        // Row 3: "hello" (5 bytes) -> capacity += 0
1023        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
1024        // Expected total capacity = 24 + 17 = 41
1025        let expected_capacity: usize = 41;
1026
1027        let buf = r#"
1028        {"a": "short", "b": "dummy"}
1029        {"a": "this is definitely long", "b": "dummy"}
1030        {"a": "hello", "b": "dummy"}
1031        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
1032        "#;
1033
1034        let schema = Arc::new(Schema::new(vec![
1035            Field::new("a", DataType::Utf8View, true),
1036            Field::new("b", DataType::LargeUtf8, true),
1037        ]));
1038
1039        let batches = do_read(buf, 1024, false, false, schema);
1040        assert_eq!(batches.len(), 1, "Expected one record batch");
1041
1042        // Get the first column ("a") as a StringViewArray.
1043        let col_a = batches[0].column(0);
1044        let string_view_array = col_a
1045            .as_any()
1046            .downcast_ref::<StringViewArray>()
1047            .expect("Column should be a StringViewArray");
1048
1049        // Retrieve the underlying data buffer from the array.
1050        // The builder pre-allocates capacity based on the sum of lengths for long strings.
1051        let data_buffer = string_view_array.to_data().buffers()[0].len();
1052
1053        // Check that the allocated capacity is at least what we expected.
1054        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
1055        assert!(
1056            data_buffer >= expected_capacity,
1057            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
1058        );
1059
1060        // Additionally, verify that the decoded values are correct.
1061        assert_eq!(string_view_array.value(0), "short");
1062        assert_eq!(string_view_array.value(1), "this is definitely long");
1063        assert_eq!(string_view_array.value(2), "hello");
1064        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
1065    }
1066
1067    /// Test the memory capacity allocation logic when converting numeric types to strings.
1068    #[test]
1069    fn test_numeric_view_allocation() {
1070        // For numeric types, the expected capacity calculation is as follows:
1071        // Row 1: 123456789  -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
1072        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
1073        //                        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
1074        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
1075        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
1076        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
1077        let expected_capacity: usize = 33;
1078
1079        let buf = r#"
1080    {"n": 123456789}
1081    {"n": 1000000000000}
1082    {"n": 3.1415}
1083    {"n": 2.718281828459045}
1084    "#;
1085
1086        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));
1087
1088        let batches = do_read(buf, 1024, true, false, schema);
1089        assert_eq!(batches.len(), 1, "Expected one record batch");
1090
1091        let col_n = batches[0].column(0);
1092        let string_view_array = col_n
1093            .as_any()
1094            .downcast_ref::<StringViewArray>()
1095            .expect("Column should be a StringViewArray");
1096
1097        // Check that the underlying data buffer capacity is at least the expected value.
1098        let data_buffer = string_view_array.to_data().buffers()[0].len();
1099        assert!(
1100            data_buffer >= expected_capacity,
1101            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
1102        );
1103
1104        // Verify that the converted string values are correct.
1105        // Note: The format of the number converted to a string should match the actual implementation.
1106        assert_eq!(string_view_array.value(0), "123456789");
1107        assert_eq!(string_view_array.value(1), "1000000000000");
1108        assert_eq!(string_view_array.value(2), "3.1415");
1109        assert_eq!(string_view_array.value(3), "2.718281828459045");
1110    }
1111
1112    #[test]
1113    fn test_string_with_uft8view() {
1114        let buf = r#"
1115        {"a": "1", "b": "2"}
1116        {"a": "hello", "b": "shoo"}
1117        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1118
1119        {"b": null}
1120        {"b": "", "a": null}
1121
1122        "#;
1123        let schema = Arc::new(Schema::new(vec![
1124            Field::new("a", DataType::Utf8View, true),
1125            Field::new("b", DataType::LargeUtf8, true),
1126        ]));
1127
1128        let batches = do_read(buf, 1024, false, false, schema);
1129        assert_eq!(batches.len(), 1);
1130
1131        let col1 = batches[0].column(0).as_string_view();
1132        assert_eq!(col1.null_count(), 2);
1133        assert_eq!(col1.value(0), "1");
1134        assert_eq!(col1.value(1), "hello");
1135        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1136        assert!(col1.is_null(3));
1137        assert!(col1.is_null(4));
1138        assert_eq!(col1.data_type(), &DataType::Utf8View);
1139
1140        let col2 = batches[0].column(1).as_string::<i64>();
1141        assert_eq!(col2.null_count(), 1);
1142        assert_eq!(col2.value(0), "2");
1143        assert_eq!(col2.value(1), "shoo");
1144        assert_eq!(col2.value(2), "\t😁foo");
1145        assert!(col2.is_null(3));
1146        assert_eq!(col2.value(4), "");
1147    }
1148
1149    #[test]
1150    fn test_complex() {
1151        let buf = r#"
1152           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1153           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1154           {"list": null, "nested": {"a": null}}
1155        "#;
1156
1157        let schema = Arc::new(Schema::new(vec![
1158            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1159            Field::new_struct(
1160                "nested",
1161                vec![
1162                    Field::new("a", DataType::Int32, true),
1163                    Field::new("b", DataType::Int32, true),
1164                ],
1165                true,
1166            ),
1167            Field::new_struct(
1168                "nested_list",
1169                vec![Field::new_list(
1170                    "list2",
1171                    Field::new_struct(
1172                        "element",
1173                        vec![Field::new("c", DataType::Int32, false)],
1174                        false,
1175                    ),
1176                    true,
1177                )],
1178                true,
1179            ),
1180        ]));
1181
1182        let batches = do_read(buf, 1024, false, false, schema);
1183        assert_eq!(batches.len(), 1);
1184
1185        let list = batches[0].column(0).as_list::<i32>();
1186        assert_eq!(list.len(), 3);
1187        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1188        assert_eq!(list.null_count(), 1);
1189        assert!(list.is_null(2));
1190        let list_values = list.values().as_primitive::<Int32Type>();
1191        assert_eq!(list_values.values(), &[5, 6]);
1192
1193        let nested = batches[0].column(1).as_struct();
1194        let a = nested.column(0).as_primitive::<Int32Type>();
1195        assert_eq!(list.null_count(), 1);
1196        assert_eq!(a.values(), &[1, 7, 0]);
1197        assert!(list.is_null(2));
1198
1199        let b = nested.column(1).as_primitive::<Int32Type>();
1200        assert_eq!(b.null_count(), 2);
1201        assert_eq!(b.len(), 3);
1202        assert_eq!(b.value(0), 2);
1203        assert!(b.is_null(1));
1204        assert!(b.is_null(2));
1205
1206        let nested_list = batches[0].column(2).as_struct();
1207        assert_eq!(nested_list.len(), 3);
1208        assert_eq!(nested_list.null_count(), 1);
1209        assert!(nested_list.is_null(2));
1210
1211        let list2 = nested_list.column(0).as_list::<i32>();
1212        assert_eq!(list2.len(), 3);
1213        assert_eq!(list2.null_count(), 1);
1214        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1215        assert!(list2.is_null(2));
1216
1217        let list2_values = list2.values().as_struct();
1218
1219        let c = list2_values.column(0).as_primitive::<Int32Type>();
1220        assert_eq!(c.values(), &[3, 4]);
1221    }
1222
1223    #[test]
1224    fn test_projection() {
1225        let buf = r#"
1226           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
1227           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1228        "#;
1229
1230        let schema = Arc::new(Schema::new(vec![
1231            Field::new_struct(
1232                "nested",
1233                vec![Field::new("a", DataType::Int32, false)],
1234                true,
1235            ),
1236            Field::new_struct(
1237                "nested_list",
1238                vec![Field::new_list(
1239                    "list2",
1240                    Field::new_struct(
1241                        "element",
1242                        vec![Field::new("d", DataType::Int32, true)],
1243                        false,
1244                    ),
1245                    true,
1246                )],
1247                true,
1248            ),
1249        ]));
1250
1251        let batches = do_read(buf, 1024, false, false, schema);
1252        assert_eq!(batches.len(), 1);
1253
1254        let nested = batches[0].column(0).as_struct();
1255        assert_eq!(nested.num_columns(), 1);
1256        let a = nested.column(0).as_primitive::<Int32Type>();
1257        assert_eq!(a.null_count(), 0);
1258        assert_eq!(a.values(), &[1, 7]);
1259
1260        let nested_list = batches[0].column(1).as_struct();
1261        assert_eq!(nested_list.num_columns(), 1);
1262        assert_eq!(nested_list.null_count(), 0);
1263
1264        let list2 = nested_list.column(0).as_list::<i32>();
1265        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
1266        assert_eq!(list2.null_count(), 0);
1267
1268        let child = list2.values().as_struct();
1269        assert_eq!(child.num_columns(), 1);
1270        assert_eq!(child.len(), 2);
1271        assert_eq!(child.null_count(), 0);
1272
1273        let c = child.column(0).as_primitive::<Int32Type>();
1274        assert_eq!(c.values(), &[5, 0]);
1275        assert_eq!(c.null_count(), 1);
1276        assert!(c.is_null(1));
1277    }
1278
1279    #[test]
1280    fn test_map() {
1281        let buf = r#"
1282           {"map": {"a": ["foo", null]}}
1283           {"map": {"a": [null], "b": []}}
1284           {"map": {"c": null, "a": ["baz"]}}
1285        "#;
1286        let map = Field::new_map(
1287            "map",
1288            "entries",
1289            Field::new("key", DataType::Utf8, false),
1290            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
1291            false,
1292            true,
1293        );
1294
1295        let schema = Arc::new(Schema::new(vec![map]));
1296
1297        let batches = do_read(buf, 1024, false, false, schema);
1298        assert_eq!(batches.len(), 1);
1299
1300        let map = batches[0].column(0).as_map();
1301        let map_keys = map.keys().as_string::<i32>();
1302        let map_values = map.values().as_list::<i32>();
1303        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);
1304
1305        let k: Vec<_> = map_keys.iter().flatten().collect();
1306        assert_eq!(&k, &["a", "a", "b", "c", "a"]);
1307
1308        let list_values = map_values.values().as_string::<i32>();
1309        let lv: Vec<_> = list_values.iter().collect();
1310        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
1311        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
1312        assert_eq!(map_values.null_count(), 1);
1313        assert!(map_values.is_null(3));
1314
1315        let options = FormatOptions::default().with_null("null");
1316        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
1317        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
1318        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
1319        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
1320    }
1321
1322    #[test]
1323    fn test_not_coercing_primitive_into_string_without_flag() {
1324        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1325
1326        let buf = r#"{"a": 1}"#;
1327        let err = ReaderBuilder::new(schema.clone())
1328            .with_batch_size(1024)
1329            .build(Cursor::new(buf.as_bytes()))
1330            .unwrap()
1331            .read()
1332            .unwrap_err();
1333
1334        assert_eq!(
1335            err.to_string(),
1336            "Json error: whilst decoding field 'a': expected string got 1"
1337        );
1338
1339        let buf = r#"{"a": true}"#;
1340        let err = ReaderBuilder::new(schema)
1341            .with_batch_size(1024)
1342            .build(Cursor::new(buf.as_bytes()))
1343            .unwrap()
1344            .read()
1345            .unwrap_err();
1346
1347        assert_eq!(
1348            err.to_string(),
1349            "Json error: whilst decoding field 'a': expected string got true"
1350        );
1351    }
1352
1353    #[test]
1354    fn test_coercing_primitive_into_string() {
1355        let buf = r#"
1356        {"a": 1, "b": 2, "c": true}
1357        {"a": 2E0, "b": 4, "c": false}
1358
1359        {"b": 6, "a": 2.0}
1360        {"b": "5", "a": 2}
1361        {"b": 4e0}
1362        {"b": 7, "a": null}
1363        "#;
1364
1365        let schema = Arc::new(Schema::new(vec![
1366            Field::new("a", DataType::Utf8, true),
1367            Field::new("b", DataType::Utf8, true),
1368            Field::new("c", DataType::Utf8, true),
1369        ]));
1370
1371        let batches = do_read(buf, 1024, true, false, schema);
1372        assert_eq!(batches.len(), 1);
1373
1374        let col1 = batches[0].column(0).as_string::<i32>();
1375        assert_eq!(col1.null_count(), 2);
1376        assert_eq!(col1.value(0), "1");
1377        assert_eq!(col1.value(1), "2E0");
1378        assert_eq!(col1.value(2), "2.0");
1379        assert_eq!(col1.value(3), "2");
1380        assert!(col1.is_null(4));
1381        assert!(col1.is_null(5));
1382
1383        let col2 = batches[0].column(1).as_string::<i32>();
1384        assert_eq!(col2.null_count(), 0);
1385        assert_eq!(col2.value(0), "2");
1386        assert_eq!(col2.value(1), "4");
1387        assert_eq!(col2.value(2), "6");
1388        assert_eq!(col2.value(3), "5");
1389        assert_eq!(col2.value(4), "4e0");
1390        assert_eq!(col2.value(5), "7");
1391
1392        let col3 = batches[0].column(2).as_string::<i32>();
1393        assert_eq!(col3.null_count(), 4);
1394        assert_eq!(col3.value(0), "true");
1395        assert_eq!(col3.value(1), "false");
1396        assert!(col3.is_null(2));
1397        assert!(col3.is_null(3));
1398        assert!(col3.is_null(4));
1399        assert!(col3.is_null(5));
1400    }
1401
1402    fn test_decimal<T: DecimalType>(data_type: DataType) {
1403        let buf = r#"
1404        {"a": 1, "b": 2, "c": 38.30}
1405        {"a": 2, "b": 4, "c": 123.456}
1406
1407        {"b": 1337, "a": "2.0452"}
1408        {"b": "5", "a": "11034.2"}
1409        {"b": 40}
1410        {"b": 1234, "a": null}
1411        "#;
1412
1413        let schema = Arc::new(Schema::new(vec![
1414            Field::new("a", data_type.clone(), true),
1415            Field::new("b", data_type.clone(), true),
1416            Field::new("c", data_type, true),
1417        ]));
1418
1419        let batches = do_read(buf, 1024, true, false, schema);
1420        assert_eq!(batches.len(), 1);
1421
1422        let col1 = batches[0].column(0).as_primitive::<T>();
1423        assert_eq!(col1.null_count(), 2);
1424        assert!(col1.is_null(4));
1425        assert!(col1.is_null(5));
1426        assert_eq!(
1427            col1.values(),
1428            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
1429        );
1430
1431        let col2 = batches[0].column(1).as_primitive::<T>();
1432        assert_eq!(col2.null_count(), 0);
1433        assert_eq!(
1434            col2.values(),
1435            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
1436        );
1437
1438        let col3 = batches[0].column(2).as_primitive::<T>();
1439        assert_eq!(col3.null_count(), 4);
1440        assert!(!col3.is_null(0));
1441        assert!(!col3.is_null(1));
1442        assert!(col3.is_null(2));
1443        assert!(col3.is_null(3));
1444        assert!(col3.is_null(4));
1445        assert!(col3.is_null(5));
1446        assert_eq!(
1447            col3.values(),
1448            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
1449        );
1450    }
1451
1452    #[test]
1453    fn test_decimals() {
1454        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
1455        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
1456        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
1457        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
1458    }
1459
    /// Generic timestamp decoding test, instantiated once per timestamp unit.
    ///
    /// Column roles in the fixture:
    /// - "a": raw integers stored as-is, plus explicit/implicit nulls
    /// - "b": RFC3339 strings with offsets, plus raw integers
    /// - "c": zone-less and zoned strings against a schema with no timezone
    /// - "d": the same style of strings against a schema timezone of
    ///   "+08:00", so zone-less inputs are interpreted in that zone
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;
        // NOTE(review): row 3 repeats key "b"; the expected value below
        // shows the later occurrence ("2020-09-08T13:42:29Z") wins.

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values are written in nanoseconds and divided down to the
        // unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        // "a": values only in rows 0-1; explicit null and missing key both
        // decode as null
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": parsed strings become epoch offsets; the bare integers 40 and
        // 1234 are stored without scaling
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos, // 2020-09-08T13:42:29.190855+00:00
                1599572549190855000 / unit_in_nanos, // 2020-09-08T13:42:29.190855Z
                1599572549000000000 / unit_in_nanos, // 2020-09-08T13:42:29Z (later "b" wins)
                40,
                1234,
                0
            ]
        );

        // "c": zone-less strings parse identically to their "Z" variants
        // (rows 2 and 4 expect the same value); the floats 38.30 / 123.456
        // are stored as 38 / 123
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,  // 1997-01-31T09:26:56.123
                1599572549190855000 / unit_in_nanos, // 2020-09-08T13:42:29.190855+00:00
                854702816123000000 / unit_in_nanos,  // 1997-01-31 09:26:56.123Z
                854738816123000000 / unit_in_nanos   // 1997-01-31T14:26:56.123-05:00
            ]
        );

        // "d": the schema timezone "+08:00" shifts zone-less strings by -8h
        // relative to UTC (compare row 0 here with row 2 of "c")
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos, // 1997-01-31T09:26:56.123 at +08:00
                123,
                854702816123000000 / unit_in_nanos, // 1997-01-31T09:26:56.123Z
                854720816123000000 / unit_in_nanos, // 1997-01-31 09:26:56.123-05:00
                854674016000000000 / unit_in_nanos, // 1997-01-31 092656 at +08:00
                854640000000000000 / unit_in_nanos  // 1997-01-31 (midnight at +08:00)
            ]
        );
    }
1541
1542    #[test]
1543    fn test_timestamps() {
1544        test_timestamp::<TimestampSecondType>();
1545        test_timestamp::<TimestampMillisecondType>();
1546        test_timestamp::<TimestampMicrosecondType>();
1547        test_timestamp::<TimestampNanosecondType>();
1548    }
1549
    /// Generic time-of-day decoding test, instantiated per Time32/Time64 unit.
    ///
    /// Covers 12-hour ("AM"/"pm") and 24-hour string formats, fractional
    /// seconds, raw integers stored unscaled, and nulls.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Derive the unit from the concrete type; only Time32/Time64 types
        // are valid instantiations of this helper
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Expected values below are nanoseconds since midnight, divided down
        // to the unit under test
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": raw integers in rows 0-1, null everywhere else
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": parsed time strings (row 2 repeats key "b"; the expected
        // value shows the later "6:00 pm" wins); integers 40 / 1234 are
        // stored without scaling
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos, // 09:26:56.123 AM
                86399000000000 / unit_in_nanos, // 23:59:59
                64800000000000 / unit_in_nanos, // 6:00 pm = 18:00
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        // "c": floats 38.30 / 123.456 are stored as 38 / 123; strings parse
        // with fractional seconds preserved up to the unit
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos, // 09:26:56.123
                49349190855000 / unit_in_nanos, // 13:42:29.190855
                34016123000000 / unit_in_nanos, // 09:26:56.123
                52016123000000 / unit_in_nanos  // 14:26:56.123
            ]
            .map(T::Native::usize_as)
        );
    }
1621
1622    #[test]
1623    fn test_times() {
1624        test_time::<Time32MillisecondType>();
1625        test_time::<Time32SecondType>();
1626        test_time::<Time64MicrosecondType>();
1627        test_time::<Time64NanosecondType>();
1628    }
1629
1630    fn test_duration<T: ArrowTemporalType>() {
1631        let buf = r#"
1632        {"a": 1, "b": "2"}
1633        {"a": 3, "b": null}
1634        "#;
1635
1636        let schema = Arc::new(Schema::new(vec![
1637            Field::new("a", T::DATA_TYPE, true),
1638            Field::new("b", T::DATA_TYPE, true),
1639        ]));
1640
1641        let batches = do_read(buf, 1024, true, false, schema);
1642        assert_eq!(batches.len(), 1);
1643
1644        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1645        assert_eq!(col_a.null_count(), 0);
1646        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1647
1648        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1649        assert_eq!(col2.null_count(), 1);
1650        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1651    }
1652
1653    #[test]
1654    fn test_durations() {
1655        test_duration::<DurationNanosecondType>();
1656        test_duration::<DurationMicrosecondType>();
1657        test_duration::<DurationMillisecondType>();
1658        test_duration::<DurationSecondType>();
1659    }
1660
1661    #[test]
1662    fn test_delta_checkpoint() {
1663        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
1664        let schema = Arc::new(Schema::new(vec![
1665            Field::new_struct(
1666                "protocol",
1667                vec![
1668                    Field::new("minReaderVersion", DataType::Int32, true),
1669                    Field::new("minWriterVersion", DataType::Int32, true),
1670                ],
1671                true,
1672            ),
1673            Field::new_struct(
1674                "add",
1675                vec![Field::new_map(
1676                    "partitionValues",
1677                    "key_value",
1678                    Field::new("key", DataType::Utf8, false),
1679                    Field::new("value", DataType::Utf8, true),
1680                    false,
1681                    false,
1682                )],
1683                true,
1684            ),
1685        ]));
1686
1687        let batches = do_read(json, 1024, true, false, schema);
1688        assert_eq!(batches.len(), 1);
1689
1690        let s: StructArray = batches.into_iter().next().unwrap().into();
1691        let opts = FormatOptions::default().with_null("null");
1692        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
1693        assert_eq!(
1694            formatter.value(0).to_string(),
1695            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
1696        );
1697    }
1698
1699    #[test]
1700    fn struct_nullability() {
1701        let do_test = |child: DataType| {
1702            // Test correctly enforced nullability
1703            let non_null = r#"{"foo": {}}"#;
1704            let schema = Arc::new(Schema::new(vec![Field::new_struct(
1705                "foo",
1706                vec![Field::new("bar", child, false)],
1707                true,
1708            )]));
1709            let mut reader = ReaderBuilder::new(schema.clone())
1710                .build(Cursor::new(non_null.as_bytes()))
1711                .unwrap();
1712            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1713
1714            let null = r#"{"foo": {bar: null}}"#;
1715            let mut reader = ReaderBuilder::new(schema.clone())
1716                .build(Cursor::new(null.as_bytes()))
1717                .unwrap();
1718            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1719
1720            // Test nulls in nullable parent can mask nulls in non-nullable child
1721            let null = r#"{"foo": null}"#;
1722            let mut reader = ReaderBuilder::new(schema)
1723                .build(Cursor::new(null.as_bytes()))
1724                .unwrap();
1725            let batch = reader.next().unwrap().unwrap();
1726            assert_eq!(batch.num_columns(), 1);
1727            let foo = batch.column(0).as_struct();
1728            assert_eq!(foo.len(), 1);
1729            assert!(foo.is_null(0));
1730            assert_eq!(foo.num_columns(), 1);
1731
1732            let bar = foo.column(0);
1733            assert_eq!(bar.len(), 1);
1734            // Non-nullable child can still contain null as masked by parent
1735            assert!(bar.is_null(0));
1736        };
1737
1738        do_test(DataType::Boolean);
1739        do_test(DataType::Int32);
1740        do_test(DataType::Utf8);
1741        do_test(DataType::Decimal128(2, 1));
1742        do_test(DataType::Timestamp(
1743            TimeUnit::Microsecond,
1744            Some("+00:00".into()),
1745        ));
1746    }
1747
1748    #[test]
1749    fn test_truncation() {
1750        let buf = r#"
1751        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1752        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1753        {"i64": -9223372036854775808, "u64": 0 }
1754        {"i64": "-9223372036854775808", "u64": 0 }
1755        "#;
1756
1757        let schema = Arc::new(Schema::new(vec![
1758            Field::new("i64", DataType::Int64, true),
1759            Field::new("u64", DataType::UInt64, true),
1760        ]));
1761
1762        let batches = do_read(buf, 1024, true, false, schema);
1763        assert_eq!(batches.len(), 1);
1764
1765        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1766        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1767
1768        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1769        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1770    }
1771
1772    #[test]
1773    fn test_timestamp_truncation() {
1774        let buf = r#"
1775        {"time": 9223372036854775807 }
1776        {"time": -9223372036854775808 }
1777        {"time": 9e5 }
1778        "#;
1779
1780        let schema = Arc::new(Schema::new(vec![Field::new(
1781            "time",
1782            DataType::Timestamp(TimeUnit::Nanosecond, None),
1783            true,
1784        )]));
1785
1786        let batches = do_read(buf, 1024, true, false, schema);
1787        assert_eq!(batches.len(), 1);
1788
1789        let i64 = batches[0]
1790            .column(0)
1791            .as_primitive::<TimestampNanosecondType>();
1792        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1793    }
1794
1795    #[test]
1796    fn test_strict_mode_no_missing_columns_in_schema() {
1797        let buf = r#"
1798        {"a": 1, "b": "2", "c": true}
1799        {"a": 2E0, "b": "4", "c": false}
1800        "#;
1801
1802        let schema = Arc::new(Schema::new(vec![
1803            Field::new("a", DataType::Int16, false),
1804            Field::new("b", DataType::Utf8, false),
1805            Field::new("c", DataType::Boolean, false),
1806        ]));
1807
1808        let batches = do_read(buf, 1024, true, true, schema);
1809        assert_eq!(batches.len(), 1);
1810
1811        let buf = r#"
1812        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1813        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1814        "#;
1815
1816        let schema = Arc::new(Schema::new(vec![
1817            Field::new("a", DataType::Int16, false),
1818            Field::new("b", DataType::Utf8, false),
1819            Field::new_struct(
1820                "c",
1821                vec![
1822                    Field::new("a", DataType::Boolean, false),
1823                    Field::new("b", DataType::Int16, false),
1824                ],
1825                false,
1826            ),
1827        ]));
1828
1829        let batches = do_read(buf, 1024, true, true, schema);
1830        assert_eq!(batches.len(), 1);
1831    }
1832
1833    #[test]
1834    fn test_strict_mode_missing_columns_in_schema() {
1835        let buf = r#"
1836        {"a": 1, "b": "2", "c": true}
1837        {"a": 2E0, "b": "4", "c": false}
1838        "#;
1839
1840        let schema = Arc::new(Schema::new(vec![
1841            Field::new("a", DataType::Int16, true),
1842            Field::new("c", DataType::Boolean, true),
1843        ]));
1844
1845        let err = ReaderBuilder::new(schema)
1846            .with_batch_size(1024)
1847            .with_strict_mode(true)
1848            .build(Cursor::new(buf.as_bytes()))
1849            .unwrap()
1850            .read()
1851            .unwrap_err();
1852
1853        assert_eq!(
1854            err.to_string(),
1855            "Json error: column 'b' missing from schema"
1856        );
1857
1858        let buf = r#"
1859        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1860        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1861        "#;
1862
1863        let schema = Arc::new(Schema::new(vec![
1864            Field::new("a", DataType::Int16, false),
1865            Field::new("b", DataType::Utf8, false),
1866            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
1867        ]));
1868
1869        let err = ReaderBuilder::new(schema)
1870            .with_batch_size(1024)
1871            .with_strict_mode(true)
1872            .build(Cursor::new(buf.as_bytes()))
1873            .unwrap()
1874            .read()
1875            .unwrap_err();
1876
1877        assert_eq!(
1878            err.to_string(),
1879            "Json error: whilst decoding field 'c': column 'b' missing from schema"
1880        );
1881    }
1882
1883    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1884        let file = File::open(path).unwrap();
1885        let mut reader = BufReader::new(file);
1886        let schema = schema.unwrap_or_else(|| {
1887            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1888            reader.rewind().unwrap();
1889            schema
1890        });
1891        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1892        builder.build(reader).unwrap()
1893    }
1894
    #[test]
    fn test_json_basic() {
        // Reads test/data/basic.json with an inferred schema and spot-checks
        // the first four columns' inferred types, positions and a few values.
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred column order/types: a=Int64, b=Float64, c=Boolean, d=Utf8
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values against the fixture file
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1933
1934    #[test]
1935    fn test_json_empty_projection() {
1936        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1937        let batch = reader.next().unwrap().unwrap();
1938
1939        assert_eq!(0, batch.num_columns());
1940        assert_eq!(12, batch.num_rows());
1941    }
1942
    #[test]
    fn test_json_basic_with_nulls() {
        // Reads test/data/basic_nulls.json with an inferred schema and checks
        // that null slots land where the fixture has nulls / missing values.
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred types: a=Int64, b=Float64, c=Boolean, d=Utf8
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check each column's validity bitmap against the fixture
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1982
1983    #[test]
1984    fn test_json_basic_schema() {
1985        let schema = Schema::new(vec![
1986            Field::new("a", DataType::Int64, true),
1987            Field::new("b", DataType::Float32, false),
1988            Field::new("c", DataType::Boolean, false),
1989            Field::new("d", DataType::Utf8, false),
1990        ]);
1991
1992        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1993        let reader_schema = reader.schema();
1994        assert_eq!(reader_schema.as_ref(), &schema);
1995        let batch = reader.next().unwrap().unwrap();
1996
1997        assert_eq!(4, batch.num_columns());
1998        assert_eq!(12, batch.num_rows());
1999
2000        let schema = batch.schema();
2001
2002        let a = schema.column_with_name("a").unwrap();
2003        assert_eq!(&DataType::Int64, a.1.data_type());
2004        let b = schema.column_with_name("b").unwrap();
2005        assert_eq!(&DataType::Float32, b.1.data_type());
2006        let c = schema.column_with_name("c").unwrap();
2007        assert_eq!(&DataType::Boolean, c.1.data_type());
2008        let d = schema.column_with_name("d").unwrap();
2009        assert_eq!(&DataType::Utf8, d.1.data_type());
2010
2011        let aa = batch.column(a.0).as_primitive::<Int64Type>();
2012        assert_eq!(1, aa.value(0));
2013        assert_eq!(100000000000000, aa.value(11));
2014        let bb = batch.column(b.0).as_primitive::<Float32Type>();
2015        assert_eq!(2.0, bb.value(0));
2016        assert_eq!(-3.5, bb.value(1));
2017    }
2018
2019    #[test]
2020    fn test_json_basic_schema_projection() {
2021        let schema = Schema::new(vec![
2022            Field::new("a", DataType::Int64, true),
2023            Field::new("c", DataType::Boolean, false),
2024        ]);
2025
2026        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
2027        let batch = reader.next().unwrap().unwrap();
2028
2029        assert_eq!(2, batch.num_columns());
2030        assert_eq!(2, batch.schema().fields().len());
2031        assert_eq!(12, batch.num_rows());
2032
2033        assert_eq!(batch.schema().as_ref(), &schema);
2034
2035        let a = schema.column_with_name("a").unwrap();
2036        assert_eq!(0, a.0);
2037        assert_eq!(&DataType::Int64, a.1.data_type());
2038        let c = schema.column_with_name("c").unwrap();
2039        assert_eq!(1, c.0);
2040        assert_eq!(&DataType::Boolean, c.1.data_type());
2041    }
2042
    #[test]
    fn test_json_arrays() {
        // Reads test/data/arrays.json with an inferred schema: columns "b" and
        // "c" are inferred as List<Float64> and List<Boolean> respectively.
        let mut reader = read_file("test/data/arrays.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
            b.1.data_type()
        );
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
            c.1.data_type()
        );
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        assert_eq!(1627668684594000000, aa.value(2));
        // Flattened child values of list column "b": 9 floats across 3 rows,
        // with a null element at child index 7
        let bb = batch.column(b.0).as_list::<i32>();
        let bb = bb.values().as_primitive::<Float64Type>();
        assert_eq!(9, bb.len());
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-6.1, bb.value(5));
        assert!(!bb.is_valid(7));

        // Same check for list column "c" via an explicit downcast
        let cc = batch
            .column(c.0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let cc = cc.values().as_boolean();
        assert_eq!(6, cc.len());
        assert!(!cc.value(0));
        assert!(!cc.value(4));
        assert!(!cc.is_valid(5));
    }
2090
2091    #[test]
2092    fn test_empty_json_arrays() {
2093        let json_content = r#"
2094            {"items": []}
2095            {"items": null}
2096            {}
2097            "#;
2098
2099        let schema = Arc::new(Schema::new(vec![Field::new(
2100            "items",
2101            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2102            true,
2103        )]));
2104
2105        let batches = do_read(json_content, 1024, false, false, schema);
2106        assert_eq!(batches.len(), 1);
2107
2108        let col1 = batches[0].column(0).as_list::<i32>();
2109        assert_eq!(col1.null_count(), 2);
2110        assert!(col1.value(0).is_empty());
2111        assert_eq!(col1.value(0).data_type(), &DataType::Null);
2112        assert!(col1.is_null(1));
2113        assert!(col1.is_null(2));
2114    }
2115
2116    #[test]
2117    fn test_nested_empty_json_arrays() {
2118        let json_content = r#"
2119            {"items": [[],[]]}
2120            {"items": [[null, null],[null]]}
2121            "#;
2122
2123        let schema = Arc::new(Schema::new(vec![Field::new(
2124            "items",
2125            DataType::List(FieldRef::new(Field::new_list_field(
2126                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2127                true,
2128            ))),
2129            true,
2130        )]));
2131
2132        let batches = do_read(json_content, 1024, false, false, schema);
2133        assert_eq!(batches.len(), 1);
2134
2135        let col1 = batches[0].column(0).as_list::<i32>();
2136        assert_eq!(col1.null_count(), 0);
2137        assert_eq!(col1.value(0).len(), 2);
2138        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2139        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2140
2141        assert_eq!(col1.value(1).len(), 2);
2142        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2143        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2144    }
2145
    #[test]
    fn test_nested_list_json_arrays() {
        // Decodes a List<Struct<b: Boolean, c: Struct<d: Utf8>>> column and
        // compares against an expected array hand-built with ArrayDataBuilder.
        // The input covers nested nulls at every level: null struct fields,
        // null structs inside the list, a null list, an empty list, and a
        // list containing a single null element.
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output
        // 7 flattened struct children across the 6 rows (2 + 1 + 3 + 0 + 0 + 1).
        // Validity bitmaps below are LSB-first (bit i = child element i).
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // Offsets delimit each row's slice of the 7 child structs; row 3
        // (null) and row 4 (empty) both contribute zero-length ranges
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2239
    // Generic driver verifying JSON decoding into ListView / LargeListView
    // (parameterised by the offset type `O`), checking offsets, sizes,
    // validity and element values row by row.
    fn assert_read_list_view<O: OffsetSizeTrait>() {
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
        let schema = Arc::new(Schema::new(vec![Field::new("lv", data_type, true)]));

        // Covers: a plain list, a list with a null element, a null row, a
        // single-element list and an empty list
        let buf = r#"
        {"lv": [1, 2, 3]}
        {"lv": [4, null]}
        {"lv": null}
        {"lv": [6]}
        {"lv": []}
        "#;

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        let col = batch.column(0);
        let list_view = col
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        assert_eq!(list_view.len(), 5);

        // Check offsets and sizes
        // (the null row 2 and the empty row 4 both have size 0)
        let expected_offsets: Vec<O> = vec![0, 3, 5, 5, 6]
            .into_iter()
            .map(|v| O::usize_as(v))
            .collect();
        let expected_sizes: Vec<O> = vec![3, 2, 0, 1, 0]
            .into_iter()
            .map(|v| O::usize_as(v))
            .collect();
        assert_eq!(list_view.value_offsets(), &expected_offsets);
        assert_eq!(list_view.value_sizes(), &expected_sizes);

        // Row 0: [1, 2, 3]
        assert!(list_view.is_valid(0));
        let vals = list_view.value(0);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.values(), &[1, 2, 3]);

        // Row 1: [4, null]
        assert!(list_view.is_valid(1));
        let vals = list_view.value(1);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.len(), 2);
        assert_eq!(ints.value(0), 4);
        assert!(ints.is_null(1));

        // Row 2: null
        assert!(list_view.is_null(2));

        // Row 3: [6]
        assert!(list_view.is_valid(3));
        let vals = list_view.value(3);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.values(), &[6]);

        // Row 4: []
        assert!(list_view.is_valid(4));
        let vals = list_view.value(4);
        assert_eq!(vals.len(), 0);
    }
2304
2305    #[test]
2306    fn test_read_list_view() {
2307        assert_read_list_view::<i32>();
2308        assert_read_list_view::<i64>();
2309    }
2310
2311    #[test]
2312    fn test_skip_empty_lines() {
2313        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2314        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2315        let json_content = "
2316        {\"a\": 1}
2317        {\"a\": 2}
2318        {\"a\": 3}";
2319        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2320        let batch = reader.next().unwrap().unwrap();
2321
2322        assert_eq!(1, batch.num_columns());
2323        assert_eq!(3, batch.num_rows());
2324
2325        let schema = reader.schema();
2326        let c = schema.column_with_name("a").unwrap();
2327        assert_eq!(&DataType::Int64, c.1.data_type());
2328    }
2329
2330    #[test]
2331    fn test_with_multiple_batches() {
2332        let file = File::open("test/data/basic_nulls.json").unwrap();
2333        let mut reader = BufReader::new(file);
2334        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2335        reader.rewind().unwrap();
2336
2337        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2338        let mut reader = builder.build(reader).unwrap();
2339
2340        let mut num_records = Vec::new();
2341        while let Some(rb) = reader.next().transpose().unwrap() {
2342            num_records.push(rb.num_rows());
2343        }
2344
2345        assert_eq!(vec![5, 5, 2], num_records);
2346    }
2347
    #[test]
    fn test_timestamp_from_json_seconds() {
        // Reads column "a" of basic_nulls.json as Timestamp(Second): the raw
        // JSON numbers are interpreted at second resolution.
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Second, None),
            a.1.data_type()
        );

        // Spot-check values and nulls against the fixture file
        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2380
    #[test]
    fn test_timestamp_from_json_milliseconds() {
        // Same as the seconds variant above, but reading column "a" as
        // Timestamp(Millisecond); raw numbers are interpreted at that unit.
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Millisecond, None),
            a.1.data_type()
        );

        // Spot-check values and nulls against the fixture file
        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2413
    #[test]
    fn test_date_from_json_milliseconds() {
        // Reads column "a" of basic_nulls.json as Date64 (milliseconds)
        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Date64, a.1.data_type());

        // Spot-check values and nulls against the fixture file
        let aa = batch.column(a.0).as_primitive::<Date64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2439
2440    #[test]
2441    fn test_time_from_json_nanoseconds() {
2442        let schema = Schema::new(vec![Field::new(
2443            "a",
2444            DataType::Time64(TimeUnit::Nanosecond),
2445            true,
2446        )]);
2447
2448        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2449        let batch = reader.next().unwrap().unwrap();
2450
2451        assert_eq!(1, batch.num_columns());
2452        assert_eq!(12, batch.num_rows());
2453
2454        let schema = reader.schema();
2455        let batch_schema = batch.schema();
2456        assert_eq!(schema, batch_schema);
2457
2458        let a = schema.column_with_name("a").unwrap();
2459        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2460
2461        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2462        assert!(aa.is_valid(0));
2463        assert!(!aa.is_valid(1));
2464        assert!(!aa.is_valid(2));
2465        assert_eq!(1, aa.value(0));
2466        assert_eq!(1, aa.value(3));
2467        assert_eq!(5, aa.value(7));
2468    }
2469
2470    #[test]
2471    fn test_json_iterator() {
2472        let file = File::open("test/data/basic.json").unwrap();
2473        let mut reader = BufReader::new(file);
2474        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2475        reader.rewind().unwrap();
2476
2477        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2478        let reader = builder.build(reader).unwrap();
2479        let schema = reader.schema();
2480        let (col_a_index, _) = schema.column_with_name("a").unwrap();
2481
2482        let mut sum_num_rows = 0;
2483        let mut num_batches = 0;
2484        let mut sum_a = 0;
2485        for batch in reader {
2486            let batch = batch.unwrap();
2487            assert_eq!(8, batch.num_columns());
2488            sum_num_rows += batch.num_rows();
2489            num_batches += 1;
2490            let batch_schema = batch.schema();
2491            assert_eq!(schema, batch_schema);
2492            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2493            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2494        }
2495        assert_eq!(12, sum_num_rows);
2496        assert_eq!(3, num_batches);
2497        assert_eq!(100000000000011, sum_a);
2498    }
2499
2500    #[test]
2501    fn test_decoder_error() {
2502        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2503            "a",
2504            vec![Field::new("child", DataType::Int32, false)],
2505            true,
2506        )]));
2507
2508        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
2509        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
2510        assert!(decoder.tape_decoder.has_partial_row());
2511        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2512        let _ = decoder.flush().unwrap_err();
2513        assert!(decoder.tape_decoder.has_partial_row());
2514        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2515
2516        let parse_err = |s: &str| {
2517            ReaderBuilder::new(schema.clone())
2518                .build(Cursor::new(s.as_bytes()))
2519                .unwrap()
2520                .next()
2521                .unwrap()
2522                .unwrap_err()
2523                .to_string()
2524        };
2525
2526        let err = parse_err(r#"{"a": 123}"#);
2527        assert_eq!(
2528            err,
2529            "Json error: whilst decoding field 'a': expected { got 123"
2530        );
2531
2532        let err = parse_err(r#"{"a": ["bar"]}"#);
2533        assert_eq!(
2534            err,
2535            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
2536        );
2537
2538        let err = parse_err(r#"{"a": []}"#);
2539        assert_eq!(
2540            err,
2541            "Json error: whilst decoding field 'a': expected { got []"
2542        );
2543
2544        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
2545        assert_eq!(
2546            err,
2547            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
2548        );
2549
2550        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
2551        assert_eq!(
2552            err,
2553            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
2554        );
2555
2556        let err = parse_err(r#"{"a": true}"#);
2557        assert_eq!(
2558            err,
2559            "Json error: whilst decoding field 'a': expected { got true"
2560        );
2561
2562        let err = parse_err(r#"{"a": false}"#);
2563        assert_eq!(
2564            err,
2565            "Json error: whilst decoding field 'a': expected { got false"
2566        );
2567
2568        let err = parse_err(r#"{"a": "foo"}"#);
2569        assert_eq!(
2570            err,
2571            "Json error: whilst decoding field 'a': expected { got \"foo\""
2572        );
2573
2574        let err = parse_err(r#"{"a": {"child": false}}"#);
2575        assert_eq!(
2576            err,
2577            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
2578        );
2579
2580        let err = parse_err(r#"{"a": {"child": []}}"#);
2581        assert_eq!(
2582            err,
2583            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
2584        );
2585
2586        let err = parse_err(r#"{"a": {"child": [123]}}"#);
2587        assert_eq!(
2588            err,
2589            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
2590        );
2591
2592        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
2593        assert_eq!(
2594            err,
2595            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
2596        );
2597    }
2598
2599    #[test]
2600    fn test_serialize_timestamp() {
2601        let json = vec![
2602            json!({"timestamp": 1681319393}),
2603            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2604        ];
2605        let schema = Schema::new(vec![Field::new(
2606            "timestamp",
2607            DataType::Timestamp(TimeUnit::Second, None),
2608            true,
2609        )]);
2610        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2611            .build_decoder()
2612            .unwrap();
2613        decoder.serialize(&json).unwrap();
2614        let batch = decoder.flush().unwrap().unwrap();
2615        assert_eq!(batch.num_rows(), 2);
2616        assert_eq!(batch.num_columns(), 1);
2617        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2618        assert_eq!(values.values(), &[1681319393, -7200]);
2619    }
2620
2621    #[test]
2622    fn test_serialize_decimal() {
2623        let json = vec![
2624            json!({"decimal": 1.234}),
2625            json!({"decimal": "1.234"}),
2626            json!({"decimal": 1234}),
2627            json!({"decimal": "1234"}),
2628        ];
2629        let schema = Schema::new(vec![Field::new(
2630            "decimal",
2631            DataType::Decimal128(10, 3),
2632            true,
2633        )]);
2634        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2635            .build_decoder()
2636            .unwrap();
2637        decoder.serialize(&json).unwrap();
2638        let batch = decoder.flush().unwrap().unwrap();
2639        assert_eq!(batch.num_rows(), 4);
2640        assert_eq!(batch.num_columns(), 1);
2641        let values = batch.column(0).as_primitive::<Decimal128Type>();
2642        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2643    }
2644
2645    #[test]
2646    fn test_serde_field() {
2647        let field = Field::new("int", DataType::Int32, true);
2648        let mut decoder = ReaderBuilder::new_with_field(field)
2649            .build_decoder()
2650            .unwrap();
2651        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2652        let b = decoder.flush().unwrap().unwrap();
2653        let values = b.column(0).as_primitive::<Int32Type>().values();
2654        assert_eq!(values, &[1, 2, 3, 4]);
2655    }
2656
2657    #[test]
2658    fn test_serde_large_numbers() {
2659        let field = Field::new("int", DataType::Int64, true);
2660        let mut decoder = ReaderBuilder::new_with_field(field)
2661            .build_decoder()
2662            .unwrap();
2663
2664        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2665        let b = decoder.flush().unwrap().unwrap();
2666        let values = b.column(0).as_primitive::<Int64Type>().values();
2667        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2668
2669        let field = Field::new(
2670            "int",
2671            DataType::Timestamp(TimeUnit::Microsecond, None),
2672            true,
2673        );
2674        let mut decoder = ReaderBuilder::new_with_field(field)
2675            .build_decoder()
2676            .unwrap();
2677
2678        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2679        let b = decoder.flush().unwrap().unwrap();
2680        let values = b
2681            .column(0)
2682            .as_primitive::<TimestampMicrosecondType>()
2683            .values();
2684        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2685    }
2686
2687    #[test]
2688    fn test_coercing_primitive_into_string_decoder() {
2689        let buf = &format!(
2690            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2691            (i32::MAX as i64 + 10),
2692            i64::MAX - 10
2693        );
2694        let schema = Schema::new(vec![
2695            Field::new("a", DataType::Float64, true),
2696            Field::new("b", DataType::Utf8, true),
2697            Field::new("c", DataType::Utf8, true),
2698        ]);
2699        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2700        let schema_ref = Arc::new(schema);
2701
2702        // read record batches
2703        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2704        let mut decoder = reader.build_decoder().unwrap();
2705        decoder.serialize(json_array.as_slice()).unwrap();
2706        let batch = decoder.flush().unwrap().unwrap();
2707        assert_eq!(
2708            batch,
2709            RecordBatch::try_new(
2710                schema_ref,
2711                vec![
2712                    Arc::new(Float64Array::from(vec![
2713                        1.0,
2714                        2.0,
2715                        (i32::MAX as i64 + 10) as f64,
2716                        (i64::MAX - 10) as f64
2717                    ])),
2718                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2719                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2720                ]
2721            )
2722            .unwrap()
2723        );
2724    }
2725
2726    // Parse the given `row` in `struct_mode` as a type given by fields.
2727    //
2728    // If as_struct == true, wrap the fields in a Struct field with name "r".
2729    // If as_struct == false, wrap the fields in a Schema.
2730    fn _parse_structs(
2731        row: &str,
2732        struct_mode: StructMode,
2733        fields: Fields,
2734        as_struct: bool,
2735    ) -> Result<RecordBatch, ArrowError> {
2736        let builder = if as_struct {
2737            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2738        } else {
2739            ReaderBuilder::new(Arc::new(Schema::new(fields)))
2740        };
2741        builder
2742            .with_struct_mode(struct_mode)
2743            .build(Cursor::new(row.as_bytes()))
2744            .unwrap()
2745            .next()
2746            .unwrap()
2747    }
2748
2749    #[test]
2750    fn test_struct_decoding_list_length() {
2751        use arrow_array::array;
2752
2753        let row = "[1, 2]";
2754
2755        let mut fields = vec![Field::new("a", DataType::Int32, true)];
2756        let too_few_fields = Fields::from(fields.clone());
2757        fields.push(Field::new("b", DataType::Int32, true));
2758        let correct_fields = Fields::from(fields.clone());
2759        fields.push(Field::new("c", DataType::Int32, true));
2760        let too_many_fields = Fields::from(fields.clone());
2761
2762        let parse = |fields: Fields, as_struct: bool| {
2763            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
2764        };
2765
2766        let expected_row = StructArray::new(
2767            correct_fields.clone(),
2768            vec![
2769                Arc::new(array::Int32Array::from(vec![1])),
2770                Arc::new(array::Int32Array::from(vec![2])),
2771            ],
2772            None,
2773        );
2774        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);
2775
2776        assert_eq!(
2777            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
2778            "Json error: found extra columns for 1 fields".to_string()
2779        );
2780        assert_eq!(
2781            parse(too_few_fields, false).unwrap_err().to_string(),
2782            "Json error: found extra columns for 1 fields".to_string()
2783        );
2784        assert_eq!(
2785            parse(correct_fields.clone(), true).unwrap(),
2786            RecordBatch::try_new(
2787                Arc::new(Schema::new(vec![row_field])),
2788                vec![Arc::new(expected_row.clone())]
2789            )
2790            .unwrap()
2791        );
2792        assert_eq!(
2793            parse(correct_fields, false).unwrap(),
2794            RecordBatch::from(expected_row)
2795        );
2796        assert_eq!(
2797            parse(too_many_fields.clone(), true)
2798                .unwrap_err()
2799                .to_string(),
2800            "Json error: found 2 columns for 3 fields".to_string()
2801        );
2802        assert_eq!(
2803            parse(too_many_fields, false).unwrap_err().to_string(),
2804            "Json error: found 2 columns for 3 fields".to_string()
2805        );
2806    }
2807
    #[test]
    fn test_struct_decoding() {
        // Nested structs (containing a list and a map) decode from either
        // JSON objects (ObjectOnly) or JSON lists (ListOnly); each mode must
        // reject the other representation with a descriptive error.
        use arrow_array::builder;

        // The same logical row in three shapes: pure object, pure list, and
        // an object whose struct value is written as a list.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected column "b": a single list [1, 2].
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected column "c": a single map {"d": 3}.
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly: the object form decodes; any list form is rejected.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly: the list form decodes; any object form is rejected.
        // NOTE(review): the expected message below reproduces the decoder's
        // reconstruction of the input, which omits the comma between object
        // entries — that spelling appears intentional here.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2891
2892    // Test cases:
2893    // [] -> RecordBatch row with no entries.  Schema = [('a', Int32)] -> Error
2894    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
2895    // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
2896    // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
2897    #[test]
2898    fn test_struct_decoding_empty_list() {
2899        let int_field = Field::new("a", DataType::Int32, true);
2900        let struct_field = Field::new(
2901            "r",
2902            DataType::Struct(Fields::from(vec![int_field.clone()])),
2903            true,
2904        );
2905
2906        let parse = |row: &str, as_struct: bool, field: Field| {
2907            _parse_structs(
2908                row,
2909                StructMode::ListOnly,
2910                Fields::from(vec![field]),
2911                as_struct,
2912            )
2913        };
2914
2915        // Missing fields
2916        assert_eq!(
2917            parse("[]", true, struct_field.clone())
2918                .unwrap_err()
2919                .to_string(),
2920            "Json error: found 0 columns for 1 fields".to_owned()
2921        );
2922        assert_eq!(
2923            parse("[]", false, int_field.clone())
2924                .unwrap_err()
2925                .to_string(),
2926            "Json error: found 0 columns for 1 fields".to_owned()
2927        );
2928        assert_eq!(
2929            parse("[]", false, struct_field.clone())
2930                .unwrap_err()
2931                .to_string(),
2932            "Json error: found 0 columns for 1 fields".to_owned()
2933        );
2934        assert_eq!(
2935            parse("[[]]", false, struct_field.clone())
2936                .unwrap_err()
2937                .to_string(),
2938            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2939        );
2940    }
2941
2942    #[test]
2943    fn test_decode_list_struct_with_wrong_types() {
2944        let int_field = Field::new("a", DataType::Int32, true);
2945        let struct_field = Field::new(
2946            "r",
2947            DataType::Struct(Fields::from(vec![int_field.clone()])),
2948            true,
2949        );
2950
2951        let parse = |row: &str, as_struct: bool, field: Field| {
2952            _parse_structs(
2953                row,
2954                StructMode::ListOnly,
2955                Fields::from(vec![field]),
2956                as_struct,
2957            )
2958        };
2959
2960        // Wrong values
2961        assert_eq!(
2962            parse(r#"[["a"]]"#, false, struct_field.clone())
2963                .unwrap_err()
2964                .to_string(),
2965            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2966        );
2967        assert_eq!(
2968            parse(r#"[["a"]]"#, true, struct_field.clone())
2969                .unwrap_err()
2970                .to_string(),
2971            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2972        );
2973        assert_eq!(
2974            parse(r#"["a"]"#, true, int_field.clone())
2975                .unwrap_err()
2976                .to_string(),
2977            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2978        );
2979        assert_eq!(
2980            parse(r#"["a"]"#, false, int_field.clone())
2981                .unwrap_err()
2982                .to_string(),
2983            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2984        );
2985    }
2986
    #[test]
    fn test_type_conflict_nulls() {
        // With ignore_type_conflicts enabled and every field nullable, a JSON
        // value of the wrong type for a column is recorded as null instead of
        // failing the decode. Each row below rotates the ten values one slot,
        // so every value visits every column exactly once.
        let schema = Schema::new(vec![
            Field::new("null", DataType::Null, true),
            Field::new("bool", DataType::Boolean, true),
            Field::new("primitive", DataType::Int32, true),
            Field::new("numeric", DataType::Decimal128(10, 3), true),
            Field::new("string", DataType::Utf8, true),
            Field::new("string_view", DataType::Utf8View, true),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Second, None),
                true,
            ),
            Field::new(
                "array",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new(
                "map",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(Fields::from(vec![
                            Field::new("keys", DataType::Utf8, false),
                            Field::new("values", DataType::Utf8, true),
                        ])),
                        false, // not nullable
                    )),
                    false, // not sorted
                ),
                true, // nullable
            ),
            Field::new(
                "struct",
                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
                true,
            ),
        ]);

        // A compatible value for each schema field above, in schema order
        let json_values = vec![
            json!(null),
            json!(true),
            json!(42),
            json!(1.234),
            json!("hi"),
            json!("ho"),
            json!("1970-01-01T00:00:00+02:00"),
            json!([1, "ho", 3]),
            json!({"k": "value"}),
            json!({"a": 1}),
        ];

        // Create a set of JSON rows that rotates each value past every field
        // (row i pairs value i with field 0, value i+1 with field 1, ...).
        let json: Vec<_> = (0..json_values.len())
            .map(|i| {
                let pairs = json_values[i..]
                    .iter()
                    .chain(json_values[..i].iter())
                    .zip(&schema.fields)
                    .map(|(v, f)| (f.name().to_string(), v.clone()))
                    .collect();
                serde_json::Value::Object(pairs)
            })
            .collect();
        // coerce_primitive additionally lets primitives land in string
        // columns; everything else that mismatches becomes null.
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .with_ignore_type_conflicts(true)
            .with_coerce_primitive(true)
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 10);
        assert_eq!(batch.num_columns(), 10);

        // NOTE: NullArray doesn't materialize any values (they're all NULL by definition)
        let _ = batch
            .column(0)
            .as_any()
            .downcast_ref::<NullArray>()
            .unwrap();

        // "bool": only the actual boolean value survives.
        assert!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<BooleanArray>()
                .unwrap()
                .iter()
                .eq([
                    Some(true),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None
                ])
        );

        // "primitive": 42 directly, plus `true` coerced to 1 in row 1.
        assert!(batch.column(2).as_primitive::<Int32Type>().iter().eq([
            Some(42),
            Some(1),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None
        ]));

        // "numeric": 1.234 and 42 scaled by 10^3 into Decimal128(10, 3).
        assert!(batch.column(3).as_primitive::<Decimal128Type>().iter().eq([
            Some(1234),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            Some(42000)
        ]));

        // "string": the two strings and the timestamp string pass through;
        // coerce_primitive renders bool/int/float as their literal text.
        assert!(
            batch
                .column(4)
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap()
                .iter()
                .eq([
                    Some("hi"),
                    Some("ho"),
                    Some("1970-01-01T00:00:00+02:00"),
                    None,
                    None,
                    None,
                    None,
                    Some("true"),
                    Some("42"),
                    Some("1.234"),
                ])
        );

        // "string_view": same pattern as "string", shifted one rotation.
        assert!(
            batch
                .column(5)
                .as_any()
                .downcast_ref::<StringViewArray>()
                .unwrap()
                .iter()
                .eq([
                    Some("ho"),
                    Some("1970-01-01T00:00:00+02:00"),
                    None,
                    None,
                    None,
                    None,
                    Some("true"),
                    Some("42"),
                    Some("1.234"),
                    Some("hi"),
                ])
        );

        // "timestamp": the RFC3339 string (-7200s due to +02:00) and the
        // bare integer 42 both parse; everything else is null.
        assert!(
            batch
                .column(6)
                .as_primitive::<TimestampSecondType>()
                .iter()
                .eq([
                    Some(-7200),
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(42),
                    None,
                    None,
                    None,
                ])
        );

        // "array": only row 0 holds the list; the string element "ho" inside
        // it becomes a null Int32 item.
        let arrays = batch
            .column(7)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(
            arrays.nulls(),
            Some(&NullBuffer::from(
                &[
                    true, false, false, false, false, false, false, false, false, false
                ][..]
            ))
        );
        assert_eq!(arrays.offsets()[1], 3);
        let array_values = arrays
            .values()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(array_values.iter().eq([Some(1), None, Some(3)]));

        let maps = batch.column(8).as_any().downcast_ref::<MapArray>().unwrap();
        assert_eq!(
            maps.nulls(),
            Some(&NullBuffer::from(
                // Both map and struct can parse
                &[
                    true, true, false, false, false, false, false, false, false, false
                ][..]
            ))
        );
        let map_keys = maps.keys().as_any().downcast_ref::<StringArray>().unwrap();
        assert!(map_keys.iter().eq([Some("k"), Some("a")]));
        let map_values = maps
            .values()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(map_values.iter().eq([Some("value"), Some("1")]));

        let structs = batch
            .column(9)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();
        assert_eq!(
            structs.nulls(),
            Some(&NullBuffer::from(
                // Both map and struct can parse
                &[
                    true, false, false, false, false, false, false, false, false, true
                ][..]
            ))
        );
        // Row 0 got {"a": 1}; row 1 got {"k": "value"}, whose missing "a"
        // leaves a null child value.
        let struct_fields = structs
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(struct_fields.slice(0, 2).iter().eq([Some(1), None]));
    }
3240
3241    #[test]
3242    fn test_type_conflict_non_nullable() {
3243        let fields = [
3244            Field::new("bool", DataType::Boolean, false),
3245            Field::new("primitive", DataType::Int32, false),
3246            Field::new("numeric", DataType::Decimal128(10, 3), false),
3247            Field::new("string", DataType::Utf8, false),
3248            Field::new("string_view", DataType::Utf8View, false),
3249            Field::new(
3250                "timestamp",
3251                DataType::Timestamp(TimeUnit::Second, None),
3252                false,
3253            ),
3254            Field::new(
3255                "array",
3256                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3257                false,
3258            ),
3259            Field::new(
3260                "map",
3261                DataType::Map(
3262                    Arc::new(Field::new(
3263                        "entries",
3264                        DataType::Struct(Fields::from(vec![
3265                            Field::new("keys", DataType::Utf8, false),
3266                            Field::new("values", DataType::Utf8, true),
3267                        ])),
3268                        false, // not nullable
3269                    )),
3270                    false, // not sorted
3271                ),
3272                false, // not nullable
3273            ),
3274            Field::new(
3275                "struct",
3276                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3277                false,
3278            ),
3279        ];
3280
3281        // Every field above will have a type conflict with at least one of these values
3282        let json_values = vec![json!(true), json!({"a": 1})];
3283
3284        for field in fields {
3285            let mut decoder = ReaderBuilder::new_with_field(field)
3286                .with_ignore_type_conflicts(true)
3287                .build_decoder()
3288                .unwrap();
3289            decoder.serialize(&json_values).unwrap();
3290            decoder
3291                .flush()
3292                .expect_err("type conflict on non-nullable type");
3293        }
3294    }
3295
3296    #[test]
3297    fn test_ignore_type_conflicts_disabled() {
3298        let fields = [
3299            Field::new("null", DataType::Null, true),
3300            Field::new("bool", DataType::Boolean, true),
3301            Field::new("primitive", DataType::Int32, true),
3302            Field::new("numeric", DataType::Decimal128(10, 3), true),
3303            Field::new("string", DataType::Utf8, true),
3304            Field::new("string_view", DataType::Utf8View, true),
3305            Field::new(
3306                "timestamp",
3307                DataType::Timestamp(TimeUnit::Second, None),
3308                true,
3309            ),
3310            Field::new(
3311                "array",
3312                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3313                true,
3314            ),
3315            Field::new(
3316                "map",
3317                DataType::Map(
3318                    Arc::new(Field::new(
3319                        "entries",
3320                        DataType::Struct(Fields::from(vec![
3321                            Field::new("keys", DataType::Utf8, false),
3322                            Field::new("values", DataType::Utf8, true),
3323                        ])),
3324                        false, // not nullable
3325                    )),
3326                    false, // not sorted
3327                ),
3328                true, // not nullable
3329            ),
3330            Field::new(
3331                "struct",
3332                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3333                true,
3334            ),
3335        ];
3336
3337        // Every field above will have a type conflict with at least one of these values
3338        let json_values = vec![json!(true), json!({"a": 1})];
3339
3340        for field in fields {
3341            let mut decoder = ReaderBuilder::new_with_field(field)
3342                .build_decoder()
3343                .unwrap();
3344            decoder.serialize(&json_values).unwrap();
3345            decoder
3346                .flush()
3347                .expect_err("type conflict on non-nullable type");
3348        }
3349    }
3350
3351    #[test]
3352    fn test_read_run_end_encoded() {
3353        let buf = r#"
3354        {"a": "x"}
3355        {"a": "x"}
3356        {"a": "y"}
3357        {"a": "y"}
3358        {"a": "y"}
3359        "#;
3360
3361        let ree_type = DataType::RunEndEncoded(
3362            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3363            Arc::new(Field::new("values", DataType::Utf8, true)),
3364        );
3365        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3366        let batches = do_read(buf, 1024, false, false, schema);
3367        assert_eq!(batches.len(), 1);
3368
3369        let col = batches[0].column(0);
3370        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3371
3372        // 5 logical values compressed into 2 runs
3373        assert_eq!(run_array.len(), 5);
3374        assert_eq!(run_array.run_ends().values(), &[2, 5]);
3375
3376        let values = run_array.values().as_string::<i32>();
3377        assert_eq!(values.len(), 2);
3378        assert_eq!(values.value(0), "x");
3379        assert_eq!(values.value(1), "y");
3380    }
3381
3382    #[test]
3383    fn test_read_run_end_encoded_consecutive_nulls() {
3384        let buf = r#"
3385        {"a": "x"}
3386        {}
3387        {}
3388        {}
3389        {"a": "y"}
3390        "#;
3391
3392        let ree_type = DataType::RunEndEncoded(
3393            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3394            Arc::new(Field::new("values", DataType::Utf8, true)),
3395        );
3396        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3397        let batches = do_read(buf, 1024, false, false, schema);
3398        assert_eq!(batches.len(), 1);
3399
3400        let col = batches[0].column(0);
3401        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3402
3403        // 5 logical values: "x", null, null, null, "y" → 3 runs
3404        assert_eq!(run_array.len(), 5);
3405        assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);
3406
3407        let values = run_array.values().as_string::<i32>();
3408        assert_eq!(values.len(), 3);
3409        assert_eq!(values.value(0), "x");
3410        assert!(values.is_null(1));
3411        assert_eq!(values.value(2), "y");
3412    }
3413
3414    #[test]
3415    fn test_read_run_end_encoded_all_unique() {
3416        let buf = r#"
3417        {"a": 1}
3418        {"a": 2}
3419        {"a": 3}
3420        "#;
3421
3422        let ree_type = DataType::RunEndEncoded(
3423            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3424            Arc::new(Field::new("values", DataType::Int32, true)),
3425        );
3426        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3427        let batches = do_read(buf, 1024, false, false, schema);
3428        assert_eq!(batches.len(), 1);
3429
3430        let col = batches[0].column(0);
3431        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3432
3433        // No compression: 3 unique values → 3 runs
3434        assert_eq!(run_array.len(), 3);
3435        assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
3436    }
3437
3438    #[test]
3439    fn test_read_run_end_encoded_int16_run_ends() {
3440        let buf = r#"
3441        {"a": "x"}
3442        {"a": "x"}
3443        {"a": "y"}
3444        "#;
3445
3446        let ree_type = DataType::RunEndEncoded(
3447            Arc::new(Field::new("run_ends", DataType::Int16, false)),
3448            Arc::new(Field::new("values", DataType::Utf8, true)),
3449        );
3450        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3451        let batches = do_read(buf, 1024, false, false, schema);
3452        assert_eq!(batches.len(), 1);
3453
3454        let col = batches[0].column(0);
3455        let run_array = col.as_run::<arrow_array::types::Int16Type>();
3456
3457        assert_eq!(run_array.len(), 3);
3458        assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
3459    }
3460}