arrow_json/reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! JSON reader
19//!
20//! This JSON reader allows JSON records to be read into the Arrow memory
21//! model. Records are loaded in batches and are then converted from the record-oriented
22//! representation to the columnar arrow data model.
23//!
24//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
25//! parsing of sequences of one or more arbitrarily formatted JSON values, including
26//! but not limited to newline-delimited JSON.
27//!
28//! # Basic Usage
29//!
30//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
31//!
32//! ```
33//! # use arrow_schema::*;
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use std::sync::Arc;
37//!
38//! let schema = Arc::new(Schema::new(vec![
39//!     Field::new("a", DataType::Float64, false),
40//!     Field::new("b", DataType::Float64, false),
41//!     Field::new("c", DataType::Boolean, true),
42//! ]));
43//!
44//! let file = File::open("test/data/basic.json").unwrap();
45//!
46//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
47//! let batch = json.next().unwrap().unwrap();
48//! ```
49//!
50//! # Async Usage
51//!
52//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
53//! and is designed to be agnostic to the various different kinds of async IO primitives found
54//! within the Rust ecosystem.
55//!
56//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
57//!
58//! ```
59//! # use std::task::{Poll, ready};
60//! # use bytes::{Buf, Bytes};
61//! # use arrow_schema::ArrowError;
62//! # use futures::stream::{Stream, StreamExt};
63//! # use arrow_array::RecordBatch;
64//! # use arrow_json::reader::Decoder;
65//! #
66//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
67//!     mut decoder: Decoder,
68//!     mut input: S,
69//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
70//!     let mut buffered = Bytes::new();
71//!     futures::stream::poll_fn(move |cx| {
72//!         loop {
73//!             if buffered.is_empty() {
74//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
75//!                     Some(b) => b,
76//!                     None => break,
77//!                 };
78//!             }
79//!             let decoded = match decoder.decode(buffered.as_ref()) {
80//!                 Ok(decoded) => decoded,
81//!                 Err(e) => return Poll::Ready(Some(Err(e))),
82//!             };
83//!             let read = buffered.len();
84//!             buffered.advance(decoded);
85//!             if decoded != read {
86//!                 break
87//!             }
88//!         }
89//!
90//!         Poll::Ready(decoder.flush().transpose())
91//!     })
92//! }
93//!
94//! ```
95//!
96//! In a similar vein, it can also be used with tokio-based IO primitives
97//!
98//! ```
99//! # use std::sync::Arc;
100//! # use arrow_schema::{DataType, Field, Schema};
101//! # use std::pin::Pin;
102//! # use std::task::{Poll, ready};
103//! # use futures::{Stream, TryStreamExt};
104//! # use tokio::io::AsyncBufRead;
105//! # use arrow_array::RecordBatch;
106//! # use arrow_json::reader::Decoder;
107//! # use arrow_schema::ArrowError;
108//! fn decode_stream<R: AsyncBufRead + Unpin>(
109//!     mut decoder: Decoder,
110//!     mut reader: R,
111//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
112//!     futures::stream::poll_fn(move |cx| {
113//!         loop {
114//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
115//!                 Ok(b) if b.is_empty() => break,
116//!                 Ok(b) => b,
117//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
118//!             };
119//!             let read = b.len();
120//!             let decoded = match decoder.decode(b) {
121//!                 Ok(decoded) => decoded,
122//!                 Err(e) => return Poll::Ready(Some(Err(e))),
123//!             };
124//!             Pin::new(&mut reader).consume(decoded);
125//!             if decoded != read {
126//!                 break;
127//!             }
128//!         }
129//!
130//!         Poll::Ready(decoder.flush().transpose())
131//!     })
132//! }
133//! ```
134//!
135
136use crate::StructMode;
137use crate::reader::binary_array::{
138    BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
139};
140use std::borrow::Cow;
141use std::io::BufRead;
142use std::sync::Arc;
143
144use chrono::Utc;
145use serde_core::Serialize;
146
147use arrow_array::timezone::Tz;
148use arrow_array::types::*;
149use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
150use arrow_data::ArrayData;
151use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
152pub use schema::*;
153pub use value_iter::ValueIter;
154
155use crate::reader::boolean_array::BooleanArrayDecoder;
156use crate::reader::decimal_array::DecimalArrayDecoder;
157use crate::reader::list_array::ListArrayDecoder;
158use crate::reader::map_array::MapArrayDecoder;
159use crate::reader::null_array::NullArrayDecoder;
160use crate::reader::primitive_array::PrimitiveArrayDecoder;
161use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
162use crate::reader::string_array::StringArrayDecoder;
163use crate::reader::string_view_array::StringViewArrayDecoder;
164use crate::reader::struct_array::StructArrayDecoder;
165use crate::reader::tape::{Tape, TapeDecoder};
166use crate::reader::timestamp_array::TimestampArrayDecoder;
167
168mod binary_array;
169mod boolean_array;
170mod decimal_array;
171mod list_array;
172mod map_array;
173mod null_array;
174mod primitive_array;
175mod run_end_array;
176mod schema;
177mod serializer;
178mod string_array;
179mod string_view_array;
180mod struct_array;
181mod tape;
182mod timestamp_array;
183mod value_iter;
184
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows decoded into each emitted `RecordBatch`
    batch_size: usize,
    // If true, JSON bools/numbers are coerced to strings for Utf8/LargeUtf8 columns
    coerce_primitive: bool,
    // If true, error when the JSON contains a column not present in `schema`
    strict_mode: bool,
    // If true, `schema` wraps a single (possibly non-struct) root field
    // created via `ReaderBuilder::new_with_field`
    is_field: bool,
    // Whether structs are decoded from JSON objects, lists, or both
    struct_mode: StructMode,

    // Schema of the batches produced by the built `Reader`/`Decoder`
    schema: SchemaRef,
}
195
196impl ReaderBuilder {
197    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
198    ///
199    /// This could be obtained using [`infer_json_schema`] if not known
200    ///
201    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
202    /// In this case, an error is returned when a column is missing from `schema`.
203    ///
204    /// [`infer_json_schema`]: crate::reader::infer_json_schema
205    pub fn new(schema: SchemaRef) -> Self {
206        Self {
207            batch_size: 1024,
208            coerce_primitive: false,
209            strict_mode: false,
210            is_field: false,
211            struct_mode: Default::default(),
212            schema,
213        }
214    }
215
216    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
217    ///
218    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
219    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
220    ///
221    /// ```
222    /// # use std::sync::Arc;
223    /// # use arrow_array::cast::AsArray;
224    /// # use arrow_array::types::Int32Type;
225    /// # use arrow_json::ReaderBuilder;
226    /// # use arrow_schema::{DataType, Field};
227    /// // Root of JSON schema is a numeric type
228    /// let data = "1\n2\n3\n";
229    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
230    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
231    /// let b = reader.next().unwrap().unwrap();
232    /// let values = b.column(0).as_primitive::<Int32Type>().values();
233    /// assert_eq!(values, &[1, 2, 3]);
234    ///
235    /// // Root of JSON schema is a list type
236    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
237    /// let field = Field::new_list("int", field.clone(), true);
238    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
239    /// let b = reader.next().unwrap().unwrap();
240    /// let list = b.column(0).as_list::<i32>();
241    ///
242    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
243    /// let list_values = list.values().as_primitive::<Int32Type>();
244    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
245    /// ```
246    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
247        Self {
248            batch_size: 1024,
249            coerce_primitive: false,
250            strict_mode: false,
251            is_field: true,
252            struct_mode: Default::default(),
253            schema: Arc::new(Schema::new([field.into()])),
254        }
255    }
256
257    /// Sets the batch size in rows to read
258    pub fn with_batch_size(self, batch_size: usize) -> Self {
259        Self { batch_size, ..self }
260    }
261
262    /// Sets if the decoder should coerce primitive values (bool and number) into string
263    /// when the Schema's column is Utf8 or LargeUtf8.
264    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
265        Self {
266            coerce_primitive,
267            ..self
268        }
269    }
270
271    /// Sets if the decoder should return an error if it encounters a column not
272    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
273    /// `strict_mode` is effectively `true`. It is required for all fields of
274    /// the struct to be in the list: without field names, there is no way to
275    /// determine which field is missing.
276    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
277        Self {
278            strict_mode,
279            ..self
280        }
281    }
282
283    /// Set the [`StructMode`] for the reader, which determines whether structs
284    /// can be decoded from JSON as objects or lists. For more details refer to
285    /// the enum documentation. Default is to use `ObjectOnly`.
286    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
287        Self {
288            struct_mode,
289            ..self
290        }
291    }
292
293    /// Create a [`Reader`] with the provided [`BufRead`]
294    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
295        Ok(Reader {
296            reader,
297            decoder: self.build_decoder()?,
298        })
299    }
300
301    /// Create a [`Decoder`]
302    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
303        let (data_type, nullable) = if self.is_field {
304            let field = &self.schema.fields[0];
305            let data_type = Cow::Borrowed(field.data_type());
306            (data_type, field.is_nullable())
307        } else {
308            let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
309            (data_type, false)
310        };
311
312        let ctx = DecoderContext {
313            coerce_primitive: self.coerce_primitive,
314            strict_mode: self.strict_mode,
315            struct_mode: self.struct_mode,
316        };
317        let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
318
319        let num_fields = self.schema.flattened_fields().len();
320
321        Ok(Decoder {
322            decoder,
323            is_field: self.is_field,
324            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
325            batch_size: self.batch_size,
326            schema: self.schema,
327        })
328    }
329}
330
/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
///
/// Lines consisting solely of ASCII whitespace are ignored
pub struct Reader<R> {
    // Byte source the JSON is read from
    reader: R,
    // Push-based decoder the bytes are fed into
    decoder: Decoder,
}
338
339impl<R> std::fmt::Debug for Reader<R> {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        f.debug_struct("Reader")
342            .field("decoder", &self.decoder)
343            .finish()
344    }
345}
346
347impl<R: BufRead> Reader<R> {
348    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
349    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
350        loop {
351            let buf = self.reader.fill_buf()?;
352            if buf.is_empty() {
353                break;
354            }
355            let read = buf.len();
356
357            let decoded = self.decoder.decode(buf)?;
358            self.reader.consume(decoded);
359            if decoded != read {
360                break;
361            }
362        }
363        self.decoder.flush()
364    }
365}
366
367impl<R: BufRead> Iterator for Reader<R> {
368    type Item = Result<RecordBatch, ArrowError>;
369
370    fn next(&mut self) -> Option<Self::Item> {
371        self.read().transpose()
372    }
373}
374
375impl<R: BufRead> RecordBatchReader for Reader<R> {
376    fn schema(&self) -> SchemaRef {
377        self.decoder.schema.clone()
378    }
379}
380
/// A low-level interface for reading JSON data from a byte stream
///
/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
///
/// The push-based interface facilitates integration with sources that yield arbitrarily
/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
/// object storage
///
/// ```
/// # use std::io::BufRead;
/// # use arrow_array::RecordBatch;
/// # use arrow_json::reader::{Decoder, ReaderBuilder};
/// # use arrow_schema::{ArrowError, SchemaRef};
/// #
/// fn read_from_json<R: BufRead>(
///     mut reader: R,
///     schema: SchemaRef,
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
///     let mut next = move || {
///         loop {
///             // Decoder is agnostic that buf doesn't contain whole records
///             let buf = reader.fill_buf()?;
///             if buf.is_empty() {
///                 break; // Input exhausted
///             }
///             let read = buf.len();
///             let decoded = decoder.decode(buf)?;
///
///             // Consume the number of bytes read
///             reader.consume(decoded);
///             if decoded != read {
///                 break; // Read batch size
///             }
///         }
///         decoder.flush()
///     };
///     Ok(std::iter::from_fn(move || next().transpose()))
/// }
/// ```
pub struct Decoder {
    // Incremental JSON parser that buffers decoded rows as a "tape"
    tape_decoder: TapeDecoder,
    // Root array decoder that converts the tape into `ArrayData` on flush
    decoder: Box<dyn ArrayDecoder>,
    // Target number of rows per emitted `RecordBatch`
    batch_size: usize,
    // If true the root is a single column (see `ReaderBuilder::new_with_field`),
    // otherwise rows are JSON objects matching `schema`
    is_field: bool,
    // Schema of the emitted batches
    schema: SchemaRef,
}
428
429impl std::fmt::Debug for Decoder {
430    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431        f.debug_struct("Decoder")
432            .field("schema", &self.schema)
433            .field("batch_size", &self.batch_size)
434            .finish()
435    }
436}
437
impl Decoder {
    /// Read JSON objects from `buf`, returning the number of bytes read
    ///
    /// This method returns once `batch_size` objects have been parsed since the
    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
    /// should be included in the next call to [`Self::decode`]
    ///
    /// There is no requirement that `buf` contains a whole number of records, facilitating
    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        // Parsing only buffers rows onto the tape; conversion to arrays is
        // deferred until `Self::flush`
        self.tape_decoder.decode(buf)
    }

    /// Serialize `rows` to this [`Decoder`]
    ///
    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
    /// [`RecordBatch`].
    ///
    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
    /// especially where the schema is known at compile-time, however, this provides a mechanism
    /// to get something up and running quickly
    ///
    /// It can be used with [`serde_json::Value`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use serde_json::{Value, json};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Float32Type;
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
    ///
    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    ///
    /// decoder.serialize(&json).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 2);
    /// assert_eq!(batch.num_columns(), 1);
    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
    /// assert_eq!(values, &[2.3, 5.7])
    /// ```
    ///
    /// Or with arbitrary [`Serialize`] types
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use serde::Serialize;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::{Float32Type, Int32Type};
    /// #
    /// #[derive(Serialize)]
    /// struct MyStruct {
    ///     int32: i32,
    ///     float: f32,
    /// }
    ///
    /// let schema = Schema::new(vec![
    ///     Field::new("int32", DataType::Int32, false),
    ///     Field::new("float", DataType::Float32, false),
    /// ]);
    ///
    /// let rows = vec![
    ///     MyStruct{ int32: 0, float: 3. },
    ///     MyStruct{ int32: 4, float: 67.53 },
    /// ];
    ///
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    /// decoder.serialize(&rows).unwrap();
    ///
    /// let batch = decoder.flush().unwrap().unwrap();
    ///
    /// // Expect batch containing two columns
    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
    /// assert_eq!(int32.values(), &[0, 4]);
    ///
    /// let float = batch.column(1).as_primitive::<Float32Type>();
    /// assert_eq!(float.values(), &[3., 67.53]);
    /// ```
    ///
    /// Or even complex nested types
    ///
    /// ```
    /// # use std::collections::BTreeMap;
    /// # use std::sync::Arc;
    /// # use arrow_array::StructArray;
    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Fields, Schema};
    /// # use serde::Serialize;
    /// #
    /// #[derive(Serialize)]
    /// struct MyStruct {
    ///     int32: i32,
    ///     list: Vec<f64>,
    ///     nested: Vec<Option<Nested>>,
    /// }
    ///
    /// impl MyStruct {
    ///     /// Returns the [`Fields`] for [`MyStruct`]
    ///     fn fields() -> Fields {
    ///         let nested = DataType::Struct(Nested::fields());
    ///         Fields::from([
    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
    ///             Arc::new(Field::new_list(
    ///                 "list",
    ///                 Field::new("element", DataType::Float64, false),
    ///                 false,
    ///             )),
    ///             Arc::new(Field::new_list(
    ///                 "nested",
    ///                 Field::new("element", nested, true),
    ///                 true,
    ///             )),
    ///         ])
    ///     }
    /// }
    ///
    /// #[derive(Serialize)]
    /// struct Nested {
    ///     map: BTreeMap<String, Vec<String>>
    /// }
    ///
    /// impl Nested {
    ///     /// Returns the [`Fields`] for [`Nested`]
    ///     fn fields() -> Fields {
    ///         let element = Field::new("element", DataType::Utf8, false);
    ///         Fields::from([
    ///             Arc::new(Field::new_map(
    ///                 "map",
    ///                 "entries",
    ///                 Field::new("key", DataType::Utf8, false),
    ///                 Field::new_list("value", element, false),
    ///                 false, // sorted
    ///                 false, // nullable
    ///             ))
    ///         ])
    ///     }
    /// }
    ///
    /// let data = vec![
    ///     MyStruct {
    ///         int32: 34,
    ///         list: vec![1., 2., 34.],
    ///         nested: vec![
    ///             None,
    ///             Some(Nested {
    ///                 map: vec![
    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
    ///                     ("key2".to_string(), vec!["baz".to_string()])
    ///                 ].into_iter().collect()
    ///             })
    ///         ]
    ///     },
    ///     MyStruct {
    ///         int32: 56,
    ///         list: vec![],
    ///         nested: vec![]
    ///     },
    ///     MyStruct {
    ///         int32: 24,
    ///         list: vec![-1., 245.],
    ///         nested: vec![None]
    ///     }
    /// ];
    ///
    /// let schema = Schema::new(MyStruct::fields());
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    /// decoder.serialize(&data).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 3);
    /// assert_eq!(batch.num_columns(), 3);
    ///
    /// // Convert to StructArray to format
    /// let s = StructArray::from(batch);
    /// let options = FormatOptions::default().with_null("null");
    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
    ///
    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
    /// ```
    ///
    /// Note: this ignores any batch size setting, and always decodes all rows
    ///
    /// [serde]: https://docs.rs/serde/latest/serde/
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

    /// True if the decoder is currently part way through decoding a record.
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

    /// The number of unflushed records, including the partially decoded record (if any).
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Flushes the currently buffered data to a [`RecordBatch`]
    ///
    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
    ///
    /// Note: This will return an error if called part way through decoding a record,
    /// i.e. [`Self::has_partial_record`] is true.
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        // Errors if a record is only partially decoded
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // Compute the tape offset at which each row starts.
        // First offset is null sentinel
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        // Convert the buffered tape rows into ArrayData, then release the tape
        let decoded = self.decoder.decode(&tape, &pos)?;
        self.tape_decoder.clear();

        let batch = match self.is_field {
            // Single root column: wrap the decoded array directly
            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
            // Root struct: its children become the batch columns
            false => {
                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}
681
/// Converts rows buffered on a [`Tape`] into [`ArrayData`] of a single column type
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
686
/// Context for decoder creation, containing configuration.
///
/// This context is passed through the decoder creation process and contains
/// all the configuration needed to create decoders recursively.
pub struct DecoderContext {
    /// Whether to coerce primitives to strings
    coerce_primitive: bool,
    /// Whether to validate struct fields strictly
    strict_mode: bool,
    /// How to decode struct fields
    struct_mode: StructMode,
}
699
700impl DecoderContext {
701    /// Returns whether to coerce primitive types (e.g., number to string)
702    pub fn coerce_primitive(&self) -> bool {
703        self.coerce_primitive
704    }
705
706    /// Returns whether to validate struct fields strictly
707    pub fn strict_mode(&self) -> bool {
708        self.strict_mode
709    }
710
711    /// Returns how to decode struct fields
712    pub fn struct_mode(&self) -> StructMode {
713        self.struct_mode
714    }
715
716    /// Create a decoder for a type.
717    ///
718    /// This is the standard way to create child decoders from within a decoder
719    /// implementation.
720    fn make_decoder(
721        &self,
722        data_type: &DataType,
723        is_nullable: bool,
724    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
725        make_decoder(self, data_type, is_nullable)
726    }
727}
728
// Helper expanding to a boxed `PrimitiveArrayDecoder` for the arrow primitive
// type `$t`, wrapped in `Ok` to match `make_decoder`'s return type
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
734
/// Create the [`ArrayDecoder`] for `data_type`, recursing into nested types
/// (lists, structs, maps, run-end encoded) via `ctx`.
///
/// Returns [`ArrowError::NotYetImplemented`] for types the JSON reader does
/// not support.
fn make_decoder(
    ctx: &DecoderContext,
    data_type: &DataType,
    is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    let coerce_primitive = ctx.coerce_primitive();
    // `downcast_integer!` expands the first arm into a match arm per integer
    // type, each invoking `primitive_decoder!`; the remaining arms handle all
    // other supported types explicitly
    downcast_integer! {
        *data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
            d => unreachable!("unsupported run end index type: {d}"),
        },
        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
    }
}
810
811#[cfg(test)]
812mod tests {
813    use serde_json::json;
814    use std::fs::File;
815    use std::io::{BufReader, Cursor, Seek};
816
817    use arrow_array::cast::AsArray;
818    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
819    use arrow_buffer::{ArrowNativeType, Buffer};
820    use arrow_cast::display::{ArrayFormatter, FormatOptions};
821    use arrow_data::ArrayDataBuilder;
822    use arrow_schema::{Field, Fields};
823
824    use super::*;
825
826    fn do_read(
827        buf: &str,
828        batch_size: usize,
829        coerce_primitive: bool,
830        strict_mode: bool,
831        schema: SchemaRef,
832    ) -> Vec<RecordBatch> {
833        let mut unbuffered = vec![];
834
835        // Test with different batch sizes to test for boundary conditions
836        for batch_size in [1, 3, 100, batch_size] {
837            unbuffered = ReaderBuilder::new(schema.clone())
838                .with_batch_size(batch_size)
839                .with_coerce_primitive(coerce_primitive)
840                .build(Cursor::new(buf.as_bytes()))
841                .unwrap()
842                .collect::<Result<Vec<_>, _>>()
843                .unwrap();
844
845            for b in unbuffered.iter().take(unbuffered.len() - 1) {
846                assert_eq!(b.num_rows(), batch_size)
847            }
848
849            // Test with different buffer sizes to test for boundary conditions
850            for b in [1, 3, 5] {
851                let buffered = ReaderBuilder::new(schema.clone())
852                    .with_batch_size(batch_size)
853                    .with_coerce_primitive(coerce_primitive)
854                    .with_strict_mode(strict_mode)
855                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
856                    .unwrap()
857                    .collect::<Result<Vec<_>, _>>()
858                    .unwrap();
859                assert_eq!(unbuffered, buffered);
860            }
861        }
862
863        unbuffered
864    }
865
    #[test]
    fn test_basic() {
        // Mixed integer/float/bool/date values, out-of-order keys, missing
        // fields and explicit nulls, separated by arbitrary whitespace
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Exercise the push-based Decoder API directly: feed the whole
        // buffer, check row bookkeeping, then flush a single batch
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // 221 is the number of bytes consumed from `buf` by the decoder
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        // Same input through the pull-based Reader path
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": 2E0 and 2.0 decode to Int64 value 2; missing/null rows -> null
        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": present in every row; the string "5" and the float 4e0
        // both decode to Int32 values
        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        // "d": integers decode directly as Date32 values
        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        // "e": only row 2 supplies a value for the Date64 column
        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
932
933    #[test]
934    fn test_string() {
935        let buf = r#"
936        {"a": "1", "b": "2"}
937        {"a": "hello", "b": "shoo"}
938        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
939
940        {"b": null}
941        {"b": "", "a": null}
942
943        "#;
944        let schema = Arc::new(Schema::new(vec![
945            Field::new("a", DataType::Utf8, true),
946            Field::new("b", DataType::LargeUtf8, true),
947        ]));
948
949        let batches = do_read(buf, 1024, false, false, schema);
950        assert_eq!(batches.len(), 1);
951
952        let col1 = batches[0].column(0).as_string::<i32>();
953        assert_eq!(col1.null_count(), 2);
954        assert_eq!(col1.value(0), "1");
955        assert_eq!(col1.value(1), "hello");
956        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
957        assert!(col1.is_null(3));
958        assert!(col1.is_null(4));
959
960        let col2 = batches[0].column(1).as_string::<i64>();
961        assert_eq!(col2.null_count(), 1);
962        assert_eq!(col2.value(0), "2");
963        assert_eq!(col2.value(1), "shoo");
964        assert_eq!(col2.value(2), "\t😁foo");
965        assert!(col2.is_null(3));
966        assert_eq!(col2.value(4), "");
967    }
968
    #[test]
    fn test_long_string_view_allocation() {
        // The JSON input contains field "a" with different string lengths.
        // According to the implementation in the decoder:
        // - For a string, capacity is only increased if its length > 12 bytes.
        // Therefore, for:
        // Row 1: "short" (5 bytes) -> capacity += 0
        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
        // Row 3: "hello" (5 bytes) -> capacity += 0
        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
        // Expected total capacity = 24 + 17 = 41
        //
        // NOTE(review): "this is definitely long" is 23 bytes, not 24, which
        // would make the sum 40; the `>=` assertion below still passes because
        // the actual buffer is at least 41 — confirm the arithmetic above.
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        // batch_size 1024, no primitive coercion, non-strict mode
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        // Get the first column ("a") as a StringViewArray.
        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Retrieve the underlying data buffer from the array.
        // The builder pre-allocates capacity based on the sum of lengths for long strings.
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        // Check that the allocated capacity is at least what we expected.
        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Additionally, verify that the decoded values are correct.
        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
1021
    /// Test the memory capacity allocation logic when converting numeric types to strings.
    #[test]
    fn test_numeric_view_allocation() {
        // For numeric types, the expected capacity calculation is as follows:
        // Row 1: 123456789  -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
        //                        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
        let expected_capacity: usize = 33;

        let buf = r#"
    {"n": 123456789}
    {"n": 1000000000000}
    {"n": 3.1415}
    {"n": 2.718281828459045}
    "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive = true so numbers decode into the string column
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Check that the underlying data buffer capacity is at least the expected value.
        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Verify that the converted string values are correct.
        // Note: The format of the number converted to a string should match the actual implementation.
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1066
1067    #[test]
1068    fn test_string_with_uft8view() {
1069        let buf = r#"
1070        {"a": "1", "b": "2"}
1071        {"a": "hello", "b": "shoo"}
1072        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1073
1074        {"b": null}
1075        {"b": "", "a": null}
1076
1077        "#;
1078        let schema = Arc::new(Schema::new(vec![
1079            Field::new("a", DataType::Utf8View, true),
1080            Field::new("b", DataType::LargeUtf8, true),
1081        ]));
1082
1083        let batches = do_read(buf, 1024, false, false, schema);
1084        assert_eq!(batches.len(), 1);
1085
1086        let col1 = batches[0].column(0).as_string_view();
1087        assert_eq!(col1.null_count(), 2);
1088        assert_eq!(col1.value(0), "1");
1089        assert_eq!(col1.value(1), "hello");
1090        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1091        assert!(col1.is_null(3));
1092        assert!(col1.is_null(4));
1093        assert_eq!(col1.data_type(), &DataType::Utf8View);
1094
1095        let col2 = batches[0].column(1).as_string::<i64>();
1096        assert_eq!(col2.null_count(), 1);
1097        assert_eq!(col2.value(0), "2");
1098        assert_eq!(col2.value(1), "shoo");
1099        assert_eq!(col2.value(2), "\t😁foo");
1100        assert!(col2.is_null(3));
1101        assert_eq!(col2.value(4), "");
1102    }
1103
1104    #[test]
1105    fn test_complex() {
1106        let buf = r#"
1107           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1108           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1109           {"list": null, "nested": {"a": null}}
1110        "#;
1111
1112        let schema = Arc::new(Schema::new(vec![
1113            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1114            Field::new_struct(
1115                "nested",
1116                vec![
1117                    Field::new("a", DataType::Int32, true),
1118                    Field::new("b", DataType::Int32, true),
1119                ],
1120                true,
1121            ),
1122            Field::new_struct(
1123                "nested_list",
1124                vec![Field::new_list(
1125                    "list2",
1126                    Field::new_struct(
1127                        "element",
1128                        vec![Field::new("c", DataType::Int32, false)],
1129                        false,
1130                    ),
1131                    true,
1132                )],
1133                true,
1134            ),
1135        ]));
1136
1137        let batches = do_read(buf, 1024, false, false, schema);
1138        assert_eq!(batches.len(), 1);
1139
1140        let list = batches[0].column(0).as_list::<i32>();
1141        assert_eq!(list.len(), 3);
1142        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1143        assert_eq!(list.null_count(), 1);
1144        assert!(list.is_null(2));
1145        let list_values = list.values().as_primitive::<Int32Type>();
1146        assert_eq!(list_values.values(), &[5, 6]);
1147
1148        let nested = batches[0].column(1).as_struct();
1149        let a = nested.column(0).as_primitive::<Int32Type>();
1150        assert_eq!(list.null_count(), 1);
1151        assert_eq!(a.values(), &[1, 7, 0]);
1152        assert!(list.is_null(2));
1153
1154        let b = nested.column(1).as_primitive::<Int32Type>();
1155        assert_eq!(b.null_count(), 2);
1156        assert_eq!(b.len(), 3);
1157        assert_eq!(b.value(0), 2);
1158        assert!(b.is_null(1));
1159        assert!(b.is_null(2));
1160
1161        let nested_list = batches[0].column(2).as_struct();
1162        assert_eq!(nested_list.len(), 3);
1163        assert_eq!(nested_list.null_count(), 1);
1164        assert!(nested_list.is_null(2));
1165
1166        let list2 = nested_list.column(0).as_list::<i32>();
1167        assert_eq!(list2.len(), 3);
1168        assert_eq!(list2.null_count(), 1);
1169        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1170        assert!(list2.is_null(2));
1171
1172        let list2_values = list2.values().as_struct();
1173
1174        let c = list2_values.column(0).as_primitive::<Int32Type>();
1175        assert_eq!(c.values(), &[3, 4]);
1176    }
1177
    #[test]
    fn test_projection() {
        // The input contains more fields ("list", "c") than the schema below
        // selects; unprojected fields are ignored by the reader
        let buf = r#"
           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // Only the projected child "a" survives in the "nested" struct
        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // The single projected child is "d": present in the first list
        // element only, so the second value is null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1233
    #[test]
    fn test_map() {
        // Map values are lists of nullable strings; key "c" carries an
        // explicit null list value
        let buf = r#"
           {"map": {"a": ["foo", null]}}
           {"map": {"a": [null], "b": []}}
           {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // One entry in row 0, two in row 1, two in row 2
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        // Keys preserve encounter order across all rows
        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The value for key "c" is the single null list
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // Round-trip through the display formatter as an end-to-end check
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1276
1277    #[test]
1278    fn test_not_coercing_primitive_into_string_without_flag() {
1279        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1280
1281        let buf = r#"{"a": 1}"#;
1282        let err = ReaderBuilder::new(schema.clone())
1283            .with_batch_size(1024)
1284            .build(Cursor::new(buf.as_bytes()))
1285            .unwrap()
1286            .read()
1287            .unwrap_err();
1288
1289        assert_eq!(
1290            err.to_string(),
1291            "Json error: whilst decoding field 'a': expected string got 1"
1292        );
1293
1294        let buf = r#"{"a": true}"#;
1295        let err = ReaderBuilder::new(schema)
1296            .with_batch_size(1024)
1297            .build(Cursor::new(buf.as_bytes()))
1298            .unwrap()
1299            .read()
1300            .unwrap_err();
1301
1302        assert_eq!(
1303            err.to_string(),
1304            "Json error: whilst decoding field 'a': expected string got true"
1305        );
1306    }
1307
    #[test]
    fn test_coercing_primitive_into_string() {
        // With `coerce_primitive` enabled, numbers and booleans decode into
        // string columns
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Coerced numbers keep their exact source spelling ("2E0", "2.0")
        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        // Booleans coerce to "true"/"false"; missing values remain null
        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1356
    /// Decodes decimal columns from JSON numbers and numeric strings; the
    /// expected unscaled values below assume a scale of 2 (see `test_decimals`).
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": strings parse and rescale; note "2.0452" becomes 204, i.e.
        // excess fractional digits are truncated rather than rounded
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        // "b": integers and numeric strings scale up by 10^2
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        // "c": 123.456 likewise truncates to 12345 at scale 2
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1406
    /// Runs `test_decimal` against every decimal width, each with scale 2.
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1414
    /// Decodes timestamps for a given `ArrowTimestampType` unit from
    /// RFC3339-style strings (with and without offsets or fractional
    /// seconds), raw numbers, and numeric strings.
    fn test_timestamp<T: ArrowTimestampType>() {
        // NOTE: row 3 contains a duplicate "b" key; the expected values below
        // show that the later (string) value is the one that is decoded
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        // "d" targets a timestamp column with an explicit +08:00 timezone
        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are written in nanoseconds and scaled down
        // to the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": strings parse as timestamps; bare numbers (40, 1234) are taken
        // as raw values in the target unit
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        // "c": for a timezone-less column, strings without an offset are
        // interpreted as UTC (row 3 matches the same string with "Z" in "d")
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // "d": strings without an explicit offset are interpreted in the
        // column's +08:00 timezone (note the 8h shift vs the same string in
        // "c"); explicit offsets ("Z", "-05:00") take precedence
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1496
    /// Runs `test_timestamp` for every `TimeUnit`.
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1504
    /// Decodes times-of-day for a given `Time32`/`Time64` type, from strings
    /// in various formats (12-hour with AM/PM, 24-hour, fractional seconds)
    /// and raw numbers.
    fn test_time<T: ArrowTemporalType>() {
        // NOTE: row 3 contains a duplicate "b" key; the expected values below
        // show that the later ("6:00 pm") value is the one that is decoded
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Extract the unit from the type under test (Time32 or Time64 only)
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Expected values below are written in nanoseconds-since-midnight
        // and scaled down to the unit under test
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": strings parse as times; bare numbers (40, 1234) are taken as
        // raw values in the target unit
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1576
    /// Runs `test_time` for every `Time32`/`Time64` unit combination.
    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }
1584
1585    fn test_duration<T: ArrowTemporalType>() {
1586        let buf = r#"
1587        {"a": 1, "b": "2"}
1588        {"a": 3, "b": null}
1589        "#;
1590
1591        let schema = Arc::new(Schema::new(vec![
1592            Field::new("a", T::DATA_TYPE, true),
1593            Field::new("b", T::DATA_TYPE, true),
1594        ]));
1595
1596        let batches = do_read(buf, 1024, true, false, schema);
1597        assert_eq!(batches.len(), 1);
1598
1599        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1600        assert_eq!(col_a.null_count(), 0);
1601        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1602
1603        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1604        assert_eq!(col2.null_count(), 1);
1605        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1606    }
1607
1608    #[test]
1609    fn test_durations() {
1610        test_duration::<DurationNanosecondType>();
1611        test_duration::<DurationMicrosecondType>();
1612        test_duration::<DurationMillisecondType>();
1613        test_duration::<DurationSecondType>();
1614    }
1615
1616    #[test]
1617    fn test_delta_checkpoint() {
1618        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
1619        let schema = Arc::new(Schema::new(vec![
1620            Field::new_struct(
1621                "protocol",
1622                vec![
1623                    Field::new("minReaderVersion", DataType::Int32, true),
1624                    Field::new("minWriterVersion", DataType::Int32, true),
1625                ],
1626                true,
1627            ),
1628            Field::new_struct(
1629                "add",
1630                vec![Field::new_map(
1631                    "partitionValues",
1632                    "key_value",
1633                    Field::new("key", DataType::Utf8, false),
1634                    Field::new("value", DataType::Utf8, true),
1635                    false,
1636                    false,
1637                )],
1638                true,
1639            ),
1640        ]));
1641
1642        let batches = do_read(json, 1024, true, false, schema);
1643        assert_eq!(batches.len(), 1);
1644
1645        let s: StructArray = batches.into_iter().next().unwrap().into();
1646        let opts = FormatOptions::default().with_null("null");
1647        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
1648        assert_eq!(
1649            formatter.value(0).to_string(),
1650            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
1651        );
1652    }
1653
1654    #[test]
1655    fn struct_nullability() {
1656        let do_test = |child: DataType| {
1657            // Test correctly enforced nullability
1658            let non_null = r#"{"foo": {}}"#;
1659            let schema = Arc::new(Schema::new(vec![Field::new_struct(
1660                "foo",
1661                vec![Field::new("bar", child, false)],
1662                true,
1663            )]));
1664            let mut reader = ReaderBuilder::new(schema.clone())
1665                .build(Cursor::new(non_null.as_bytes()))
1666                .unwrap();
1667            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1668
1669            let null = r#"{"foo": {bar: null}}"#;
1670            let mut reader = ReaderBuilder::new(schema.clone())
1671                .build(Cursor::new(null.as_bytes()))
1672                .unwrap();
1673            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1674
1675            // Test nulls in nullable parent can mask nulls in non-nullable child
1676            let null = r#"{"foo": null}"#;
1677            let mut reader = ReaderBuilder::new(schema)
1678                .build(Cursor::new(null.as_bytes()))
1679                .unwrap();
1680            let batch = reader.next().unwrap().unwrap();
1681            assert_eq!(batch.num_columns(), 1);
1682            let foo = batch.column(0).as_struct();
1683            assert_eq!(foo.len(), 1);
1684            assert!(foo.is_null(0));
1685            assert_eq!(foo.num_columns(), 1);
1686
1687            let bar = foo.column(0);
1688            assert_eq!(bar.len(), 1);
1689            // Non-nullable child can still contain null as masked by parent
1690            assert!(bar.is_null(0));
1691        };
1692
1693        do_test(DataType::Boolean);
1694        do_test(DataType::Int32);
1695        do_test(DataType::Utf8);
1696        do_test(DataType::Decimal128(2, 1));
1697        do_test(DataType::Timestamp(
1698            TimeUnit::Microsecond,
1699            Some("+00:00".into()),
1700        ));
1701    }
1702
1703    #[test]
1704    fn test_truncation() {
1705        let buf = r#"
1706        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1707        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1708        {"i64": -9223372036854775808, "u64": 0 }
1709        {"i64": "-9223372036854775808", "u64": 0 }
1710        "#;
1711
1712        let schema = Arc::new(Schema::new(vec![
1713            Field::new("i64", DataType::Int64, true),
1714            Field::new("u64", DataType::UInt64, true),
1715        ]));
1716
1717        let batches = do_read(buf, 1024, true, false, schema);
1718        assert_eq!(batches.len(), 1);
1719
1720        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1721        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1722
1723        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1724        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1725    }
1726
1727    #[test]
1728    fn test_timestamp_truncation() {
1729        let buf = r#"
1730        {"time": 9223372036854775807 }
1731        {"time": -9223372036854775808 }
1732        {"time": 9e5 }
1733        "#;
1734
1735        let schema = Arc::new(Schema::new(vec![Field::new(
1736            "time",
1737            DataType::Timestamp(TimeUnit::Nanosecond, None),
1738            true,
1739        )]));
1740
1741        let batches = do_read(buf, 1024, true, false, schema);
1742        assert_eq!(batches.len(), 1);
1743
1744        let i64 = batches[0]
1745            .column(0)
1746            .as_primitive::<TimestampNanosecondType>();
1747        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1748    }
1749
1750    #[test]
1751    fn test_strict_mode_no_missing_columns_in_schema() {
1752        let buf = r#"
1753        {"a": 1, "b": "2", "c": true}
1754        {"a": 2E0, "b": "4", "c": false}
1755        "#;
1756
1757        let schema = Arc::new(Schema::new(vec![
1758            Field::new("a", DataType::Int16, false),
1759            Field::new("b", DataType::Utf8, false),
1760            Field::new("c", DataType::Boolean, false),
1761        ]));
1762
1763        let batches = do_read(buf, 1024, true, true, schema);
1764        assert_eq!(batches.len(), 1);
1765
1766        let buf = r#"
1767        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1768        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1769        "#;
1770
1771        let schema = Arc::new(Schema::new(vec![
1772            Field::new("a", DataType::Int16, false),
1773            Field::new("b", DataType::Utf8, false),
1774            Field::new_struct(
1775                "c",
1776                vec![
1777                    Field::new("a", DataType::Boolean, false),
1778                    Field::new("b", DataType::Int16, false),
1779                ],
1780                false,
1781            ),
1782        ]));
1783
1784        let batches = do_read(buf, 1024, true, true, schema);
1785        assert_eq!(batches.len(), 1);
1786    }
1787
1788    #[test]
1789    fn test_strict_mode_missing_columns_in_schema() {
1790        let buf = r#"
1791        {"a": 1, "b": "2", "c": true}
1792        {"a": 2E0, "b": "4", "c": false}
1793        "#;
1794
1795        let schema = Arc::new(Schema::new(vec![
1796            Field::new("a", DataType::Int16, true),
1797            Field::new("c", DataType::Boolean, true),
1798        ]));
1799
1800        let err = ReaderBuilder::new(schema)
1801            .with_batch_size(1024)
1802            .with_strict_mode(true)
1803            .build(Cursor::new(buf.as_bytes()))
1804            .unwrap()
1805            .read()
1806            .unwrap_err();
1807
1808        assert_eq!(
1809            err.to_string(),
1810            "Json error: column 'b' missing from schema"
1811        );
1812
1813        let buf = r#"
1814        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1815        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1816        "#;
1817
1818        let schema = Arc::new(Schema::new(vec![
1819            Field::new("a", DataType::Int16, false),
1820            Field::new("b", DataType::Utf8, false),
1821            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
1822        ]));
1823
1824        let err = ReaderBuilder::new(schema)
1825            .with_batch_size(1024)
1826            .with_strict_mode(true)
1827            .build(Cursor::new(buf.as_bytes()))
1828            .unwrap()
1829            .read()
1830            .unwrap_err();
1831
1832        assert_eq!(
1833            err.to_string(),
1834            "Json error: whilst decoding field 'c': column 'b' missing from schema"
1835        );
1836    }
1837
1838    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1839        let file = File::open(path).unwrap();
1840        let mut reader = BufReader::new(file);
1841        let schema = schema.unwrap_or_else(|| {
1842            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1843            reader.rewind().unwrap();
1844            schema
1845        });
1846        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1847        builder.build(reader).unwrap()
1848    }
1849
1850    #[test]
1851    fn test_json_basic() {
1852        let mut reader = read_file("test/data/basic.json", None);
1853        let batch = reader.next().unwrap().unwrap();
1854
1855        assert_eq!(8, batch.num_columns());
1856        assert_eq!(12, batch.num_rows());
1857
1858        let schema = reader.schema();
1859        let batch_schema = batch.schema();
1860        assert_eq!(schema, batch_schema);
1861
1862        let a = schema.column_with_name("a").unwrap();
1863        assert_eq!(0, a.0);
1864        assert_eq!(&DataType::Int64, a.1.data_type());
1865        let b = schema.column_with_name("b").unwrap();
1866        assert_eq!(1, b.0);
1867        assert_eq!(&DataType::Float64, b.1.data_type());
1868        let c = schema.column_with_name("c").unwrap();
1869        assert_eq!(2, c.0);
1870        assert_eq!(&DataType::Boolean, c.1.data_type());
1871        let d = schema.column_with_name("d").unwrap();
1872        assert_eq!(3, d.0);
1873        assert_eq!(&DataType::Utf8, d.1.data_type());
1874
1875        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1876        assert_eq!(1, aa.value(0));
1877        assert_eq!(-10, aa.value(1));
1878        let bb = batch.column(b.0).as_primitive::<Float64Type>();
1879        assert_eq!(2.0, bb.value(0));
1880        assert_eq!(-3.5, bb.value(1));
1881        let cc = batch.column(c.0).as_boolean();
1882        assert!(!cc.value(0));
1883        assert!(cc.value(10));
1884        let dd = batch.column(d.0).as_string::<i32>();
1885        assert_eq!("4", dd.value(0));
1886        assert_eq!("text", dd.value(8));
1887    }
1888
1889    #[test]
1890    fn test_json_empty_projection() {
1891        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1892        let batch = reader.next().unwrap().unwrap();
1893
1894        assert_eq!(0, batch.num_columns());
1895        assert_eq!(12, batch.num_rows());
1896    }
1897
1898    #[test]
1899    fn test_json_basic_with_nulls() {
1900        let mut reader = read_file("test/data/basic_nulls.json", None);
1901        let batch = reader.next().unwrap().unwrap();
1902
1903        assert_eq!(4, batch.num_columns());
1904        assert_eq!(12, batch.num_rows());
1905
1906        let schema = reader.schema();
1907        let batch_schema = batch.schema();
1908        assert_eq!(schema, batch_schema);
1909
1910        let a = schema.column_with_name("a").unwrap();
1911        assert_eq!(&DataType::Int64, a.1.data_type());
1912        let b = schema.column_with_name("b").unwrap();
1913        assert_eq!(&DataType::Float64, b.1.data_type());
1914        let c = schema.column_with_name("c").unwrap();
1915        assert_eq!(&DataType::Boolean, c.1.data_type());
1916        let d = schema.column_with_name("d").unwrap();
1917        assert_eq!(&DataType::Utf8, d.1.data_type());
1918
1919        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1920        assert!(aa.is_valid(0));
1921        assert!(!aa.is_valid(1));
1922        assert!(!aa.is_valid(11));
1923        let bb = batch.column(b.0).as_primitive::<Float64Type>();
1924        assert!(bb.is_valid(0));
1925        assert!(!bb.is_valid(2));
1926        assert!(!bb.is_valid(11));
1927        let cc = batch.column(c.0).as_boolean();
1928        assert!(cc.is_valid(0));
1929        assert!(!cc.is_valid(4));
1930        assert!(!cc.is_valid(11));
1931        let dd = batch.column(d.0).as_string::<i32>();
1932        assert!(!dd.is_valid(0));
1933        assert!(dd.is_valid(1));
1934        assert!(!dd.is_valid(4));
1935        assert!(!dd.is_valid(11));
1936    }
1937
1938    #[test]
1939    fn test_json_basic_schema() {
1940        let schema = Schema::new(vec![
1941            Field::new("a", DataType::Int64, true),
1942            Field::new("b", DataType::Float32, false),
1943            Field::new("c", DataType::Boolean, false),
1944            Field::new("d", DataType::Utf8, false),
1945        ]);
1946
1947        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1948        let reader_schema = reader.schema();
1949        assert_eq!(reader_schema.as_ref(), &schema);
1950        let batch = reader.next().unwrap().unwrap();
1951
1952        assert_eq!(4, batch.num_columns());
1953        assert_eq!(12, batch.num_rows());
1954
1955        let schema = batch.schema();
1956
1957        let a = schema.column_with_name("a").unwrap();
1958        assert_eq!(&DataType::Int64, a.1.data_type());
1959        let b = schema.column_with_name("b").unwrap();
1960        assert_eq!(&DataType::Float32, b.1.data_type());
1961        let c = schema.column_with_name("c").unwrap();
1962        assert_eq!(&DataType::Boolean, c.1.data_type());
1963        let d = schema.column_with_name("d").unwrap();
1964        assert_eq!(&DataType::Utf8, d.1.data_type());
1965
1966        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1967        assert_eq!(1, aa.value(0));
1968        assert_eq!(100000000000000, aa.value(11));
1969        let bb = batch.column(b.0).as_primitive::<Float32Type>();
1970        assert_eq!(2.0, bb.value(0));
1971        assert_eq!(-3.5, bb.value(1));
1972    }
1973
1974    #[test]
1975    fn test_json_basic_schema_projection() {
1976        let schema = Schema::new(vec![
1977            Field::new("a", DataType::Int64, true),
1978            Field::new("c", DataType::Boolean, false),
1979        ]);
1980
1981        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1982        let batch = reader.next().unwrap().unwrap();
1983
1984        assert_eq!(2, batch.num_columns());
1985        assert_eq!(2, batch.schema().fields().len());
1986        assert_eq!(12, batch.num_rows());
1987
1988        assert_eq!(batch.schema().as_ref(), &schema);
1989
1990        let a = schema.column_with_name("a").unwrap();
1991        assert_eq!(0, a.0);
1992        assert_eq!(&DataType::Int64, a.1.data_type());
1993        let c = schema.column_with_name("c").unwrap();
1994        assert_eq!(1, c.0);
1995        assert_eq!(&DataType::Boolean, c.1.data_type());
1996    }
1997
1998    #[test]
1999    fn test_json_arrays() {
2000        let mut reader = read_file("test/data/arrays.json", None);
2001        let batch = reader.next().unwrap().unwrap();
2002
2003        assert_eq!(4, batch.num_columns());
2004        assert_eq!(3, batch.num_rows());
2005
2006        let schema = batch.schema();
2007
2008        let a = schema.column_with_name("a").unwrap();
2009        assert_eq!(&DataType::Int64, a.1.data_type());
2010        let b = schema.column_with_name("b").unwrap();
2011        assert_eq!(
2012            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
2013            b.1.data_type()
2014        );
2015        let c = schema.column_with_name("c").unwrap();
2016        assert_eq!(
2017            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
2018            c.1.data_type()
2019        );
2020        let d = schema.column_with_name("d").unwrap();
2021        assert_eq!(&DataType::Utf8, d.1.data_type());
2022
2023        let aa = batch.column(a.0).as_primitive::<Int64Type>();
2024        assert_eq!(1, aa.value(0));
2025        assert_eq!(-10, aa.value(1));
2026        assert_eq!(1627668684594000000, aa.value(2));
2027        let bb = batch.column(b.0).as_list::<i32>();
2028        let bb = bb.values().as_primitive::<Float64Type>();
2029        assert_eq!(9, bb.len());
2030        assert_eq!(2.0, bb.value(0));
2031        assert_eq!(-6.1, bb.value(5));
2032        assert!(!bb.is_valid(7));
2033
2034        let cc = batch
2035            .column(c.0)
2036            .as_any()
2037            .downcast_ref::<ListArray>()
2038            .unwrap();
2039        let cc = cc.values().as_boolean();
2040        assert_eq!(6, cc.len());
2041        assert!(!cc.value(0));
2042        assert!(!cc.value(4));
2043        assert!(!cc.is_valid(5));
2044    }
2045
2046    #[test]
2047    fn test_empty_json_arrays() {
2048        let json_content = r#"
2049            {"items": []}
2050            {"items": null}
2051            {}
2052            "#;
2053
2054        let schema = Arc::new(Schema::new(vec![Field::new(
2055            "items",
2056            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2057            true,
2058        )]));
2059
2060        let batches = do_read(json_content, 1024, false, false, schema);
2061        assert_eq!(batches.len(), 1);
2062
2063        let col1 = batches[0].column(0).as_list::<i32>();
2064        assert_eq!(col1.null_count(), 2);
2065        assert!(col1.value(0).is_empty());
2066        assert_eq!(col1.value(0).data_type(), &DataType::Null);
2067        assert!(col1.is_null(1));
2068        assert!(col1.is_null(2));
2069    }
2070
2071    #[test]
2072    fn test_nested_empty_json_arrays() {
2073        let json_content = r#"
2074            {"items": [[],[]]}
2075            {"items": [[null, null],[null]]}
2076            "#;
2077
2078        let schema = Arc::new(Schema::new(vec![Field::new(
2079            "items",
2080            DataType::List(FieldRef::new(Field::new_list_field(
2081                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2082                true,
2083            ))),
2084            true,
2085        )]));
2086
2087        let batches = do_read(json_content, 1024, false, false, schema);
2088        assert_eq!(batches.len(), 1);
2089
2090        let col1 = batches[0].column(0).as_list::<i32>();
2091        assert_eq!(col1.null_count(), 0);
2092        assert_eq!(col1.value(0).len(), 2);
2093        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2094        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2095
2096        assert_eq!(col1.value(1).len(), 2);
2097        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2098        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2099    }
2100
    #[test]
    fn test_nested_list_json_arrays() {
        // Decodes a List<Struct<b: Boolean, c: Struct<d: Utf8>>> column and
        // compares the result, buffer by buffer, against manually constructed
        // ArrayData.
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        // Rows cover: two full structs, a null `c`, a null `b`, a null `d`,
        // a null list, an empty list, and a list holding one null struct.
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output
        // Child `d` values, flattened across all 7 struct slots.
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // `c` struct: slots 2 (explicit null `c`) and 6 (null struct) are null.
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        // Outer struct: only slot 6 (the `[null]` entry) is null.
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // List of 6 rows; offsets map each row onto the 7 struct slots, and
        // row 3 (`{"a": null}`) is the only null list.
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2194
2195    #[test]
2196    fn test_skip_empty_lines() {
2197        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2198        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2199        let json_content = "
2200        {\"a\": 1}
2201        {\"a\": 2}
2202        {\"a\": 3}";
2203        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2204        let batch = reader.next().unwrap().unwrap();
2205
2206        assert_eq!(1, batch.num_columns());
2207        assert_eq!(3, batch.num_rows());
2208
2209        let schema = reader.schema();
2210        let c = schema.column_with_name("a").unwrap();
2211        assert_eq!(&DataType::Int64, c.1.data_type());
2212    }
2213
2214    #[test]
2215    fn test_with_multiple_batches() {
2216        let file = File::open("test/data/basic_nulls.json").unwrap();
2217        let mut reader = BufReader::new(file);
2218        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2219        reader.rewind().unwrap();
2220
2221        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2222        let mut reader = builder.build(reader).unwrap();
2223
2224        let mut num_records = Vec::new();
2225        while let Some(rb) = reader.next().transpose().unwrap() {
2226            num_records.push(rb.num_rows());
2227        }
2228
2229        assert_eq!(vec![5, 5, 2], num_records);
2230    }
2231
2232    #[test]
2233    fn test_timestamp_from_json_seconds() {
2234        let schema = Schema::new(vec![Field::new(
2235            "a",
2236            DataType::Timestamp(TimeUnit::Second, None),
2237            true,
2238        )]);
2239
2240        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2241        let batch = reader.next().unwrap().unwrap();
2242
2243        assert_eq!(1, batch.num_columns());
2244        assert_eq!(12, batch.num_rows());
2245
2246        let schema = reader.schema();
2247        let batch_schema = batch.schema();
2248        assert_eq!(schema, batch_schema);
2249
2250        let a = schema.column_with_name("a").unwrap();
2251        assert_eq!(
2252            &DataType::Timestamp(TimeUnit::Second, None),
2253            a.1.data_type()
2254        );
2255
2256        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2257        assert!(aa.is_valid(0));
2258        assert!(!aa.is_valid(1));
2259        assert!(!aa.is_valid(2));
2260        assert_eq!(1, aa.value(0));
2261        assert_eq!(1, aa.value(3));
2262        assert_eq!(5, aa.value(7));
2263    }
2264
2265    #[test]
2266    fn test_timestamp_from_json_milliseconds() {
2267        let schema = Schema::new(vec![Field::new(
2268            "a",
2269            DataType::Timestamp(TimeUnit::Millisecond, None),
2270            true,
2271        )]);
2272
2273        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2274        let batch = reader.next().unwrap().unwrap();
2275
2276        assert_eq!(1, batch.num_columns());
2277        assert_eq!(12, batch.num_rows());
2278
2279        let schema = reader.schema();
2280        let batch_schema = batch.schema();
2281        assert_eq!(schema, batch_schema);
2282
2283        let a = schema.column_with_name("a").unwrap();
2284        assert_eq!(
2285            &DataType::Timestamp(TimeUnit::Millisecond, None),
2286            a.1.data_type()
2287        );
2288
2289        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2290        assert!(aa.is_valid(0));
2291        assert!(!aa.is_valid(1));
2292        assert!(!aa.is_valid(2));
2293        assert_eq!(1, aa.value(0));
2294        assert_eq!(1, aa.value(3));
2295        assert_eq!(5, aa.value(7));
2296    }
2297
2298    #[test]
2299    fn test_date_from_json_milliseconds() {
2300        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2301
2302        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2303        let batch = reader.next().unwrap().unwrap();
2304
2305        assert_eq!(1, batch.num_columns());
2306        assert_eq!(12, batch.num_rows());
2307
2308        let schema = reader.schema();
2309        let batch_schema = batch.schema();
2310        assert_eq!(schema, batch_schema);
2311
2312        let a = schema.column_with_name("a").unwrap();
2313        assert_eq!(&DataType::Date64, a.1.data_type());
2314
2315        let aa = batch.column(a.0).as_primitive::<Date64Type>();
2316        assert!(aa.is_valid(0));
2317        assert!(!aa.is_valid(1));
2318        assert!(!aa.is_valid(2));
2319        assert_eq!(1, aa.value(0));
2320        assert_eq!(1, aa.value(3));
2321        assert_eq!(5, aa.value(7));
2322    }
2323
2324    #[test]
2325    fn test_time_from_json_nanoseconds() {
2326        let schema = Schema::new(vec![Field::new(
2327            "a",
2328            DataType::Time64(TimeUnit::Nanosecond),
2329            true,
2330        )]);
2331
2332        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2333        let batch = reader.next().unwrap().unwrap();
2334
2335        assert_eq!(1, batch.num_columns());
2336        assert_eq!(12, batch.num_rows());
2337
2338        let schema = reader.schema();
2339        let batch_schema = batch.schema();
2340        assert_eq!(schema, batch_schema);
2341
2342        let a = schema.column_with_name("a").unwrap();
2343        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2344
2345        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2346        assert!(aa.is_valid(0));
2347        assert!(!aa.is_valid(1));
2348        assert!(!aa.is_valid(2));
2349        assert_eq!(1, aa.value(0));
2350        assert_eq!(1, aa.value(3));
2351        assert_eq!(5, aa.value(7));
2352    }
2353
2354    #[test]
2355    fn test_json_iterator() {
2356        let file = File::open("test/data/basic.json").unwrap();
2357        let mut reader = BufReader::new(file);
2358        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2359        reader.rewind().unwrap();
2360
2361        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2362        let reader = builder.build(reader).unwrap();
2363        let schema = reader.schema();
2364        let (col_a_index, _) = schema.column_with_name("a").unwrap();
2365
2366        let mut sum_num_rows = 0;
2367        let mut num_batches = 0;
2368        let mut sum_a = 0;
2369        for batch in reader {
2370            let batch = batch.unwrap();
2371            assert_eq!(8, batch.num_columns());
2372            sum_num_rows += batch.num_rows();
2373            num_batches += 1;
2374            let batch_schema = batch.schema();
2375            assert_eq!(schema, batch_schema);
2376            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2377            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2378        }
2379        assert_eq!(12, sum_num_rows);
2380        assert_eq!(3, num_batches);
2381        assert_eq!(100000000000011, sum_a);
2382    }
2383
2384    #[test]
2385    fn test_decoder_error() {
2386        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2387            "a",
2388            vec![Field::new("child", DataType::Int32, false)],
2389            true,
2390        )]));
2391
2392        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
2393        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
2394        assert!(decoder.tape_decoder.has_partial_row());
2395        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2396        let _ = decoder.flush().unwrap_err();
2397        assert!(decoder.tape_decoder.has_partial_row());
2398        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2399
2400        let parse_err = |s: &str| {
2401            ReaderBuilder::new(schema.clone())
2402                .build(Cursor::new(s.as_bytes()))
2403                .unwrap()
2404                .next()
2405                .unwrap()
2406                .unwrap_err()
2407                .to_string()
2408        };
2409
2410        let err = parse_err(r#"{"a": 123}"#);
2411        assert_eq!(
2412            err,
2413            "Json error: whilst decoding field 'a': expected { got 123"
2414        );
2415
2416        let err = parse_err(r#"{"a": ["bar"]}"#);
2417        assert_eq!(
2418            err,
2419            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
2420        );
2421
2422        let err = parse_err(r#"{"a": []}"#);
2423        assert_eq!(
2424            err,
2425            "Json error: whilst decoding field 'a': expected { got []"
2426        );
2427
2428        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
2429        assert_eq!(
2430            err,
2431            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
2432        );
2433
2434        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
2435        assert_eq!(
2436            err,
2437            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
2438        );
2439
2440        let err = parse_err(r#"{"a": true}"#);
2441        assert_eq!(
2442            err,
2443            "Json error: whilst decoding field 'a': expected { got true"
2444        );
2445
2446        let err = parse_err(r#"{"a": false}"#);
2447        assert_eq!(
2448            err,
2449            "Json error: whilst decoding field 'a': expected { got false"
2450        );
2451
2452        let err = parse_err(r#"{"a": "foo"}"#);
2453        assert_eq!(
2454            err,
2455            "Json error: whilst decoding field 'a': expected { got \"foo\""
2456        );
2457
2458        let err = parse_err(r#"{"a": {"child": false}}"#);
2459        assert_eq!(
2460            err,
2461            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
2462        );
2463
2464        let err = parse_err(r#"{"a": {"child": []}}"#);
2465        assert_eq!(
2466            err,
2467            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
2468        );
2469
2470        let err = parse_err(r#"{"a": {"child": [123]}}"#);
2471        assert_eq!(
2472            err,
2473            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
2474        );
2475
2476        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
2477        assert_eq!(
2478            err,
2479            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
2480        );
2481    }
2482
2483    #[test]
2484    fn test_serialize_timestamp() {
2485        let json = vec![
2486            json!({"timestamp": 1681319393}),
2487            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2488        ];
2489        let schema = Schema::new(vec![Field::new(
2490            "timestamp",
2491            DataType::Timestamp(TimeUnit::Second, None),
2492            true,
2493        )]);
2494        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2495            .build_decoder()
2496            .unwrap();
2497        decoder.serialize(&json).unwrap();
2498        let batch = decoder.flush().unwrap().unwrap();
2499        assert_eq!(batch.num_rows(), 2);
2500        assert_eq!(batch.num_columns(), 1);
2501        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2502        assert_eq!(values.values(), &[1681319393, -7200]);
2503    }
2504
2505    #[test]
2506    fn test_serialize_decimal() {
2507        let json = vec![
2508            json!({"decimal": 1.234}),
2509            json!({"decimal": "1.234"}),
2510            json!({"decimal": 1234}),
2511            json!({"decimal": "1234"}),
2512        ];
2513        let schema = Schema::new(vec![Field::new(
2514            "decimal",
2515            DataType::Decimal128(10, 3),
2516            true,
2517        )]);
2518        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2519            .build_decoder()
2520            .unwrap();
2521        decoder.serialize(&json).unwrap();
2522        let batch = decoder.flush().unwrap().unwrap();
2523        assert_eq!(batch.num_rows(), 4);
2524        assert_eq!(batch.num_columns(), 1);
2525        let values = batch.column(0).as_primitive::<Decimal128Type>();
2526        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2527    }
2528
2529    #[test]
2530    fn test_serde_field() {
2531        let field = Field::new("int", DataType::Int32, true);
2532        let mut decoder = ReaderBuilder::new_with_field(field)
2533            .build_decoder()
2534            .unwrap();
2535        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2536        let b = decoder.flush().unwrap().unwrap();
2537        let values = b.column(0).as_primitive::<Int32Type>().values();
2538        assert_eq!(values, &[1, 2, 3, 4]);
2539    }
2540
2541    #[test]
2542    fn test_serde_large_numbers() {
2543        let field = Field::new("int", DataType::Int64, true);
2544        let mut decoder = ReaderBuilder::new_with_field(field)
2545            .build_decoder()
2546            .unwrap();
2547
2548        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2549        let b = decoder.flush().unwrap().unwrap();
2550        let values = b.column(0).as_primitive::<Int64Type>().values();
2551        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2552
2553        let field = Field::new(
2554            "int",
2555            DataType::Timestamp(TimeUnit::Microsecond, None),
2556            true,
2557        );
2558        let mut decoder = ReaderBuilder::new_with_field(field)
2559            .build_decoder()
2560            .unwrap();
2561
2562        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2563        let b = decoder.flush().unwrap().unwrap();
2564        let values = b
2565            .column(0)
2566            .as_primitive::<TimestampMicrosecondType>()
2567            .values();
2568        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2569    }
2570
    #[test]
    fn test_coercing_primitive_into_string_decoder() {
        // With `coerce_primitive` enabled, JSON numbers and booleans that
        // target a Utf8 column are stringified instead of rejected. The two
        // large "a" values also exercise integer -> Float64 widening.
        let buf = &format!(
            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
            (i32::MAX as i64 + 10),
            i64::MAX - 10
        );
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float64, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]);
        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
        let schema_ref = Arc::new(schema);

        // read record batches
        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
        let mut decoder = reader.build_decoder().unwrap();
        decoder.serialize(json_array.as_slice()).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(
            batch,
            RecordBatch::try_new(
                schema_ref,
                vec![
                    Arc::new(Float64Array::from(vec![
                        1.0,
                        2.0,
                        (i32::MAX as i64 + 10) as f64,
                        (i64::MAX - 10) as f64
                    ])),
                    // 123/789 and false/true arrive as non-string JSON values
                    // but are coerced into their textual form.
                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
                ]
            )
            .unwrap()
        );
    }
2609
    /// Parse the given `row` in `struct_mode` as a type given by `fields`.
    ///
    /// If `as_struct == true`, wrap the fields in a nullable Struct field named "r".
    /// If `as_struct == false`, use the fields directly as the schema.
    ///
    /// Returns the first decoded `RecordBatch`, or the decode error.
    /// Panics if the reader cannot be built or yields no item at all.
    fn _parse_structs(
        row: &str,
        struct_mode: StructMode,
        fields: Fields,
        as_struct: bool,
    ) -> Result<RecordBatch, ArrowError> {
        let builder = if as_struct {
            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
        } else {
            ReaderBuilder::new(Arc::new(Schema::new(fields)))
        };
        builder
            .with_struct_mode(struct_mode)
            .build(Cursor::new(row.as_bytes()))
            .unwrap()
            // `next()` yields Option<Result<..>>; the outer unwrap asserts
            // that at least one batch (or error) was produced.
            .next()
            .unwrap()
    }
2632
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        // A two-column list row in ListOnly mode: decoding must fail unless
        // the number of columns matches the number of fields exactly, both
        // at the top level and when nested under a struct field "r".
        let row = "[1, 2]";

        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        // The single-row batch expected when the field count matches.
        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        // 2 columns against 1 field: extra columns are rejected.
        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        // 2 columns against 2 fields: decodes successfully in both shapes.
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        // 2 columns against 3 fields: missing columns are rejected.
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2691
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical record expressed three ways: pure object syntax,
        // pure list syntax, and a mix of the two.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        // "a" is a struct of a list field "b" and a map field "c".
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // b = [1, 2]
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // c = {"d": 3}
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly accepts only the object form; list syntax is rejected
        // at whichever level it appears.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly is the mirror image. NOTE: the error text below reproduces
        // the offending value with no separator between the object entries
        // ("[1, 2]\"c\"") — apparently how the decoder renders it.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2775
2776    // Test cases:
2777    // [] -> RecordBatch row with no entries.  Schema = [('a', Int32)] -> Error
2778    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
2779    // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
2780    // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
2781    #[test]
2782    fn test_struct_decoding_empty_list() {
2783        let int_field = Field::new("a", DataType::Int32, true);
2784        let struct_field = Field::new(
2785            "r",
2786            DataType::Struct(Fields::from(vec![int_field.clone()])),
2787            true,
2788        );
2789
2790        let parse = |row: &str, as_struct: bool, field: Field| {
2791            _parse_structs(
2792                row,
2793                StructMode::ListOnly,
2794                Fields::from(vec![field]),
2795                as_struct,
2796            )
2797        };
2798
2799        // Missing fields
2800        assert_eq!(
2801            parse("[]", true, struct_field.clone())
2802                .unwrap_err()
2803                .to_string(),
2804            "Json error: found 0 columns for 1 fields".to_owned()
2805        );
2806        assert_eq!(
2807            parse("[]", false, int_field.clone())
2808                .unwrap_err()
2809                .to_string(),
2810            "Json error: found 0 columns for 1 fields".to_owned()
2811        );
2812        assert_eq!(
2813            parse("[]", false, struct_field.clone())
2814                .unwrap_err()
2815                .to_string(),
2816            "Json error: found 0 columns for 1 fields".to_owned()
2817        );
2818        assert_eq!(
2819            parse("[[]]", false, struct_field.clone())
2820                .unwrap_err()
2821                .to_string(),
2822            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2823        );
2824    }
2825
2826    #[test]
2827    fn test_decode_list_struct_with_wrong_types() {
2828        let int_field = Field::new("a", DataType::Int32, true);
2829        let struct_field = Field::new(
2830            "r",
2831            DataType::Struct(Fields::from(vec![int_field.clone()])),
2832            true,
2833        );
2834
2835        let parse = |row: &str, as_struct: bool, field: Field| {
2836            _parse_structs(
2837                row,
2838                StructMode::ListOnly,
2839                Fields::from(vec![field]),
2840                as_struct,
2841            )
2842        };
2843
2844        // Wrong values
2845        assert_eq!(
2846            parse(r#"[["a"]]"#, false, struct_field.clone())
2847                .unwrap_err()
2848                .to_string(),
2849            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2850        );
2851        assert_eq!(
2852            parse(r#"[["a"]]"#, true, struct_field.clone())
2853                .unwrap_err()
2854                .to_string(),
2855            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2856        );
2857        assert_eq!(
2858            parse(r#"["a"]"#, true, int_field.clone())
2859                .unwrap_err()
2860                .to_string(),
2861            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2862        );
2863        assert_eq!(
2864            parse(r#"["a"]"#, false, int_field.clone())
2865                .unwrap_err()
2866                .to_string(),
2867            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2868        );
2869    }
2870
2871    #[test]
2872    fn test_read_run_end_encoded() {
2873        let buf = r#"
2874        {"a": "x"}
2875        {"a": "x"}
2876        {"a": "y"}
2877        {"a": "y"}
2878        {"a": "y"}
2879        "#;
2880
2881        let ree_type = DataType::RunEndEncoded(
2882            Arc::new(Field::new("run_ends", DataType::Int32, false)),
2883            Arc::new(Field::new("values", DataType::Utf8, true)),
2884        );
2885        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2886        let batches = do_read(buf, 1024, false, false, schema);
2887        assert_eq!(batches.len(), 1);
2888
2889        let col = batches[0].column(0);
2890        let run_array = col.as_run::<arrow_array::types::Int32Type>();
2891
2892        // 5 logical values compressed into 2 runs
2893        assert_eq!(run_array.len(), 5);
2894        assert_eq!(run_array.run_ends().values(), &[2, 5]);
2895
2896        let values = run_array.values().as_string::<i32>();
2897        assert_eq!(values.len(), 2);
2898        assert_eq!(values.value(0), "x");
2899        assert_eq!(values.value(1), "y");
2900    }
2901
2902    #[test]
2903    fn test_read_run_end_encoded_consecutive_nulls() {
2904        let buf = r#"
2905        {"a": "x"}
2906        {}
2907        {}
2908        {}
2909        {"a": "y"}
2910        "#;
2911
2912        let ree_type = DataType::RunEndEncoded(
2913            Arc::new(Field::new("run_ends", DataType::Int32, false)),
2914            Arc::new(Field::new("values", DataType::Utf8, true)),
2915        );
2916        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2917        let batches = do_read(buf, 1024, false, false, schema);
2918        assert_eq!(batches.len(), 1);
2919
2920        let col = batches[0].column(0);
2921        let run_array = col.as_run::<arrow_array::types::Int32Type>();
2922
2923        // 5 logical values: "x", null, null, null, "y" → 3 runs
2924        assert_eq!(run_array.len(), 5);
2925        assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);
2926
2927        let values = run_array.values().as_string::<i32>();
2928        assert_eq!(values.len(), 3);
2929        assert_eq!(values.value(0), "x");
2930        assert!(values.is_null(1));
2931        assert_eq!(values.value(2), "y");
2932    }
2933
2934    #[test]
2935    fn test_read_run_end_encoded_all_unique() {
2936        let buf = r#"
2937        {"a": 1}
2938        {"a": 2}
2939        {"a": 3}
2940        "#;
2941
2942        let ree_type = DataType::RunEndEncoded(
2943            Arc::new(Field::new("run_ends", DataType::Int32, false)),
2944            Arc::new(Field::new("values", DataType::Int32, true)),
2945        );
2946        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2947        let batches = do_read(buf, 1024, false, false, schema);
2948        assert_eq!(batches.len(), 1);
2949
2950        let col = batches[0].column(0);
2951        let run_array = col.as_run::<arrow_array::types::Int32Type>();
2952
2953        // No compression: 3 unique values → 3 runs
2954        assert_eq!(run_array.len(), 3);
2955        assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
2956    }
2957
2958    #[test]
2959    fn test_read_run_end_encoded_int16_run_ends() {
2960        let buf = r#"
2961        {"a": "x"}
2962        {"a": "x"}
2963        {"a": "y"}
2964        "#;
2965
2966        let ree_type = DataType::RunEndEncoded(
2967            Arc::new(Field::new("run_ends", DataType::Int16, false)),
2968            Arc::new(Field::new("values", DataType::Utf8, true)),
2969        );
2970        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2971        let batches = do_read(buf, 1024, false, false, schema);
2972        assert_eq!(batches.len(), 1);
2973
2974        let col = batches[0].column(0);
2975        let run_array = col.as_run::<arrow_array::types::Int16Type>();
2976
2977        assert_eq!(run_array.len(), 3);
2978        assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
2979    }
2980}