// arrow_json/reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! JSON reader
19//!
20//! This JSON reader allows JSON records to be read into the Arrow memory
21//! model. Records are loaded in batches and are then converted from the record-oriented
22//! representation to the columnar arrow data model.
23//!
24//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
25//! parsing of sequences of one or more arbitrarily formatted JSON values, including
26//! but not limited to newline-delimited JSON.
27//!
28//! # Basic Usage
29//!
30//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
31//!
32//! ```
33//! # use arrow_schema::*;
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use std::sync::Arc;
37//!
38//! let schema = Arc::new(Schema::new(vec![
39//!     Field::new("a", DataType::Float64, false),
40//!     Field::new("b", DataType::Float64, false),
41//!     Field::new("c", DataType::Boolean, true),
42//! ]));
43//!
44//! let file = File::open("test/data/basic.json").unwrap();
45//!
46//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
47//! let batch = json.next().unwrap().unwrap();
48//! ```
49//!
50//! # Async Usage
51//!
52//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
53//! and is designed to be agnostic to the various different kinds of async IO primitives found
54//! within the Rust ecosystem.
55//!
56//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
57//!
58//! ```
59//! # use std::task::{Poll, ready};
60//! # use bytes::{Buf, Bytes};
61//! # use arrow_schema::ArrowError;
62//! # use futures::stream::{Stream, StreamExt};
63//! # use arrow_array::RecordBatch;
64//! # use arrow_json::reader::Decoder;
65//! #
66//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
67//!     mut decoder: Decoder,
68//!     mut input: S,
69//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
70//!     let mut buffered = Bytes::new();
71//!     futures::stream::poll_fn(move |cx| {
72//!         loop {
73//!             if buffered.is_empty() {
74//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
75//!                     Some(b) => b,
76//!                     None => break,
77//!                 };
78//!             }
79//!             let decoded = match decoder.decode(buffered.as_ref()) {
80//!                 Ok(decoded) => decoded,
81//!                 Err(e) => return Poll::Ready(Some(Err(e))),
82//!             };
83//!             let read = buffered.len();
84//!             buffered.advance(decoded);
85//!             if decoded != read {
86//!                 break
87//!             }
88//!         }
89//!
90//!         Poll::Ready(decoder.flush().transpose())
91//!     })
92//! }
93//!
94//! ```
95//!
96//! In a similar vein, it can also be used with tokio-based IO primitives
97//!
98//! ```
99//! # use std::sync::Arc;
100//! # use arrow_schema::{DataType, Field, Schema};
101//! # use std::pin::Pin;
102//! # use std::task::{Poll, ready};
103//! # use futures::{Stream, TryStreamExt};
104//! # use tokio::io::AsyncBufRead;
105//! # use arrow_array::RecordBatch;
106//! # use arrow_json::reader::Decoder;
107//! # use arrow_schema::ArrowError;
108//! fn decode_stream<R: AsyncBufRead + Unpin>(
109//!     mut decoder: Decoder,
110//!     mut reader: R,
111//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
112//!     futures::stream::poll_fn(move |cx| {
113//!         loop {
114//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
115//!                 Ok(b) if b.is_empty() => break,
116//!                 Ok(b) => b,
117//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
118//!             };
119//!             let read = b.len();
120//!             let decoded = match decoder.decode(b) {
121//!                 Ok(decoded) => decoded,
122//!                 Err(e) => return Poll::Ready(Some(Err(e))),
123//!             };
124//!             Pin::new(&mut reader).consume(decoded);
125//!             if decoded != read {
126//!                 break;
127//!             }
128//!         }
129//!
130//!         Poll::Ready(decoder.flush().transpose())
131//!     })
132//! }
133//! ```
134//!
135
136use crate::StructMode;
137use crate::reader::binary_array::{
138    BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
139};
140use std::io::BufRead;
141use std::sync::Arc;
142
143use chrono::Utc;
144use serde_core::Serialize;
145
146use arrow_array::timezone::Tz;
147use arrow_array::types::*;
148use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
149use arrow_data::ArrayData;
150use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
151pub use schema::*;
152
153use crate::reader::boolean_array::BooleanArrayDecoder;
154use crate::reader::decimal_array::DecimalArrayDecoder;
155use crate::reader::list_array::ListArrayDecoder;
156use crate::reader::map_array::MapArrayDecoder;
157use crate::reader::null_array::NullArrayDecoder;
158use crate::reader::primitive_array::PrimitiveArrayDecoder;
159use crate::reader::string_array::StringArrayDecoder;
160use crate::reader::string_view_array::StringViewArrayDecoder;
161use crate::reader::struct_array::StructArrayDecoder;
162use crate::reader::tape::{Tape, TapeDecoder};
163use crate::reader::timestamp_array::TimestampArrayDecoder;
164
165mod binary_array;
166mod boolean_array;
167mod decimal_array;
168mod list_array;
169mod map_array;
170mod null_array;
171mod primitive_array;
172mod schema;
173mod serializer;
174mod string_array;
175mod string_view_array;
176mod struct_array;
177mod tape;
178mod timestamp_array;
179
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows decoded into each RecordBatch (default 1024)
    batch_size: usize,
    // If true, coerce JSON primitives (bool/number) into Utf8/LargeUtf8 columns
    coerce_primitive: bool,
    // If true, error when encountering a JSON column not present in `schema`
    strict_mode: bool,
    // True when constructed via `new_with_field`: the JSON root is a single
    // value of the schema's only field, rather than an object
    is_field: bool,
    // Whether structs are decoded from JSON objects, lists, or either
    struct_mode: StructMode,

    // Target schema of the produced RecordBatches
    schema: SchemaRef,
}
190
191impl ReaderBuilder {
192    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
193    ///
194    /// This could be obtained using [`infer_json_schema`] if not known
195    ///
196    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
197    /// In this case, an error is returned when a column is missing from `schema`.
198    ///
199    /// [`infer_json_schema`]: crate::reader::infer_json_schema
200    pub fn new(schema: SchemaRef) -> Self {
201        Self {
202            batch_size: 1024,
203            coerce_primitive: false,
204            strict_mode: false,
205            is_field: false,
206            struct_mode: Default::default(),
207            schema,
208        }
209    }
210
211    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
212    ///
213    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
214    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
215    ///
216    /// ```
217    /// # use std::sync::Arc;
218    /// # use arrow_array::cast::AsArray;
219    /// # use arrow_array::types::Int32Type;
220    /// # use arrow_json::ReaderBuilder;
221    /// # use arrow_schema::{DataType, Field};
222    /// // Root of JSON schema is a numeric type
223    /// let data = "1\n2\n3\n";
224    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
225    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
226    /// let b = reader.next().unwrap().unwrap();
227    /// let values = b.column(0).as_primitive::<Int32Type>().values();
228    /// assert_eq!(values, &[1, 2, 3]);
229    ///
230    /// // Root of JSON schema is a list type
231    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
232    /// let field = Field::new_list("int", field.clone(), true);
233    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
234    /// let b = reader.next().unwrap().unwrap();
235    /// let list = b.column(0).as_list::<i32>();
236    ///
237    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
238    /// let list_values = list.values().as_primitive::<Int32Type>();
239    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
240    /// ```
241    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
242        Self {
243            batch_size: 1024,
244            coerce_primitive: false,
245            strict_mode: false,
246            is_field: true,
247            struct_mode: Default::default(),
248            schema: Arc::new(Schema::new([field.into()])),
249        }
250    }
251
252    /// Sets the batch size in rows to read
253    pub fn with_batch_size(self, batch_size: usize) -> Self {
254        Self { batch_size, ..self }
255    }
256
257    /// Sets if the decoder should coerce primitive values (bool and number) into string
258    /// when the Schema's column is Utf8 or LargeUtf8.
259    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
260        Self {
261            coerce_primitive,
262            ..self
263        }
264    }
265
266    /// Sets if the decoder should return an error if it encounters a column not
267    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
268    /// `strict_mode` is effectively `true`. It is required for all fields of
269    /// the struct to be in the list: without field names, there is no way to
270    /// determine which field is missing.
271    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
272        Self {
273            strict_mode,
274            ..self
275        }
276    }
277
278    /// Set the [`StructMode`] for the reader, which determines whether structs
279    /// can be decoded from JSON as objects or lists. For more details refer to
280    /// the enum documentation. Default is to use `ObjectOnly`.
281    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
282        Self {
283            struct_mode,
284            ..self
285        }
286    }
287
288    /// Create a [`Reader`] with the provided [`BufRead`]
289    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
290        Ok(Reader {
291            reader,
292            decoder: self.build_decoder()?,
293        })
294    }
295
296    /// Create a [`Decoder`]
297    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
298        let (data_type, nullable) = match self.is_field {
299            false => (DataType::Struct(self.schema.fields.clone()), false),
300            true => {
301                let field = &self.schema.fields[0];
302                (field.data_type().clone(), field.is_nullable())
303            }
304        };
305
306        let decoder = make_decoder(
307            data_type,
308            self.coerce_primitive,
309            self.strict_mode,
310            nullable,
311            self.struct_mode,
312        )?;
313
314        let num_fields = self.schema.flattened_fields().len();
315
316        Ok(Decoder {
317            decoder,
318            is_field: self.is_field,
319            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
320            batch_size: self.batch_size,
321            schema: self.schema,
322        })
323    }
324}
325
/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
///
/// Lines consisting solely of ASCII whitespace are ignored
pub struct Reader<R> {
    // Underlying byte source the JSON is pulled from
    reader: R,
    // Push-based decoder the bytes are fed into
    decoder: Decoder,
}
333
334impl<R> std::fmt::Debug for Reader<R> {
335    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336        f.debug_struct("Reader")
337            .field("decoder", &self.decoder)
338            .finish()
339    }
340}
341
342impl<R: BufRead> Reader<R> {
343    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
344    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
345        loop {
346            let buf = self.reader.fill_buf()?;
347            if buf.is_empty() {
348                break;
349            }
350            let read = buf.len();
351
352            let decoded = self.decoder.decode(buf)?;
353            self.reader.consume(decoded);
354            if decoded != read {
355                break;
356            }
357        }
358        self.decoder.flush()
359    }
360}
361
362impl<R: BufRead> Iterator for Reader<R> {
363    type Item = Result<RecordBatch, ArrowError>;
364
365    fn next(&mut self) -> Option<Self::Item> {
366        self.read().transpose()
367    }
368}
369
370impl<R: BufRead> RecordBatchReader for Reader<R> {
371    fn schema(&self) -> SchemaRef {
372        self.decoder.schema.clone()
373    }
374}
375
/// A low-level interface for reading JSON data from a byte stream
///
/// See [`Reader`] for a higher-level interface for interfacing with [`BufRead`]
///
/// The push-based interface facilitates integration with sources that yield arbitrarily
/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
/// object storage
///
/// ```
/// # use std::io::BufRead;
/// # use arrow_array::RecordBatch;
/// # use arrow_json::reader::{Decoder, ReaderBuilder};
/// # use arrow_schema::{ArrowError, SchemaRef};
/// #
/// fn read_from_json<R: BufRead>(
///     mut reader: R,
///     schema: SchemaRef,
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
///     let mut next = move || {
///         loop {
///             // Decoder is agnostic that buf doesn't contain whole records
///             let buf = reader.fill_buf()?;
///             if buf.is_empty() {
///                 break; // Input exhausted
///             }
///             let read = buf.len();
///             let decoded = decoder.decode(buf)?;
///
///             // Consume the number of bytes read
///             reader.consume(decoded);
///             if decoded != read {
///                 break; // Read batch size
///             }
///         }
///         decoder.flush()
///     };
///     Ok(std::iter::from_fn(move || next().transpose()))
/// }
/// ```
pub struct Decoder {
    // Incrementally parses raw JSON bytes into a tape representation
    tape_decoder: TapeDecoder,
    // Converts decoded tape values into ArrayData for the target type
    decoder: Box<dyn ArrayDecoder>,
    // Maximum number of rows per flushed RecordBatch
    batch_size: usize,
    // True if the JSON root is a single field value rather than an object
    is_field: bool,
    // Schema of the produced RecordBatches
    schema: SchemaRef,
}
423
424impl std::fmt::Debug for Decoder {
425    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426        f.debug_struct("Decoder")
427            .field("schema", &self.schema)
428            .field("batch_size", &self.batch_size)
429            .finish()
430    }
431}
432
433impl Decoder {
434    /// Read JSON objects from `buf`, returning the number of bytes read
435    ///
436    /// This method returns once `batch_size` objects have been parsed since the
437    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
438    /// should be included in the next call to [`Self::decode`]
439    ///
440    /// There is no requirement that `buf` contains a whole number of records, facilitating
441    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
442    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
443        self.tape_decoder.decode(buf)
444    }
445
446    /// Serialize `rows` to this [`Decoder`]
447    ///
448    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
449    /// [`RecordBatch`].
450    ///
451    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
452    /// especially where the schema is known at compile-time, however, this provides a mechanism
453    /// to get something up and running quickly
454    ///
455    /// It can be used with [`serde_json::Value`]
456    ///
457    /// ```
458    /// # use std::sync::Arc;
459    /// # use serde_json::{Value, json};
460    /// # use arrow_array::cast::AsArray;
461    /// # use arrow_array::types::Float32Type;
462    /// # use arrow_json::ReaderBuilder;
463    /// # use arrow_schema::{DataType, Field, Schema};
464    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
465    ///
466    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
467    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
468    ///
469    /// decoder.serialize(&json).unwrap();
470    /// let batch = decoder.flush().unwrap().unwrap();
471    /// assert_eq!(batch.num_rows(), 2);
472    /// assert_eq!(batch.num_columns(), 1);
473    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
474    /// assert_eq!(values, &[2.3, 5.7])
475    /// ```
476    ///
477    /// Or with arbitrary [`Serialize`] types
478    ///
479    /// ```
480    /// # use std::sync::Arc;
481    /// # use arrow_json::ReaderBuilder;
482    /// # use arrow_schema::{DataType, Field, Schema};
483    /// # use serde::Serialize;
484    /// # use arrow_array::cast::AsArray;
485    /// # use arrow_array::types::{Float32Type, Int32Type};
486    /// #
487    /// #[derive(Serialize)]
488    /// struct MyStruct {
489    ///     int32: i32,
490    ///     float: f32,
491    /// }
492    ///
493    /// let schema = Schema::new(vec![
494    ///     Field::new("int32", DataType::Int32, false),
495    ///     Field::new("float", DataType::Float32, false),
496    /// ]);
497    ///
498    /// let rows = vec![
499    ///     MyStruct{ int32: 0, float: 3. },
500    ///     MyStruct{ int32: 4, float: 67.53 },
501    /// ];
502    ///
503    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
504    /// decoder.serialize(&rows).unwrap();
505    ///
506    /// let batch = decoder.flush().unwrap().unwrap();
507    ///
508    /// // Expect batch containing two columns
509    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
510    /// assert_eq!(int32.values(), &[0, 4]);
511    ///
512    /// let float = batch.column(1).as_primitive::<Float32Type>();
513    /// assert_eq!(float.values(), &[3., 67.53]);
514    /// ```
515    ///
516    /// Or even complex nested types
517    ///
518    /// ```
519    /// # use std::collections::BTreeMap;
520    /// # use std::sync::Arc;
521    /// # use arrow_array::StructArray;
522    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
523    /// # use arrow_json::ReaderBuilder;
524    /// # use arrow_schema::{DataType, Field, Fields, Schema};
525    /// # use serde::Serialize;
526    /// #
527    /// #[derive(Serialize)]
528    /// struct MyStruct {
529    ///     int32: i32,
530    ///     list: Vec<f64>,
531    ///     nested: Vec<Option<Nested>>,
532    /// }
533    ///
534    /// impl MyStruct {
535    ///     /// Returns the [`Fields`] for [`MyStruct`]
536    ///     fn fields() -> Fields {
537    ///         let nested = DataType::Struct(Nested::fields());
538    ///         Fields::from([
539    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
540    ///             Arc::new(Field::new_list(
541    ///                 "list",
542    ///                 Field::new("element", DataType::Float64, false),
543    ///                 false,
544    ///             )),
545    ///             Arc::new(Field::new_list(
546    ///                 "nested",
547    ///                 Field::new("element", nested, true),
548    ///                 true,
549    ///             )),
550    ///         ])
551    ///     }
552    /// }
553    ///
554    /// #[derive(Serialize)]
555    /// struct Nested {
556    ///     map: BTreeMap<String, Vec<String>>
557    /// }
558    ///
559    /// impl Nested {
560    ///     /// Returns the [`Fields`] for [`Nested`]
561    ///     fn fields() -> Fields {
562    ///         let element = Field::new("element", DataType::Utf8, false);
563    ///         Fields::from([
564    ///             Arc::new(Field::new_map(
565    ///                 "map",
566    ///                 "entries",
567    ///                 Field::new("key", DataType::Utf8, false),
568    ///                 Field::new_list("value", element, false),
569    ///                 false, // sorted
570    ///                 false, // nullable
571    ///             ))
572    ///         ])
573    ///     }
574    /// }
575    ///
576    /// let data = vec![
577    ///     MyStruct {
578    ///         int32: 34,
579    ///         list: vec![1., 2., 34.],
580    ///         nested: vec![
581    ///             None,
582    ///             Some(Nested {
583    ///                 map: vec![
584    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
585    ///                     ("key2".to_string(), vec!["baz".to_string()])
586    ///                 ].into_iter().collect()
587    ///             })
588    ///         ]
589    ///     },
590    ///     MyStruct {
591    ///         int32: 56,
592    ///         list: vec![],
593    ///         nested: vec![]
594    ///     },
595    ///     MyStruct {
596    ///         int32: 24,
597    ///         list: vec![-1., 245.],
598    ///         nested: vec![None]
599    ///     }
600    /// ];
601    ///
602    /// let schema = Schema::new(MyStruct::fields());
603    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
604    /// decoder.serialize(&data).unwrap();
605    /// let batch = decoder.flush().unwrap().unwrap();
606    /// assert_eq!(batch.num_rows(), 3);
607    /// assert_eq!(batch.num_columns(), 3);
608    ///
609    /// // Convert to StructArray to format
610    /// let s = StructArray::from(batch);
611    /// let options = FormatOptions::default().with_null("null");
612    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
613    ///
614    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
615    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
616    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
617    /// ```
618    ///
619    /// Note: this ignores any batch size setting, and always decodes all rows
620    ///
621    /// [serde]: https://docs.rs/serde/latest/serde/
622    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
623        self.tape_decoder.serialize(rows)
624    }
625
626    /// True if the decoder is currently part way through decoding a record.
627    pub fn has_partial_record(&self) -> bool {
628        self.tape_decoder.has_partial_row()
629    }
630
631    /// The number of unflushed records, including the partially decoded record (if any).
632    pub fn len(&self) -> usize {
633        self.tape_decoder.num_buffered_rows()
634    }
635
636    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
637    pub fn is_empty(&self) -> bool {
638        self.len() == 0
639    }
640
641    /// Flushes the currently buffered data to a [`RecordBatch`]
642    ///
643    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
644    ///
645    /// Note: This will return an error if called part way through decoding a record,
646    /// i.e. [`Self::has_partial_record`] is true.
647    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
648        let tape = self.tape_decoder.finish()?;
649
650        if tape.num_rows() == 0 {
651            return Ok(None);
652        }
653
654        // First offset is null sentinel
655        let mut next_object = 1;
656        let pos: Vec<_> = (0..tape.num_rows())
657            .map(|_| {
658                let next = tape.next(next_object, "row").unwrap();
659                std::mem::replace(&mut next_object, next)
660            })
661            .collect();
662
663        let decoded = self.decoder.decode(&tape, &pos)?;
664        self.tape_decoder.clear();
665
666        let batch = match self.is_field {
667            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
668            false => {
669                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
670            }
671        };
672
673        Ok(Some(batch))
674    }
675}
676
/// Type-erased decoder from the JSON tape into a single column's [`ArrayData`]
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
681
/// Expands to a boxed [`PrimitiveArrayDecoder`] for primitive type `$t`
/// targeting `$data_type`, wrapped in `Ok` for use within [`make_decoder`]
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
687
/// Creates the [`ArrayDecoder`] for the provided [`DataType`]
///
/// `coerce_primitive`, `strict_mode` and `struct_mode` are propagated to
/// nested decoders (lists, structs, maps); `is_nullable` indicates whether
/// null values are permitted for this column.
///
/// Returns [`ArrowError::NotYetImplemented`] for unsupported types.
fn make_decoder(
    data_type: DataType,
    coerce_primitive: bool,
    strict_mode: bool,
    is_nullable: bool,
    struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    // Integer types are dispatched via downcast_integer!, which expands the
    // first arm to a primitive_decoder! invocation per integer DataType
    downcast_integer! {
        data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are decoded relative to UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // Timezone-aware timestamps parse the timezone string, erroring if invalid
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
    }
}
758
759#[cfg(test)]
760mod tests {
761    use serde_json::json;
762    use std::fs::File;
763    use std::io::{BufReader, Cursor, Seek};
764
765    use arrow_array::cast::AsArray;
766    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
767    use arrow_buffer::{ArrowNativeType, Buffer};
768    use arrow_cast::display::{ArrayFormatter, FormatOptions};
769    use arrow_data::ArrayDataBuilder;
770    use arrow_schema::{Field, Fields};
771
772    use super::*;
773
774    fn do_read(
775        buf: &str,
776        batch_size: usize,
777        coerce_primitive: bool,
778        strict_mode: bool,
779        schema: SchemaRef,
780    ) -> Vec<RecordBatch> {
781        let mut unbuffered = vec![];
782
783        // Test with different batch sizes to test for boundary conditions
784        for batch_size in [1, 3, 100, batch_size] {
785            unbuffered = ReaderBuilder::new(schema.clone())
786                .with_batch_size(batch_size)
787                .with_coerce_primitive(coerce_primitive)
788                .build(Cursor::new(buf.as_bytes()))
789                .unwrap()
790                .collect::<Result<Vec<_>, _>>()
791                .unwrap();
792
793            for b in unbuffered.iter().take(unbuffered.len() - 1) {
794                assert_eq!(b.num_rows(), batch_size)
795            }
796
797            // Test with different buffer sizes to test for boundary conditions
798            for b in [1, 3, 5] {
799                let buffered = ReaderBuilder::new(schema.clone())
800                    .with_batch_size(batch_size)
801                    .with_coerce_primitive(coerce_primitive)
802                    .with_strict_mode(strict_mode)
803                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
804                    .unwrap()
805                    .collect::<Result<Vec<_>, _>>()
806                    .unwrap();
807                assert_eq!(unbuffered, buffered);
808            }
809        }
810
811        unbuffered
812    }
813
    /// End-to-end smoke test: out-of-order fields, missing fields, explicit
    /// nulls, blank lines between records, and several numeric notations
    /// (1, 2E0, 2.0, 4e0) plus the quoted integer "5".
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Drive the push-based Decoder API directly and check its state
        // transitions: empty -> 6 buffered rows -> empty again after flush.
        // The decoder reports 221 bytes consumed from `buf`.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": missing in row 5 and explicitly null in row 6; values() still
        // exposes a placeholder 0 at null slots
        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": present in every row, including the quoted "5"
        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        // "c": only the first two rows carry a boolean
        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        // "d": Date32 decoded from raw integer values
        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        // "e": Date64 present only in row 2
        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
880
    /// Utf8 and LargeUtf8 decoding: escape sequences, a surrogate pair
    /// (\ud83d\ude00 -> 😀), \uXXXX escapes, explicit nulls, missing fields
    /// and the empty string.
    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": missing in row 4 and explicitly null in row 5
        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // The escaped surrogate pair and \uXXXX escapes decode to real chars
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        // "b": null in row 4, empty (not null) in row 5
        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
916
    /// Verifies the Utf8View decoder pre-allocates its data buffer only for
    /// strings longer than the 12-byte inline-view threshold.
    #[test]
    fn test_long_string_view_allocation() {
        // The JSON input contains field "a" with different string lengths.
        // According to the implementation in the decoder:
        // - For a string, capacity is only increased if its length > 12 bytes.
        // Therefore, for:
        // Row 1: "short" (5 bytes) -> capacity += 0
        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
        // Row 3: "hello" (5 bytes) -> capacity += 0
        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
        // Expected total capacity = 24 + 17 = 41
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        // Get the first column ("a") as a StringViewArray.
        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Retrieve the underlying data buffer from the array.
        // The builder pre-allocates capacity based on the sum of lengths for long strings.
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        // Check that the allocated capacity is at least what we expected.
        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Additionally, verify that the decoded values are correct.
        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
969
    /// Test the memory capacity allocation logic when converting numeric types to strings.
    #[test]
    fn test_numeric_view_allocation() {
        // For numeric types, the expected capacity calculation is as follows:
        // Row 1: 123456789  -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
        //                        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
        let expected_capacity: usize = 33;

        let buf = r#"
    {"n": 123456789}
    {"n": 1000000000000}
    {"n": 3.1415}
    {"n": 2.718281828459045}
    "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive=true (third argument) is required so JSON numbers
        // may decode into a string column at all
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Check that the underlying data buffer capacity is at least the expected value.
        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Verify that the converted string values are correct.
        // Note: The format of the number converted to a string should match the actual implementation.
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1014
1015    #[test]
1016    fn test_string_with_uft8view() {
1017        let buf = r#"
1018        {"a": "1", "b": "2"}
1019        {"a": "hello", "b": "shoo"}
1020        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1021
1022        {"b": null}
1023        {"b": "", "a": null}
1024
1025        "#;
1026        let schema = Arc::new(Schema::new(vec![
1027            Field::new("a", DataType::Utf8View, true),
1028            Field::new("b", DataType::LargeUtf8, true),
1029        ]));
1030
1031        let batches = do_read(buf, 1024, false, false, schema);
1032        assert_eq!(batches.len(), 1);
1033
1034        let col1 = batches[0].column(0).as_string_view();
1035        assert_eq!(col1.null_count(), 2);
1036        assert_eq!(col1.value(0), "1");
1037        assert_eq!(col1.value(1), "hello");
1038        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1039        assert!(col1.is_null(3));
1040        assert!(col1.is_null(4));
1041        assert_eq!(col1.data_type(), &DataType::Utf8View);
1042
1043        let col2 = batches[0].column(1).as_string::<i64>();
1044        assert_eq!(col2.null_count(), 1);
1045        assert_eq!(col2.value(0), "2");
1046        assert_eq!(col2.value(1), "shoo");
1047        assert_eq!(col2.value(2), "\t😁foo");
1048        assert!(col2.is_null(3));
1049        assert_eq!(col2.value(4), "");
1050    }
1051
1052    #[test]
1053    fn test_complex() {
1054        let buf = r#"
1055           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1056           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1057           {"list": null, "nested": {"a": null}}
1058        "#;
1059
1060        let schema = Arc::new(Schema::new(vec![
1061            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1062            Field::new_struct(
1063                "nested",
1064                vec![
1065                    Field::new("a", DataType::Int32, true),
1066                    Field::new("b", DataType::Int32, true),
1067                ],
1068                true,
1069            ),
1070            Field::new_struct(
1071                "nested_list",
1072                vec![Field::new_list(
1073                    "list2",
1074                    Field::new_struct(
1075                        "element",
1076                        vec![Field::new("c", DataType::Int32, false)],
1077                        false,
1078                    ),
1079                    true,
1080                )],
1081                true,
1082            ),
1083        ]));
1084
1085        let batches = do_read(buf, 1024, false, false, schema);
1086        assert_eq!(batches.len(), 1);
1087
1088        let list = batches[0].column(0).as_list::<i32>();
1089        assert_eq!(list.len(), 3);
1090        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1091        assert_eq!(list.null_count(), 1);
1092        assert!(list.is_null(2));
1093        let list_values = list.values().as_primitive::<Int32Type>();
1094        assert_eq!(list_values.values(), &[5, 6]);
1095
1096        let nested = batches[0].column(1).as_struct();
1097        let a = nested.column(0).as_primitive::<Int32Type>();
1098        assert_eq!(list.null_count(), 1);
1099        assert_eq!(a.values(), &[1, 7, 0]);
1100        assert!(list.is_null(2));
1101
1102        let b = nested.column(1).as_primitive::<Int32Type>();
1103        assert_eq!(b.null_count(), 2);
1104        assert_eq!(b.len(), 3);
1105        assert_eq!(b.value(0), 2);
1106        assert!(b.is_null(1));
1107        assert!(b.is_null(2));
1108
1109        let nested_list = batches[0].column(2).as_struct();
1110        assert_eq!(nested_list.len(), 3);
1111        assert_eq!(nested_list.null_count(), 1);
1112        assert!(nested_list.is_null(2));
1113
1114        let list2 = nested_list.column(0).as_list::<i32>();
1115        assert_eq!(list2.len(), 3);
1116        assert_eq!(list2.null_count(), 1);
1117        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1118        assert!(list2.is_null(2));
1119
1120        let list2_values = list2.values().as_struct();
1121
1122        let c = list2_values.column(0).as_primitive::<Int32Type>();
1123        assert_eq!(c.values(), &[3, 4]);
1124    }
1125
    /// Schema-driven projection: fields present in the JSON but absent from
    /// the schema ("list", "nested.b", "list2.element.c") are skipped during
    /// decoding.
    #[test]
    fn test_projection() {
        let buf = r#"
           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // Only the projected "a" survives inside "nested"
        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" only appears in the first list2 element, so index 1 is null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1181
    /// Map column whose values are lists of nullable strings: per-row entries
    /// are flattened into single keys/values children delimited by offsets.
    #[test]
    fn test_map() {
        let buf = r#"
           {"map": {"a": ["foo", null]}}
           {"map": {"a": [null], "b": []}}
           {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Rows contribute 1, 2 and 2 entries respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        // Flattened list elements across all map values; entry "c" maps to a
        // null list (index 3), distinct from "b"'s empty list
        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // Round-trip through the display formatter as a readability check
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1224
1225    #[test]
1226    fn test_not_coercing_primitive_into_string_without_flag() {
1227        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1228
1229        let buf = r#"{"a": 1}"#;
1230        let err = ReaderBuilder::new(schema.clone())
1231            .with_batch_size(1024)
1232            .build(Cursor::new(buf.as_bytes()))
1233            .unwrap()
1234            .read()
1235            .unwrap_err();
1236
1237        assert_eq!(
1238            err.to_string(),
1239            "Json error: whilst decoding field 'a': expected string got 1"
1240        );
1241
1242        let buf = r#"{"a": true}"#;
1243        let err = ReaderBuilder::new(schema)
1244            .with_batch_size(1024)
1245            .build(Cursor::new(buf.as_bytes()))
1246            .unwrap()
1247            .read()
1248            .unwrap_err();
1249
1250        assert_eq!(
1251            err.to_string(),
1252            "Json error: whilst decoding field 'a': expected string got true"
1253        );
1254    }
1255
    /// With `coerce_primitive` enabled, numbers and booleans are stored in
    /// string columns as their literal JSON text (e.g. "2E0", "4e0"), not a
    /// normalized form.
    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": numeric notation is preserved verbatim ("2E0", "2.0")
        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": mixes raw numbers with an already-quoted "5"
        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        // "c": booleans coerce to "true"/"false"; missing fields stay null
        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1304
    /// Shared body for the decimal tests. Every column uses scale 2, so
    /// decoded values are the input scaled by 100, with digits beyond the
    /// scale dropped as asserted below (e.g. 123.456 -> 12345). Inputs cover
    /// raw numbers, quoted decimal strings, missing fields and explicit nulls.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": 1 -> 100, 2 -> 200, "2.0452" -> 204, "11034.2" -> 1103420
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        // "c": only the first two rows carry a value
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1354
    /// Runs [`test_decimal`] against each decimal width.
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1362
    /// Shared body for the timestamp tests: RFC3339 strings (with and without
    /// zone/fractional seconds), space-separated and date-only forms, plus raw
    /// numbers, decoded at `T::UNIT` resolution. Column "d" carries a
    /// "+08:00" timezone; the assertions below show zone-less strings in "d"
    /// decode 8 hours earlier than the same strings in the zone-less "c".
    fn test_timestamp<T: ArrowTimestampType>() {
        // NOTE(review): the third record repeats the "b" key ({"b": 1337,
        // "b": "..."}); the assertions show the last value wins. Possibly it
        // was meant to be "a": 1337 — confirm before changing.
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Divisor converting the nanosecond-based expected values below to
        // the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": raw numbers (40, 1234) pass through untouched; strings parse
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1444
    /// Runs [`test_timestamp`] at every timestamp resolution.
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1452
    /// Shared body for the time-of-day tests: 12-hour ("09:26:56.123 AM",
    /// "6:00 pm") and 24-hour forms plus raw numbers, decoded at the unit
    /// carried by `T::DATA_TYPE`.
    fn test_time<T: ArrowTemporalType>() {
        // NOTE(review): the third record repeats the "b" key ({"b": 1337,
        // "b": "6:00 pm"}); the assertions show the last value wins. Possibly
        // it was meant to be "a": 1337 — confirm before changing.
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Divisor converting the nanosecond-based expected values below to
        // the unit under test
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": raw numbers (40, 1234) pass through untouched; strings parse
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1524
    /// Runs [`test_time`] for every Time32/Time64 unit.
    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }
1532
1533    fn test_duration<T: ArrowTemporalType>() {
1534        let buf = r#"
1535        {"a": 1, "b": "2"}
1536        {"a": 3, "b": null}
1537        "#;
1538
1539        let schema = Arc::new(Schema::new(vec![
1540            Field::new("a", T::DATA_TYPE, true),
1541            Field::new("b", T::DATA_TYPE, true),
1542        ]));
1543
1544        let batches = do_read(buf, 1024, true, false, schema);
1545        assert_eq!(batches.len(), 1);
1546
1547        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1548        assert_eq!(col_a.null_count(), 0);
1549        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1550
1551        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1552        assert_eq!(col2.null_count(), 1);
1553        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1554    }
1555
    /// Runs [`test_duration`] for every duration unit.
    #[test]
    fn test_durations() {
        test_duration::<DurationNanosecondType>();
        test_duration::<DurationMicrosecondType>();
        test_duration::<DurationMillisecondType>();
        test_duration::<DurationSecondType>();
    }
1563
    /// Decodes a Delta-Lake-checkpoint-style record: a populated "protocol"
    /// struct alongside a wholly-absent "add" struct, which must come back as
    /// null rather than an error.
    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Render the whole batch as a struct to check the null "add" column
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1601
1602    #[test]
1603    fn struct_nullability() {
1604        let do_test = |child: DataType| {
1605            // Test correctly enforced nullability
1606            let non_null = r#"{"foo": {}}"#;
1607            let schema = Arc::new(Schema::new(vec![Field::new_struct(
1608                "foo",
1609                vec![Field::new("bar", child, false)],
1610                true,
1611            )]));
1612            let mut reader = ReaderBuilder::new(schema.clone())
1613                .build(Cursor::new(non_null.as_bytes()))
1614                .unwrap();
1615            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1616
1617            let null = r#"{"foo": {bar: null}}"#;
1618            let mut reader = ReaderBuilder::new(schema.clone())
1619                .build(Cursor::new(null.as_bytes()))
1620                .unwrap();
1621            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1622
1623            // Test nulls in nullable parent can mask nulls in non-nullable child
1624            let null = r#"{"foo": null}"#;
1625            let mut reader = ReaderBuilder::new(schema)
1626                .build(Cursor::new(null.as_bytes()))
1627                .unwrap();
1628            let batch = reader.next().unwrap().unwrap();
1629            assert_eq!(batch.num_columns(), 1);
1630            let foo = batch.column(0).as_struct();
1631            assert_eq!(foo.len(), 1);
1632            assert!(foo.is_null(0));
1633            assert_eq!(foo.num_columns(), 1);
1634
1635            let bar = foo.column(0);
1636            assert_eq!(bar.len(), 1);
1637            // Non-nullable child can still contain null as masked by parent
1638            assert!(bar.is_null(0));
1639        };
1640
1641        do_test(DataType::Boolean);
1642        do_test(DataType::Int32);
1643        do_test(DataType::Utf8);
1644        do_test(DataType::Decimal128(2, 1));
1645        do_test(DataType::Timestamp(
1646            TimeUnit::Microsecond,
1647            Some("+00:00".into()),
1648        ));
1649    }
1650
1651    #[test]
1652    fn test_truncation() {
1653        let buf = r#"
1654        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1655        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1656        {"i64": -9223372036854775808, "u64": 0 }
1657        {"i64": "-9223372036854775808", "u64": 0 }
1658        "#;
1659
1660        let schema = Arc::new(Schema::new(vec![
1661            Field::new("i64", DataType::Int64, true),
1662            Field::new("u64", DataType::UInt64, true),
1663        ]));
1664
1665        let batches = do_read(buf, 1024, true, false, schema);
1666        assert_eq!(batches.len(), 1);
1667
1668        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1669        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1670
1671        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1672        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1673    }
1674
1675    #[test]
1676    fn test_timestamp_truncation() {
1677        let buf = r#"
1678        {"time": 9223372036854775807 }
1679        {"time": -9223372036854775808 }
1680        {"time": 9e5 }
1681        "#;
1682
1683        let schema = Arc::new(Schema::new(vec![Field::new(
1684            "time",
1685            DataType::Timestamp(TimeUnit::Nanosecond, None),
1686            true,
1687        )]));
1688
1689        let batches = do_read(buf, 1024, true, false, schema);
1690        assert_eq!(batches.len(), 1);
1691
1692        let i64 = batches[0]
1693            .column(0)
1694            .as_primitive::<TimestampNanosecondType>();
1695        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1696    }
1697
1698    #[test]
1699    fn test_strict_mode_no_missing_columns_in_schema() {
1700        let buf = r#"
1701        {"a": 1, "b": "2", "c": true}
1702        {"a": 2E0, "b": "4", "c": false}
1703        "#;
1704
1705        let schema = Arc::new(Schema::new(vec![
1706            Field::new("a", DataType::Int16, false),
1707            Field::new("b", DataType::Utf8, false),
1708            Field::new("c", DataType::Boolean, false),
1709        ]));
1710
1711        let batches = do_read(buf, 1024, true, true, schema);
1712        assert_eq!(batches.len(), 1);
1713
1714        let buf = r#"
1715        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1716        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1717        "#;
1718
1719        let schema = Arc::new(Schema::new(vec![
1720            Field::new("a", DataType::Int16, false),
1721            Field::new("b", DataType::Utf8, false),
1722            Field::new_struct(
1723                "c",
1724                vec![
1725                    Field::new("a", DataType::Boolean, false),
1726                    Field::new("b", DataType::Int16, false),
1727                ],
1728                false,
1729            ),
1730        ]));
1731
1732        let batches = do_read(buf, 1024, true, true, schema);
1733        assert_eq!(batches.len(), 1);
1734    }
1735
1736    #[test]
1737    fn test_strict_mode_missing_columns_in_schema() {
1738        let buf = r#"
1739        {"a": 1, "b": "2", "c": true}
1740        {"a": 2E0, "b": "4", "c": false}
1741        "#;
1742
1743        let schema = Arc::new(Schema::new(vec![
1744            Field::new("a", DataType::Int16, true),
1745            Field::new("c", DataType::Boolean, true),
1746        ]));
1747
1748        let err = ReaderBuilder::new(schema)
1749            .with_batch_size(1024)
1750            .with_strict_mode(true)
1751            .build(Cursor::new(buf.as_bytes()))
1752            .unwrap()
1753            .read()
1754            .unwrap_err();
1755
1756        assert_eq!(
1757            err.to_string(),
1758            "Json error: column 'b' missing from schema"
1759        );
1760
1761        let buf = r#"
1762        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1763        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1764        "#;
1765
1766        let schema = Arc::new(Schema::new(vec![
1767            Field::new("a", DataType::Int16, false),
1768            Field::new("b", DataType::Utf8, false),
1769            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
1770        ]));
1771
1772        let err = ReaderBuilder::new(schema)
1773            .with_batch_size(1024)
1774            .with_strict_mode(true)
1775            .build(Cursor::new(buf.as_bytes()))
1776            .unwrap()
1777            .read()
1778            .unwrap_err();
1779
1780        assert_eq!(
1781            err.to_string(),
1782            "Json error: whilst decoding field 'c': column 'b' missing from schema"
1783        );
1784    }
1785
1786    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1787        let file = File::open(path).unwrap();
1788        let mut reader = BufReader::new(file);
1789        let schema = schema.unwrap_or_else(|| {
1790            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1791            reader.rewind().unwrap();
1792            schema
1793        });
1794        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1795        builder.build(reader).unwrap()
1796    }
1797
    #[test]
    fn test_json_basic() {
        // Infer the schema from test/data/basic.json and verify the inferred
        // column order and types, plus a sample of the decoded values.
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred types appear at consecutive indices in first-seen order:
        // a -> Int64, b -> Float64, c -> Boolean, d -> Utf8
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values from each column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1836
1837    #[test]
1838    fn test_json_empty_projection() {
1839        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1840        let batch = reader.next().unwrap().unwrap();
1841
1842        assert_eq!(0, batch.num_columns());
1843        assert_eq!(12, batch.num_rows());
1844    }
1845
    #[test]
    fn test_json_basic_with_nulls() {
        // Infer the schema from a file containing interspersed nulls and
        // verify the per-column validity bitmaps.
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Column types are still inferred despite the nulls
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity at the start, in the middle, and at the last row
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1885
1886    #[test]
1887    fn test_json_basic_schema() {
1888        let schema = Schema::new(vec![
1889            Field::new("a", DataType::Int64, true),
1890            Field::new("b", DataType::Float32, false),
1891            Field::new("c", DataType::Boolean, false),
1892            Field::new("d", DataType::Utf8, false),
1893        ]);
1894
1895        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1896        let reader_schema = reader.schema();
1897        assert_eq!(reader_schema.as_ref(), &schema);
1898        let batch = reader.next().unwrap().unwrap();
1899
1900        assert_eq!(4, batch.num_columns());
1901        assert_eq!(12, batch.num_rows());
1902
1903        let schema = batch.schema();
1904
1905        let a = schema.column_with_name("a").unwrap();
1906        assert_eq!(&DataType::Int64, a.1.data_type());
1907        let b = schema.column_with_name("b").unwrap();
1908        assert_eq!(&DataType::Float32, b.1.data_type());
1909        let c = schema.column_with_name("c").unwrap();
1910        assert_eq!(&DataType::Boolean, c.1.data_type());
1911        let d = schema.column_with_name("d").unwrap();
1912        assert_eq!(&DataType::Utf8, d.1.data_type());
1913
1914        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1915        assert_eq!(1, aa.value(0));
1916        assert_eq!(100000000000000, aa.value(11));
1917        let bb = batch.column(b.0).as_primitive::<Float32Type>();
1918        assert_eq!(2.0, bb.value(0));
1919        assert_eq!(-3.5, bb.value(1));
1920    }
1921
1922    #[test]
1923    fn test_json_basic_schema_projection() {
1924        let schema = Schema::new(vec![
1925            Field::new("a", DataType::Int64, true),
1926            Field::new("c", DataType::Boolean, false),
1927        ]);
1928
1929        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1930        let batch = reader.next().unwrap().unwrap();
1931
1932        assert_eq!(2, batch.num_columns());
1933        assert_eq!(2, batch.schema().fields().len());
1934        assert_eq!(12, batch.num_rows());
1935
1936        assert_eq!(batch.schema().as_ref(), &schema);
1937
1938        let a = schema.column_with_name("a").unwrap();
1939        assert_eq!(0, a.0);
1940        assert_eq!(&DataType::Int64, a.1.data_type());
1941        let c = schema.column_with_name("c").unwrap();
1942        assert_eq!(1, c.0);
1943        assert_eq!(&DataType::Boolean, c.1.data_type());
1944    }
1945
    #[test]
    fn test_json_arrays() {
        // Infer list-typed columns from test/data/arrays.json and verify the
        // flattened child values, including nulls inside the lists.
        let mut reader = read_file("test/data/arrays.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = batch.schema();

        // JSON arrays infer as List with a nullable child item
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
            b.1.data_type()
        );
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
            c.1.data_type()
        );
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        assert_eq!(1627668684594000000, aa.value(2));
        // Check the flattened child values of list column `b`
        let bb = batch.column(b.0).as_list::<i32>();
        let bb = bb.values().as_primitive::<Float64Type>();
        assert_eq!(9, bb.len());
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-6.1, bb.value(5));
        assert!(!bb.is_valid(7));

        // Check the flattened child values of list column `c`
        let cc = batch
            .column(c.0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let cc = cc.values().as_boolean();
        assert_eq!(6, cc.len());
        assert!(!cc.value(0));
        assert!(!cc.value(4));
        assert!(!cc.is_valid(5));
    }
1993
1994    #[test]
1995    fn test_empty_json_arrays() {
1996        let json_content = r#"
1997            {"items": []}
1998            {"items": null}
1999            {}
2000            "#;
2001
2002        let schema = Arc::new(Schema::new(vec![Field::new(
2003            "items",
2004            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2005            true,
2006        )]));
2007
2008        let batches = do_read(json_content, 1024, false, false, schema);
2009        assert_eq!(batches.len(), 1);
2010
2011        let col1 = batches[0].column(0).as_list::<i32>();
2012        assert_eq!(col1.null_count(), 2);
2013        assert!(col1.value(0).is_empty());
2014        assert_eq!(col1.value(0).data_type(), &DataType::Null);
2015        assert!(col1.is_null(1));
2016        assert!(col1.is_null(2));
2017    }
2018
2019    #[test]
2020    fn test_nested_empty_json_arrays() {
2021        let json_content = r#"
2022            {"items": [[],[]]}
2023            {"items": [[null, null],[null]]}
2024            "#;
2025
2026        let schema = Arc::new(Schema::new(vec![Field::new(
2027            "items",
2028            DataType::List(FieldRef::new(Field::new_list_field(
2029                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2030                true,
2031            ))),
2032            true,
2033        )]));
2034
2035        let batches = do_read(json_content, 1024, false, false, schema);
2036        assert_eq!(batches.len(), 1);
2037
2038        let col1 = batches[0].column(0).as_list::<i32>();
2039        assert_eq!(col1.null_count(), 0);
2040        assert_eq!(col1.value(0).len(), 2);
2041        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2042        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2043
2044        assert_eq!(col1.value(1).len(), 2);
2045        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2046        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2047    }
2048
    #[test]
    fn test_nested_list_json_arrays() {
        // Decode a list-of-structs column (with a nested struct inside) and
        // compare against an expected array built by hand from buffers.
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output: 7 struct children flattened across the 6 list rows
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // `c` validity 0b00111011 (LSB = child 0): children 2 ("c": null) and
        // 6 (the [null] struct) are null
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        // struct validity 0b00111111: only child 6 (the explicit [null] entry)
        // is a null struct
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // 6 list rows with offsets [0, 2, 3, 6, 6, 6, 7]; validity 0b00110111
        // marks row 3 ("a": null) as the only null list
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2142
2143    #[test]
2144    fn test_skip_empty_lines() {
2145        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2146        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2147        let json_content = "
2148        {\"a\": 1}
2149        {\"a\": 2}
2150        {\"a\": 3}";
2151        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2152        let batch = reader.next().unwrap().unwrap();
2153
2154        assert_eq!(1, batch.num_columns());
2155        assert_eq!(3, batch.num_rows());
2156
2157        let schema = reader.schema();
2158        let c = schema.column_with_name("a").unwrap();
2159        assert_eq!(&DataType::Int64, c.1.data_type());
2160    }
2161
2162    #[test]
2163    fn test_with_multiple_batches() {
2164        let file = File::open("test/data/basic_nulls.json").unwrap();
2165        let mut reader = BufReader::new(file);
2166        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2167        reader.rewind().unwrap();
2168
2169        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2170        let mut reader = builder.build(reader).unwrap();
2171
2172        let mut num_records = Vec::new();
2173        while let Some(rb) = reader.next().transpose().unwrap() {
2174            num_records.push(rb.num_rows());
2175        }
2176
2177        assert_eq!(vec![5, 5, 2], num_records);
2178    }
2179
2180    #[test]
2181    fn test_timestamp_from_json_seconds() {
2182        let schema = Schema::new(vec![Field::new(
2183            "a",
2184            DataType::Timestamp(TimeUnit::Second, None),
2185            true,
2186        )]);
2187
2188        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2189        let batch = reader.next().unwrap().unwrap();
2190
2191        assert_eq!(1, batch.num_columns());
2192        assert_eq!(12, batch.num_rows());
2193
2194        let schema = reader.schema();
2195        let batch_schema = batch.schema();
2196        assert_eq!(schema, batch_schema);
2197
2198        let a = schema.column_with_name("a").unwrap();
2199        assert_eq!(
2200            &DataType::Timestamp(TimeUnit::Second, None),
2201            a.1.data_type()
2202        );
2203
2204        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2205        assert!(aa.is_valid(0));
2206        assert!(!aa.is_valid(1));
2207        assert!(!aa.is_valid(2));
2208        assert_eq!(1, aa.value(0));
2209        assert_eq!(1, aa.value(3));
2210        assert_eq!(5, aa.value(7));
2211    }
2212
2213    #[test]
2214    fn test_timestamp_from_json_milliseconds() {
2215        let schema = Schema::new(vec![Field::new(
2216            "a",
2217            DataType::Timestamp(TimeUnit::Millisecond, None),
2218            true,
2219        )]);
2220
2221        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2222        let batch = reader.next().unwrap().unwrap();
2223
2224        assert_eq!(1, batch.num_columns());
2225        assert_eq!(12, batch.num_rows());
2226
2227        let schema = reader.schema();
2228        let batch_schema = batch.schema();
2229        assert_eq!(schema, batch_schema);
2230
2231        let a = schema.column_with_name("a").unwrap();
2232        assert_eq!(
2233            &DataType::Timestamp(TimeUnit::Millisecond, None),
2234            a.1.data_type()
2235        );
2236
2237        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2238        assert!(aa.is_valid(0));
2239        assert!(!aa.is_valid(1));
2240        assert!(!aa.is_valid(2));
2241        assert_eq!(1, aa.value(0));
2242        assert_eq!(1, aa.value(3));
2243        assert_eq!(5, aa.value(7));
2244    }
2245
2246    #[test]
2247    fn test_date_from_json_milliseconds() {
2248        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2249
2250        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2251        let batch = reader.next().unwrap().unwrap();
2252
2253        assert_eq!(1, batch.num_columns());
2254        assert_eq!(12, batch.num_rows());
2255
2256        let schema = reader.schema();
2257        let batch_schema = batch.schema();
2258        assert_eq!(schema, batch_schema);
2259
2260        let a = schema.column_with_name("a").unwrap();
2261        assert_eq!(&DataType::Date64, a.1.data_type());
2262
2263        let aa = batch.column(a.0).as_primitive::<Date64Type>();
2264        assert!(aa.is_valid(0));
2265        assert!(!aa.is_valid(1));
2266        assert!(!aa.is_valid(2));
2267        assert_eq!(1, aa.value(0));
2268        assert_eq!(1, aa.value(3));
2269        assert_eq!(5, aa.value(7));
2270    }
2271
2272    #[test]
2273    fn test_time_from_json_nanoseconds() {
2274        let schema = Schema::new(vec![Field::new(
2275            "a",
2276            DataType::Time64(TimeUnit::Nanosecond),
2277            true,
2278        )]);
2279
2280        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2281        let batch = reader.next().unwrap().unwrap();
2282
2283        assert_eq!(1, batch.num_columns());
2284        assert_eq!(12, batch.num_rows());
2285
2286        let schema = reader.schema();
2287        let batch_schema = batch.schema();
2288        assert_eq!(schema, batch_schema);
2289
2290        let a = schema.column_with_name("a").unwrap();
2291        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2292
2293        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2294        assert!(aa.is_valid(0));
2295        assert!(!aa.is_valid(1));
2296        assert!(!aa.is_valid(2));
2297        assert_eq!(1, aa.value(0));
2298        assert_eq!(1, aa.value(3));
2299        assert_eq!(5, aa.value(7));
2300    }
2301
2302    #[test]
2303    fn test_json_iterator() {
2304        let file = File::open("test/data/basic.json").unwrap();
2305        let mut reader = BufReader::new(file);
2306        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2307        reader.rewind().unwrap();
2308
2309        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2310        let reader = builder.build(reader).unwrap();
2311        let schema = reader.schema();
2312        let (col_a_index, _) = schema.column_with_name("a").unwrap();
2313
2314        let mut sum_num_rows = 0;
2315        let mut num_batches = 0;
2316        let mut sum_a = 0;
2317        for batch in reader {
2318            let batch = batch.unwrap();
2319            assert_eq!(8, batch.num_columns());
2320            sum_num_rows += batch.num_rows();
2321            num_batches += 1;
2322            let batch_schema = batch.schema();
2323            assert_eq!(schema, batch_schema);
2324            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2325            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2326        }
2327        assert_eq!(12, sum_num_rows);
2328        assert_eq!(3, num_batches);
2329        assert_eq!(100000000000011, sum_a);
2330    }
2331
    #[test]
    fn test_decoder_error() {
        // Verify that decode/flush errors leave the tape decoder's buffered
        // state intact, and that type mismatches report the offending field
        // path together with the JSON fragment encountered.
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // A truncated record stays buffered as a partial row, and a failed
        // flush must not discard it
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode `s` against the schema and return the error message
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        // Non-object values where a struct is expected
        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        // Deeply nested mismatches echo the entire offending fragment
        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Mismatches on the nested child report the full field path
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2430
2431    #[test]
2432    fn test_serialize_timestamp() {
2433        let json = vec![
2434            json!({"timestamp": 1681319393}),
2435            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2436        ];
2437        let schema = Schema::new(vec![Field::new(
2438            "timestamp",
2439            DataType::Timestamp(TimeUnit::Second, None),
2440            true,
2441        )]);
2442        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2443            .build_decoder()
2444            .unwrap();
2445        decoder.serialize(&json).unwrap();
2446        let batch = decoder.flush().unwrap().unwrap();
2447        assert_eq!(batch.num_rows(), 2);
2448        assert_eq!(batch.num_columns(), 1);
2449        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2450        assert_eq!(values.values(), &[1681319393, -7200]);
2451    }
2452
2453    #[test]
2454    fn test_serialize_decimal() {
2455        let json = vec![
2456            json!({"decimal": 1.234}),
2457            json!({"decimal": "1.234"}),
2458            json!({"decimal": 1234}),
2459            json!({"decimal": "1234"}),
2460        ];
2461        let schema = Schema::new(vec![Field::new(
2462            "decimal",
2463            DataType::Decimal128(10, 3),
2464            true,
2465        )]);
2466        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2467            .build_decoder()
2468            .unwrap();
2469        decoder.serialize(&json).unwrap();
2470        let batch = decoder.flush().unwrap().unwrap();
2471        assert_eq!(batch.num_rows(), 4);
2472        assert_eq!(batch.num_columns(), 1);
2473        let values = batch.column(0).as_primitive::<Decimal128Type>();
2474        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2475    }
2476
2477    #[test]
2478    fn test_serde_field() {
2479        let field = Field::new("int", DataType::Int32, true);
2480        let mut decoder = ReaderBuilder::new_with_field(field)
2481            .build_decoder()
2482            .unwrap();
2483        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2484        let b = decoder.flush().unwrap().unwrap();
2485        let values = b.column(0).as_primitive::<Int32Type>().values();
2486        assert_eq!(values, &[1, 2, 3, 4]);
2487    }
2488
2489    #[test]
2490    fn test_serde_large_numbers() {
2491        let field = Field::new("int", DataType::Int64, true);
2492        let mut decoder = ReaderBuilder::new_with_field(field)
2493            .build_decoder()
2494            .unwrap();
2495
2496        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2497        let b = decoder.flush().unwrap().unwrap();
2498        let values = b.column(0).as_primitive::<Int64Type>().values();
2499        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2500
2501        let field = Field::new(
2502            "int",
2503            DataType::Timestamp(TimeUnit::Microsecond, None),
2504            true,
2505        );
2506        let mut decoder = ReaderBuilder::new_with_field(field)
2507            .build_decoder()
2508            .unwrap();
2509
2510        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2511        let b = decoder.flush().unwrap().unwrap();
2512        let values = b
2513            .column(0)
2514            .as_primitive::<TimestampMicrosecondType>()
2515            .values();
2516        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2517    }
2518
2519    #[test]
2520    fn test_coercing_primitive_into_string_decoder() {
2521        let buf = &format!(
2522            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2523            (i32::MAX as i64 + 10),
2524            i64::MAX - 10
2525        );
2526        let schema = Schema::new(vec![
2527            Field::new("a", DataType::Float64, true),
2528            Field::new("b", DataType::Utf8, true),
2529            Field::new("c", DataType::Utf8, true),
2530        ]);
2531        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2532        let schema_ref = Arc::new(schema);
2533
2534        // read record batches
2535        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2536        let mut decoder = reader.build_decoder().unwrap();
2537        decoder.serialize(json_array.as_slice()).unwrap();
2538        let batch = decoder.flush().unwrap().unwrap();
2539        assert_eq!(
2540            batch,
2541            RecordBatch::try_new(
2542                schema_ref,
2543                vec![
2544                    Arc::new(Float64Array::from(vec![
2545                        1.0,
2546                        2.0,
2547                        (i32::MAX as i64 + 10) as f64,
2548                        (i64::MAX - 10) as f64
2549                    ])),
2550                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2551                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2552                ]
2553            )
2554            .unwrap()
2555        );
2556    }
2557
2558    // Parse the given `row` in `struct_mode` as a type given by fields.
2559    //
2560    // If as_struct == true, wrap the fields in a Struct field with name "r".
2561    // If as_struct == false, wrap the fields in a Schema.
2562    fn _parse_structs(
2563        row: &str,
2564        struct_mode: StructMode,
2565        fields: Fields,
2566        as_struct: bool,
2567    ) -> Result<RecordBatch, ArrowError> {
2568        let builder = if as_struct {
2569            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2570        } else {
2571            ReaderBuilder::new(Arc::new(Schema::new(fields)))
2572        };
2573        builder
2574            .with_struct_mode(struct_mode)
2575            .build(Cursor::new(row.as_bytes()))
2576            .unwrap()
2577            .next()
2578            .unwrap()
2579    }
2580
2581    #[test]
2582    fn test_struct_decoding_list_length() {
2583        use arrow_array::array;
2584
2585        let row = "[1, 2]";
2586
2587        let mut fields = vec![Field::new("a", DataType::Int32, true)];
2588        let too_few_fields = Fields::from(fields.clone());
2589        fields.push(Field::new("b", DataType::Int32, true));
2590        let correct_fields = Fields::from(fields.clone());
2591        fields.push(Field::new("c", DataType::Int32, true));
2592        let too_many_fields = Fields::from(fields.clone());
2593
2594        let parse = |fields: Fields, as_struct: bool| {
2595            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
2596        };
2597
2598        let expected_row = StructArray::new(
2599            correct_fields.clone(),
2600            vec![
2601                Arc::new(array::Int32Array::from(vec![1])),
2602                Arc::new(array::Int32Array::from(vec![2])),
2603            ],
2604            None,
2605        );
2606        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);
2607
2608        assert_eq!(
2609            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
2610            "Json error: found extra columns for 1 fields".to_string()
2611        );
2612        assert_eq!(
2613            parse(too_few_fields, false).unwrap_err().to_string(),
2614            "Json error: found extra columns for 1 fields".to_string()
2615        );
2616        assert_eq!(
2617            parse(correct_fields.clone(), true).unwrap(),
2618            RecordBatch::try_new(
2619                Arc::new(Schema::new(vec![row_field])),
2620                vec![Arc::new(expected_row.clone())]
2621            )
2622            .unwrap()
2623        );
2624        assert_eq!(
2625            parse(correct_fields, false).unwrap(),
2626            RecordBatch::from(expected_row)
2627        );
2628        assert_eq!(
2629            parse(too_many_fields.clone(), true)
2630                .unwrap_err()
2631                .to_string(),
2632            "Json error: found 2 columns for 3 fields".to_string()
2633        );
2634        assert_eq!(
2635            parse(too_many_fields, false).unwrap_err().to_string(),
2636            "Json error: found 2 columns for 3 fields".to_string()
2637        );
2638    }
2639
2640    #[test]
2641    fn test_struct_decoding() {
2642        use arrow_array::builder;
2643
2644        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
2645        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
2646        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;
2647
2648        let struct_fields = Fields::from(vec![
2649            Field::new("b", DataType::new_list(DataType::Int32, true), true),
2650            Field::new_map(
2651                "c",
2652                "entries",
2653                Field::new("keys", DataType::Utf8, false),
2654                Field::new("values", DataType::Int32, true),
2655                false,
2656                false,
2657            ),
2658        ]);
2659
2660        let list_array =
2661            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);
2662
2663        let map_array = {
2664            let mut map_builder = builder::MapBuilder::new(
2665                None,
2666                builder::StringBuilder::new(),
2667                builder::Int32Builder::new(),
2668            );
2669            map_builder.keys().append_value("d");
2670            map_builder.values().append_value(3);
2671            map_builder.append(true).unwrap();
2672            map_builder.finish()
2673        };
2674
2675        let struct_array = StructArray::new(
2676            struct_fields.clone(),
2677            vec![Arc::new(list_array), Arc::new(map_array)],
2678            None,
2679        );
2680
2681        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
2682        let schema = Arc::new(Schema::new(fields.clone()));
2683        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();
2684
2685        let parse = |row: &str, struct_mode: StructMode| {
2686            _parse_structs(row, struct_mode, fields.clone(), false)
2687        };
2688
2689        assert_eq!(
2690            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
2691            expected
2692        );
2693        assert_eq!(
2694            parse(nested_list_json, StructMode::ObjectOnly)
2695                .unwrap_err()
2696                .to_string(),
2697            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
2698        );
2699        assert_eq!(
2700            parse(nested_mixed_json, StructMode::ObjectOnly)
2701                .unwrap_err()
2702                .to_string(),
2703            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
2704        );
2705
2706        assert_eq!(
2707            parse(nested_list_json, StructMode::ListOnly).unwrap(),
2708            expected
2709        );
2710        assert_eq!(
2711            parse(nested_object_json, StructMode::ListOnly)
2712                .unwrap_err()
2713                .to_string(),
2714            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
2715        );
2716        assert_eq!(
2717            parse(nested_mixed_json, StructMode::ListOnly)
2718                .unwrap_err()
2719                .to_string(),
2720            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
2721        );
2722    }
2723
2724    // Test cases:
2725    // [] -> RecordBatch row with no entries.  Schema = [('a', Int32)] -> Error
2726    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
2727    // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
2728    // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
2729    #[test]
2730    fn test_struct_decoding_empty_list() {
2731        let int_field = Field::new("a", DataType::Int32, true);
2732        let struct_field = Field::new(
2733            "r",
2734            DataType::Struct(Fields::from(vec![int_field.clone()])),
2735            true,
2736        );
2737
2738        let parse = |row: &str, as_struct: bool, field: Field| {
2739            _parse_structs(
2740                row,
2741                StructMode::ListOnly,
2742                Fields::from(vec![field]),
2743                as_struct,
2744            )
2745        };
2746
2747        // Missing fields
2748        assert_eq!(
2749            parse("[]", true, struct_field.clone())
2750                .unwrap_err()
2751                .to_string(),
2752            "Json error: found 0 columns for 1 fields".to_owned()
2753        );
2754        assert_eq!(
2755            parse("[]", false, int_field.clone())
2756                .unwrap_err()
2757                .to_string(),
2758            "Json error: found 0 columns for 1 fields".to_owned()
2759        );
2760        assert_eq!(
2761            parse("[]", false, struct_field.clone())
2762                .unwrap_err()
2763                .to_string(),
2764            "Json error: found 0 columns for 1 fields".to_owned()
2765        );
2766        assert_eq!(
2767            parse("[[]]", false, struct_field.clone())
2768                .unwrap_err()
2769                .to_string(),
2770            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2771        );
2772    }
2773
2774    #[test]
2775    fn test_decode_list_struct_with_wrong_types() {
2776        let int_field = Field::new("a", DataType::Int32, true);
2777        let struct_field = Field::new(
2778            "r",
2779            DataType::Struct(Fields::from(vec![int_field.clone()])),
2780            true,
2781        );
2782
2783        let parse = |row: &str, as_struct: bool, field: Field| {
2784            _parse_structs(
2785                row,
2786                StructMode::ListOnly,
2787                Fields::from(vec![field]),
2788                as_struct,
2789            )
2790        };
2791
2792        // Wrong values
2793        assert_eq!(
2794            parse(r#"[["a"]]"#, false, struct_field.clone())
2795                .unwrap_err()
2796                .to_string(),
2797            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2798        );
2799        assert_eq!(
2800            parse(r#"[["a"]]"#, true, struct_field.clone())
2801                .unwrap_err()
2802                .to_string(),
2803            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2804        );
2805        assert_eq!(
2806            parse(r#"["a"]"#, true, int_field.clone())
2807                .unwrap_err()
2808                .to_string(),
2809            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2810        );
2811        assert_eq!(
2812            parse(r#"["a"]"#, false, int_field.clone())
2813                .unwrap_err()
2814                .to_string(),
2815            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2816        );
2817    }
2818}