// arrow_json/writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! # JSON Writer
19//!
20//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
21//! JSON objects or JSON formatted byte streams.
22//!
23//! ## Writing JSON formatted byte streams
24//!
25//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
26//! [`LineDelimitedWriter`]:
27//!
28//! ```
29//! # use std::sync::Arc;
30//! # use arrow_array::{Int32Array, RecordBatch};
31//! # use arrow_schema::{DataType, Field, Schema};
32//!
33//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
34//! let a = Int32Array::from(vec![1, 2, 3]);
35//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
36//!
37//! // Write the record batch out as JSON
38//! let buf = Vec::new();
39//! let mut writer = arrow_json::LineDelimitedWriter::new(buf);
40//! writer.write_batches(&vec![&batch]).unwrap();
41//! writer.finish().unwrap();
42//!
43//! // Get the underlying buffer back,
44//! let buf = writer.into_inner();
45//! assert_eq!(r#"{"a":1}
46//! {"a":2}
47//! {"a":3}
48//!"#, String::from_utf8(buf).unwrap())
49//! ```
50//!
51//! To serialize [`RecordBatch`]es into a well formed JSON array, use
52//! [`ArrayWriter`]:
53//!
54//! ```
55//! # use std::sync::Arc;
56//! # use arrow_array::{Int32Array, RecordBatch};
57//! use arrow_schema::{DataType, Field, Schema};
58//!
59//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
60//! let a = Int32Array::from(vec![1, 2, 3]);
61//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
62//!
63//! // Write the record batch out as a JSON array
64//! let buf = Vec::new();
65//! let mut writer = arrow_json::ArrayWriter::new(buf);
66//! writer.write_batches(&vec![&batch]).unwrap();
67//! writer.finish().unwrap();
68//!
69//! // Get the underlying buffer back,
70//! let buf = writer.into_inner();
71//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
72//! ```
73//!
74//! [`LineDelimitedWriter`] and [`ArrayWriter`] will omit writing keys with null values.
75//! In order to explicitly write null values for keys, configure a custom [`Writer`] by
76//! using a [`WriterBuilder`] to construct a [`Writer`].
77//!
78//! ## Writing to [serde_json] JSON Objects
79//!
80//! To serialize [`RecordBatch`]es into an array of
81//! [JSON](https://docs.serde.rs/serde_json/) objects you can reparse the resulting JSON string.
82//! Note that this is less efficient than using the `Writer` API.
83//!
84//! ```
85//! # use std::sync::Arc;
86//! # use arrow_array::{Int32Array, RecordBatch};
87//! # use arrow_schema::{DataType, Field, Schema};
88//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
89//! let a = Int32Array::from(vec![1, 2, 3]);
90//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
91//!
92//! // Write the record batch out as json bytes (string)
93//! let buf = Vec::new();
94//! let mut writer = arrow_json::ArrayWriter::new(buf);
95//! writer.write_batches(&vec![&batch]).unwrap();
96//! writer.finish().unwrap();
97//! let json_data = writer.into_inner();
98//!
99//! // Parse the string using serde_json
100//! use serde_json::{Map, Value};
101//! let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(json_data.as_slice()).unwrap();
102//! assert_eq!(
103//!     serde_json::Value::Object(json_rows[1].clone()),
104//!     serde_json::json!({"a": 2}),
105//! );
106//! ```
107mod encoder;
108
109use std::{fmt::Debug, io::Write, sync::Arc};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, NullableEncoder};
116
117/// This trait defines how to format a sequence of JSON objects to a
118/// byte stream.
119pub trait JsonFormat: Debug + Default {
120    #[inline]
121    /// write any bytes needed at the start of the file to the writer
122    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
123        Ok(())
124    }
125
126    #[inline]
127    /// write any bytes needed for the start of each row
128    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
129        Ok(())
130    }
131
132    #[inline]
133    /// write any bytes needed for the end of each row
134    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
135        Ok(())
136    }
137
138    /// write any bytes needed for the start of each row
139    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
140        Ok(())
141    }
142}
143
144/// Produces JSON output with one record per line.
145///
146/// For example:
147///
148/// ```json
149/// {"foo":1}
150/// {"bar":1}
151///
152/// ```
153#[derive(Debug, Default)]
154pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157    fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158        writer.write_all(b"\n")?;
159        Ok(())
160    }
161}
162
163/// Produces JSON output as a single JSON array.
164///
165/// For example:
166///
167/// ```json
168/// [{"foo":1},{"bar":1}]
169/// ```
170#[derive(Debug, Default)]
171pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175        writer.write_all(b"[")?;
176        Ok(())
177    }
178
179    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180        if !is_first_row {
181            writer.write_all(b",")?;
182        }
183        Ok(())
184    }
185
186    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187        writer.write_all(b"]")?;
188        Ok(())
189    }
190}
191
/// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON objects.
///
/// See [`LineDelimited`] for an example of the output format.
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays.
///
/// See [`JsonArray`] for an example of the output format.
pub type ArrayWriter<W> = Writer<W, JsonArray>;
197
198/// JSON writer builder.
199#[derive(Debug, Clone, Default)]
200pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203    /// Create a new builder for configuring JSON writing options.
204    ///
205    /// # Example
206    ///
207    /// ```
208    /// # use arrow_json::{Writer, WriterBuilder};
209    /// # use arrow_json::writer::LineDelimited;
210    /// # use std::fs::File;
211    ///
212    /// fn example() -> Writer<File, LineDelimited> {
213    ///     let file = File::create("target/out.json").unwrap();
214    ///
215    ///     // create a builder that keeps keys with null values
216    ///     let builder = WriterBuilder::new().with_explicit_nulls(true);
217    ///     let writer = builder.build::<_, LineDelimited>(file);
218    ///
219    ///     writer
220    /// }
221    /// ```
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    /// Returns `true` if this writer is configured to keep keys with null values.
227    pub fn explicit_nulls(&self) -> bool {
228        self.0.explicit_nulls()
229    }
230
231    /// Set whether to keep keys with null values, or to omit writing them.
232    ///
233    /// For example, with [`LineDelimited`] format:
234    ///
235    /// Skip nulls (set to `false`):
236    ///
237    /// ```json
238    /// {"foo":1}
239    /// {"foo":1,"bar":2}
240    /// {}
241    /// ```
242    ///
243    /// Keep nulls (set to `true`):
244    ///
245    /// ```json
246    /// {"foo":1,"bar":null}
247    /// {"foo":1,"bar":2}
248    /// {"foo":null,"bar":null}
249    /// ```
250    ///
251    /// Default is to skip nulls (set to `false`). If `struct_mode == ListOnly`,
252    /// nulls will be written explicitly regardless of this setting.
253    pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254        self.0 = self.0.with_explicit_nulls(explicit_nulls);
255        self
256    }
257
258    /// Returns if this writer is configured to write structs as JSON Objects or Arrays.
259    pub fn struct_mode(&self) -> StructMode {
260        self.0.struct_mode()
261    }
262
263    /// Set the [`StructMode`] for the writer, which determines whether structs
264    /// are encoded to JSON as objects or lists. For more details refer to the
265    /// enum documentation. Default is to use `ObjectOnly`. If this is set to
266    /// `ListOnly`, nulls will be written explicitly regardless of the
267    /// `explicit_nulls` setting.
268    pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269        self.0 = self.0.with_struct_mode(struct_mode);
270        self
271    }
272
273    /// Set an encoder factory to use when creating encoders for writing JSON.
274    ///
275    /// This can be used to override how some types are encoded or to provide
276    /// a fallback for types that are not supported by the default encoder.
277    pub fn with_encoder_factory(mut self, factory: Arc<dyn EncoderFactory>) -> Self {
278        self.0 = self.0.with_encoder_factory(factory);
279        self
280    }
281
282    /// Create a new `Writer` with specified `JsonFormat` and builder options.
283    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
284    where
285        W: Write,
286        F: JsonFormat,
287    {
288        Writer {
289            writer,
290            started: false,
291            finished: false,
292            format: F::default(),
293            options: self.0,
294        }
295    }
296}
297
/// A JSON writer which serializes [`RecordBatch`]es to a stream of
/// `u8` encoded JSON objects.
///
/// See the module level documentation for detailed usage and examples.
/// The specific format of the stream is controlled by the [`JsonFormat`]
/// type parameter.
///
/// By default the writer will skip writing keys with null values for
/// backward compatibility. See [`WriterBuilder`] on how to customize
/// this behaviour when creating a new writer.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying writer to use to write bytes
    writer: W,

    /// Has the writer output any records yet?
    ///
    /// Decides whether `start_stream` still needs to be emitted and whether
    /// the next row written is the first (and hence needs no separator).
    started: bool,

    /// Is the writer finished?
    ///
    /// Set by [`Writer::finish`] so `end_stream` is emitted at most once.
    finished: bool,

    /// Determines how the byte stream is formatted
    format: F,

    /// Controls how JSON should be encoded, e.g. whether to write explicit nulls or skip them
    options: EncoderOptions,
}
329
impl<W, F> Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Construct a new writer with default [`EncoderOptions`]
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options: EncoderOptions::default(),
        }
    }

    /// Serialize `batch` to JSON output
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Empty batches contribute no rows, so skip them entirely; this also
        // avoids emitting the stream prologue until there is real data.
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // BufWriter uses a buffer size of 8KB
        // We therefore double this and flush once we have more than 8KB
        let mut buffer = Vec::with_capacity(16 * 1024);

        // "First row" is relative to the whole stream, not this batch: a row
        // of a later batch must still be preceded by a separator.
        let mut is_first_row = !self.started;
        if !self.started {
            self.format.start_stream(&mut buffer)?;
            self.started = true;
        }

        // Encode the batch as a single non-nullable struct "row" whose fields
        // are the batch's columns.
        let array = StructArray::from(batch.clone());
        let field = Arc::new(Field::new_struct(
            "",
            batch.schema().fields().clone(),
            false,
        ));

        let mut encoder = make_encoder(&field, &array, &self.options)?;

        // Validate that the root is not nullable (guaranteed by the
        // non-nullable field constructed above)
        assert!(!encoder.has_nulls(), "root cannot be nullable");
        for idx in 0..batch.num_rows() {
            self.format.start_row(&mut buffer, is_first_row)?;
            is_first_row = false;

            encoder.encode(idx, &mut buffer);
            // Flush to the underlying writer once the local buffer exceeds
            // 8KB; the row terminator below then lands in the emptied buffer.
            if buffer.len() > 8 * 1024 {
                self.writer.write_all(&buffer)?;
                buffer.clear();
            }
            self.format.end_row(&mut buffer)?;
        }

        // Write out any bytes still held in the local buffer.
        if !buffer.is_empty() {
            self.writer.write_all(&buffer)?;
        }

        Ok(())
    }

    /// Serialize `batches` to JSON output
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Finishes the output stream. This function must be called after
    /// all record batches have been produced (e.g. producing the final `']'` if writing
    /// arrays).
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        // If no rows were ever written, still emit the stream prologue so the
        // output is well formed (e.g. `[]` for [`JsonArray`]).
        if !self.started {
            self.format.start_stream(&mut self.writer)?;
            self.started = true;
        }
        if !self.finished {
            self.format.end_stream(&mut self.writer)?;
            self.finished = true;
        }

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// Writing to the underlying writer must be done with care
    /// to avoid corrupting the output JSON.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Unwraps this `Writer<W>`, returning the underlying writer
    pub fn into_inner(self) -> W {
        self.writer
    }
}
434
435impl<W, F> RecordBatchWriter for Writer<W, F>
436where
437    W: Write,
438    F: JsonFormat,
439{
440    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
441        self.write(batch)
442    }
443
444    fn close(mut self) -> Result<(), ArrowError> {
445        self.finish()
446    }
447}
448
449#[cfg(test)]
450mod tests {
451    use core::str;
452    use std::collections::HashMap;
453    use std::fs::{read_to_string, File};
454    use std::io::{BufReader, Seek};
455    use std::sync::Arc;
456
457    use arrow_array::cast::AsArray;
458    use serde_json::{json, Value};
459
460    use super::LineDelimited;
461    use super::{Encoder, WriterBuilder};
462    use arrow_array::builder::*;
463    use arrow_array::types::*;
464    use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, ToByteSlice};
465    use arrow_data::ArrayData;
466
467    use crate::reader::*;
468
469    use super::*;
470
471    /// Asserts that the NDJSON `input` is semantically identical to `expected`
472    fn assert_json_eq(input: &[u8], expected: &str) {
473        let expected: Vec<Option<Value>> = expected
474            .split('\n')
475            .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
476            .collect();
477
478        let actual: Vec<Option<Value>> = input
479            .split(|b| *b == b'\n')
480            .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
481            .collect();
482
483        assert_eq!(actual, expected);
484    }
485
486    #[test]
487    fn write_simple_rows() {
488        let schema = Schema::new(vec![
489            Field::new("c1", DataType::Int32, true),
490            Field::new("c2", DataType::Utf8, true),
491        ]);
492
493        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
494        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);
495
496        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
497
498        let mut buf = Vec::new();
499        {
500            let mut writer = LineDelimitedWriter::new(&mut buf);
501            writer.write_batches(&[&batch]).unwrap();
502        }
503
504        assert_json_eq(
505            &buf,
506            r#"{"c1":1,"c2":"a"}
507{"c1":2,"c2":"b"}
508{"c1":3,"c2":"c"}
509{"c2":"d"}
510{"c1":5}
511"#,
512        );
513    }
514
    // Exercises all three string encodings: Utf8, LargeUtf8 and Utf8View.
    #[test]
    fn write_large_utf8_and_utf8_view() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Null values are omitted; an all-null row serializes as `{}`.
        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }
549
    // Dictionary-encoded string columns with two different key widths.
    #[test]
    fn write_dictionary() {
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Dictionary values are written hydrated (as plain strings); null
        // entries omit the field.
        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }
589
    // LargeList of dictionary-encoded strings, including a null list (row 3)
    // and a null element inside a list (row 2).
    #[test]
    fn write_list_of_dictionary() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Null list -> field omitted; null element -> JSON null in the array.
        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
629
    // Same shape as `write_list_of_dictionary`, but the dictionary values use
    // LargeUtf8 and the dictionary is built from explicit keys/values.
    #[test]
    fn write_list_of_dictionary_large_values() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Expected output matches the Utf8 variant of this test exactly.
        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
676
    // One timestamp rendered at each supported precision (s/ms/us/ns),
    // without a timezone.
    #[test]
    fn write_timestamps() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Timezone-less timestamps are formatted without a `Z` suffix, each
        // truncated to its column's precision.
        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );
    }
730
    // Same as `write_timestamps`, but with a `+00:00` timezone attached:
    // the output then carries a `Z` suffix.
    #[test]
    fn write_timestamps_with_tz() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );
    }
791
    // Date32 (days since epoch) and Date64 (milliseconds since epoch)
    // rendering; `name` is non-nullable here.
    #[test]
    fn write_dates() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        let arr_date32 = Date32Array::from(vec![
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Date32 renders as a date only; Date64 retains the time component.
        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );
    }
838
    // Time-of-day types at all four precisions; the raw value 120 is
    // interpreted in each column's own unit.
    #[test]
    fn write_times() {
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );
    }
881
    // Duration types at all four precisions, rendered as ISO-8601 durations.
    #[test]
    fn write_durations() {
        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_durationsec),
                Arc::new(arr_durationmsec),
                Arc::new(arr_durationusec),
                Arc::new(arr_durationnsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
        );
    }
924
    // Structs nested two levels deep; null struct members are omitted at
    // every nesting level.
    #[test]
    fn write_nested_structs() {
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Row 2's null `c11` disappears from the nested object.
        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }
979
980    #[test]
981    fn write_struct_with_list_field() {
982        let field_c1 = Field::new(
983            "c1",
984            DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
985            false,
986        );
987        let field_c2 = Field::new("c2", DataType::Int32, false);
988        let schema = Schema::new(vec![field_c1.clone(), field_c2]);
989
990        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
991        // list column rows: ["a", "a1"], ["b"], ["c"], ["d"], ["e"]
992        let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
993        let a_list_data = ArrayData::builder(field_c1.data_type().clone())
994            .len(5)
995            .add_buffer(a_value_offsets)
996            .add_child_data(a_values.into_data())
997            .null_bit_buffer(Some(Buffer::from([0b00011111])))
998            .build()
999            .unwrap();
1000        let a = ListArray::from(a_list_data);
1001
1002        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);
1003
1004        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1005
1006        let mut buf = Vec::new();
1007        {
1008            let mut writer = LineDelimitedWriter::new(&mut buf);
1009            writer.write_batches(&[&batch]).unwrap();
1010        }
1011
1012        assert_json_eq(
1013            &buf,
1014            r#"{"c1":["a","a1"],"c2":1}
1015{"c1":["b"],"c2":2}
1016{"c1":["c"],"c2":3}
1017{"c1":["d"],"c2":4}
1018{"c1":["e"],"c2":5}
1019"#,
1020        );
1021    }
1022
    /// Tests serialization of a list-of-list column ("c1") next to a nullable
    /// utf8 column ("c2").
    #[test]
    fn write_nested_list() {
        let list_inner_type = Field::new(
            "a",
            DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
            false,
        );
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(list_inner_type.clone())),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        // list column rows: [[1, 2], [3]], [], [[4, 5, 6]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        // Inner offsets [0, 2, 3, 6] slice the values into [1,2], [3], [4,5,6].
        let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
        // Construct a list array from the above two; 0b00000111 marks all
        // three inner lists valid.
        let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
            .len(3)
            .add_buffer(a_value_offsets)
            .null_bit_buffer(Some(Buffer::from([0b00000111])))
            .add_child_data(a_values.into_data())
            .build()
            .unwrap();

        // Outer offsets [0, 2, 2, 3]: row 0 holds two inner lists, row 1 is
        // empty, row 2 holds the last inner list.
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(a_list_data)
            .build()
            .unwrap();

        let c1 = ListArray::from(c1_list_data);
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // The null "c2" in the last row is omitted by the default writer.
        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }
1079
    /// Tests a list-of-struct column where one list row is null; the null row
    /// is omitted entirely from the default (implicit-null) output.
    #[test]
    fn write_list_of_struct() {
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        // Three struct rows that back the list entries.
        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // list column rows (c1):
        // [{"c11": 1, "c12": {"c121": "e"}}, {"c12": {"c121": "f"}}],
        // null,
        // [{"c11": 5, "c12": {"c121": "g"}}]
        // Offsets [0, 2, 2, 3] with validity 0b00000101 make row 1 a null list.
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(struct_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000101])))
            .build()
            .unwrap();
        let c1 = ListArray::from(c1_list_data);

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Row 1's null list is dropped, leaving only "c2" on that line.
        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }
1152
1153    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
1154        let file = File::open(test_file).unwrap();
1155        let mut reader = BufReader::new(file);
1156        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1157        reader.rewind().unwrap();
1158
1159        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1160        let mut reader = builder.build(reader).unwrap();
1161        let batch = reader.next().unwrap().unwrap();
1162
1163        let mut buf = Vec::new();
1164        {
1165            if remove_nulls {
1166                let mut writer = LineDelimitedWriter::new(&mut buf);
1167                writer.write_batches(&[&batch]).unwrap();
1168            } else {
1169                let mut writer = WriterBuilder::new()
1170                    .with_explicit_nulls(true)
1171                    .build::<_, LineDelimited>(&mut buf);
1172                writer.write_batches(&[&batch]).unwrap();
1173            }
1174        }
1175
1176        let result = str::from_utf8(&buf).unwrap();
1177        let expected = read_to_string(test_file).unwrap();
1178        for (r, e) in result.lines().zip(expected.lines()) {
1179            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1180            if remove_nulls {
1181                // remove null value from object to make comparison consistent:
1182                if let Value::Object(obj) = expected_json {
1183                    expected_json =
1184                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1185                }
1186            }
1187            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1188        }
1189    }
1190
    /// Round-trips test/data/basic.json (nulls stripped from expected output).
    #[test]
    fn write_basic_rows() {
        test_write_for_file("test/data/basic.json", true);
    }
1195
    /// Round-trips test/data/arrays.json (nulls stripped from expected output).
    #[test]
    fn write_arrays() {
        test_write_for_file("test/data/arrays.json", true);
    }
1200
    /// Round-trips test/data/basic_nulls.json (nulls stripped from expected output).
    #[test]
    fn write_basic_nulls() {
        test_write_for_file("test/data/basic_nulls.json", true);
    }
1205
    /// Round-trips test/data/nested_with_nulls.json with explicit nulls on,
    /// so the output is compared against the file verbatim.
    #[test]
    fn write_nested_with_nulls() {
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1210
1211    #[test]
1212    fn json_line_writer_empty() {
1213        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1214        writer.finish().unwrap();
1215        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1216    }
1217
1218    #[test]
1219    fn json_array_writer_empty() {
1220        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1221        writer.finish().unwrap();
1222        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1223    }
1224
1225    #[test]
1226    fn json_line_writer_empty_batch() {
1227        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1228
1229        let array = Int32Array::from(Vec::<i32>::new());
1230        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1231        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1232
1233        writer.write(&batch).unwrap();
1234        writer.finish().unwrap();
1235        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1236    }
1237
1238    #[test]
1239    fn json_array_writer_empty_batch() {
1240        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1241
1242        let array = Int32Array::from(Vec::<i32>::new());
1243        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1244        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1245
1246        writer.write(&batch).unwrap();
1247        writer.finish().unwrap();
1248        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1249    }
1250
    /// Tests how struct-level validity interacts with a nested list child:
    /// column "a" carries a struct null bitmap while column "b" shares the
    /// same child data but has no struct-level nulls.
    #[test]
    fn json_struct_array_nulls() {
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]), // masked for a
            Some(vec![Some(4), Some(5)]),
            None, // masked for a
            None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        // 0b01010111: rows 3 and 5 are struct-level nulls in "a".
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            Buffer::from([0b01010111]),
        ));
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Struct-null rows of "a" lose the whole "a" key; the list-null row
        // (6) serializes as an empty struct for both columns.
        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }
1300
    /// Tests that a MapArray serializes as one JSON object per row, with the
    /// null map row collapsing to `{}` (nulls omitted by default).
    #[test]
    fn json_writer_map() {
        let keys_array = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
        let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);

        // Map entries are modeled as a non-nullable struct of (keys, values).
        let keys = Arc::new(Field::new("keys", DataType::Utf8, false));
        let values = Arc::new(Field::new("values", DataType::Int64, false));
        let entry_struct = StructArray::from(vec![
            (keys, Arc::new(keys_array) as ArrayRef),
            (values, Arc::new(values_array) as ArrayRef),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}, {"quux": 50}, {}]
        // Offsets partition the 5 entries across 6 rows; bit 1 of the validity
        // buffer is unset, making row 1 the null map.
        let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
        let valid_buffer = Buffer::from([0b00111101]);

        let map_data = ArrayData::builder(map_data_type.clone())
            .len(6)
            .null_bit_buffer(Some(valid_buffer))
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();

        let map = MapArray::from(map_data);

        let map_field = Field::new("map", map_data_type, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // The null map (row 1) drops the "map" key entirely; empty maps
        // (rows 2 and 5) serialize as {}.
        assert_json_eq(
            &buf,
            r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
        );
    }
1358
1359    #[test]
1360    fn test_write_single_batch() {
1361        let test_file = "test/data/basic.json";
1362        let file = File::open(test_file).unwrap();
1363        let mut reader = BufReader::new(file);
1364        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1365        reader.rewind().unwrap();
1366
1367        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1368        let mut reader = builder.build(reader).unwrap();
1369        let batch = reader.next().unwrap().unwrap();
1370
1371        let mut buf = Vec::new();
1372        {
1373            let mut writer = LineDelimitedWriter::new(&mut buf);
1374            writer.write(&batch).unwrap();
1375        }
1376
1377        let result = str::from_utf8(&buf).unwrap();
1378        let expected = read_to_string(test_file).unwrap();
1379        for (r, e) in result.lines().zip(expected.lines()) {
1380            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1381            // remove null value from object to make comparison consistent:
1382            if let Value::Object(obj) = expected_json {
1383                expected_json =
1384                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1385            }
1386            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1387        }
1388    }
1389
1390    #[test]
1391    fn test_write_multi_batches() {
1392        let test_file = "test/data/basic.json";
1393
1394        let schema = SchemaRef::new(Schema::new(vec![
1395            Field::new("a", DataType::Int64, true),
1396            Field::new("b", DataType::Float64, true),
1397            Field::new("c", DataType::Boolean, true),
1398            Field::new("d", DataType::Utf8, true),
1399            Field::new("e", DataType::Utf8, true),
1400            Field::new("f", DataType::Utf8, true),
1401            Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1402            Field::new("h", DataType::Float16, true),
1403        ]));
1404
1405        let mut reader = ReaderBuilder::new(schema.clone())
1406            .build(BufReader::new(File::open(test_file).unwrap()))
1407            .unwrap();
1408        let batch = reader.next().unwrap().unwrap();
1409
1410        // test batches = an empty batch + 2 same batches, finally result should be eq to 2 same batches
1411        let batches = [&RecordBatch::new_empty(schema), &batch, &batch];
1412
1413        let mut buf = Vec::new();
1414        {
1415            let mut writer = LineDelimitedWriter::new(&mut buf);
1416            writer.write_batches(&batches).unwrap();
1417        }
1418
1419        let result = str::from_utf8(&buf).unwrap();
1420        let expected = read_to_string(test_file).unwrap();
1421        // result is eq to 2 same batches
1422        let expected = format!("{expected}\n{expected}");
1423        for (r, e) in result.lines().zip(expected.lines()) {
1424            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1425            // remove null value from object to make comparison consistent:
1426            if let Value::Object(obj) = expected_json {
1427                expected_json =
1428                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1429            }
1430            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1431        }
1432    }
1433
    /// End-to-end check of `with_explicit_nulls(true)`: nulls at every level
    /// (top-level columns, struct children, list elements, dictionary values,
    /// map rows, and a Null-typed column) must appear as literal JSON nulls.
    #[test]
    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
        // List child of the "struct" column, with one null list row.
        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![None, None, None]),
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![None, None, None]),
            ]));
            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
            (array, field)
        }

        // Dictionary child of the "struct" column, with one null key.
        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
            let array = Arc::new(DictionaryArray::from_iter(vec![
                Some("cupcakes"),
                None,
                Some("bear"),
                Some("kuma"),
            ]));
            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
            (array, field)
        }

        // Map child of the "struct" column: one entry, a null map, an empty
        // map, and a three-entry map.
        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
            let string_builder = StringBuilder::new();
            let int_builder = Int64Builder::new();
            let mut builder = MapBuilder::new(None, string_builder, int_builder);

            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
            builder.keys().append_value("foo");
            builder.values().append_value(10);
            builder.append(true).unwrap();

            builder.append(false).unwrap();

            builder.append(true).unwrap();

            builder.keys().append_value("bar");
            builder.values().append_value(20);
            builder.keys().append_value("baz");
            builder.values().append_value(30);
            builder.keys().append_value("qux");
            builder.values().append_value(40);
            builder.append(true).unwrap();

            let array = Arc::new(builder.finish());
            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
            (array, field)
        }

        // Top-level list-of-struct column with two null list rows.
        fn root_list() -> (Arc<ListArray>, Field) {
            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int32", DataType::Int32, true)),
                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
                ),
            ]);

            let field = Field::new_list(
                "list",
                Field::new("struct", struct_array.data_type().clone(), true),
                true,
            );

            // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{"int32":5,"utf8":null}]},{"list":null}]
            let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
            let data = ArrayData::builder(field.data_type().clone())
                .len(4)
                .add_buffer(entry_offsets)
                .add_child_data(struct_array.into_data())
                .null_bit_buffer(Some([0b00000101].into()))
                .build()
                .unwrap();
            let array = Arc::new(ListArray::from(data));
            (array, field)
        }

        let (nested_list_array, nested_list_field) = nested_list();
        let (nested_dict_array, nested_dict_field) = nested_dict();
        let (nested_map_array, nested_map_field) = nested_map();
        let (root_list_array, root_list_field) = root_list();

        let schema = Schema::new(vec![
            Field::new("date", DataType::Date32, true),
            Field::new("null", DataType::Null, true),
            Field::new_struct(
                "struct",
                vec![
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    nested_list_field.clone(),
                    nested_dict_field.clone(),
                    nested_map_field.clone(),
                ],
                true,
            ),
            root_list_field,
        ]);

        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
        let arr_null = NullArray::new(4);
        let arr_struct = StructArray::from(vec![
            // [{"utf8":"a"},{"utf8":null},{"utf8":null},{"utf8":"b"}]
            (
                Arc::new(Field::new("utf8", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
            ),
            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
            (nested_list_field, nested_list_array as ArrayRef),
            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
            (nested_dict_field, nested_dict_array as ArrayRef),
            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
            (nested_map_field, nested_map_array as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                // [{"date":"1970-01-01"},{"date":null},{"date":"1970-01-02"},{"date":null}]
                Arc::new(arr_date32),
                // [{"null":null},{"null":null},{"null":null},{"null":null}]
                Arc::new(arr_null),
                Arc::new(arr_struct),
                // [{"list":[{"int32":1,"utf8":"a"},{"list":null},{"list":[{"int32":5,"utf8":null}]},{"list":null}] — see root_list() above
                root_list_array,
            ],
        )?;

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch])?;
            writer.finish()?;
        }

        // Every null above must survive as an explicit JSON null.
        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
        let expected = serde_json::from_value::<Vec<Value>>(json!([
          {
            "date": "1970-01-01",
            "list": [
              {
                "int32": 1,
                "utf8": "a"
              },
              {
                "int32": null,
                "utf8": "b"
              }
            ],
            "null": null,
            "struct": {
              "dict": "cupcakes",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "foo": 10
              },
              "utf8": "a"
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": null,
              "list": [
                1,
                2,
                3
              ],
              "map": null,
              "utf8": null
            }
          },
          {
            "date": "1970-01-02",
            "list": [
              {
                "int32": 5,
                "utf8": null
              }
            ],
            "null": null,
            "struct": {
              "dict": "bear",
              "list": null,
              "map": {},
              "utf8": null
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": "kuma",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "bar": 20,
                "baz": 30,
                "qux": 40
              },
              "utf8": "b"
            }
          }
        ]))
        .unwrap();

        assert_eq!(actual, expected);

        Ok(())
    }
1662
1663    fn binary_encoding_test<O: OffsetSizeTrait>() {
1664        // set up schema
1665        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1666            "bytes",
1667            GenericBinaryType::<O>::DATA_TYPE,
1668            true,
1669        )]));
1670
1671        // build record batch:
1672        let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
1673        let values = [Some(b"Ned Flanders"), None, Some(b"Troy McClure")];
1674        for value in values {
1675            match value {
1676                Some(v) => builder.append_value(v),
1677                None => builder.append_null(),
1678            }
1679        }
1680        let array = Arc::new(builder.finish()) as ArrayRef;
1681        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1682
1683        // encode and check JSON with explicit nulls:
1684        {
1685            let mut buf = Vec::new();
1686            let json_value: Value = {
1687                let mut writer = WriterBuilder::new()
1688                    .with_explicit_nulls(true)
1689                    .build::<_, JsonArray>(&mut buf);
1690                writer.write(&batch).unwrap();
1691                writer.close().unwrap();
1692                serde_json::from_slice(&buf).unwrap()
1693            };
1694
1695            assert_eq!(
1696                json!([
1697                    {
1698                        "bytes": "4e656420466c616e64657273"
1699                    },
1700                    {
1701                        "bytes": null // the explicit null
1702                    },
1703                    {
1704                        "bytes": "54726f79204d63436c757265"
1705                    }
1706                ]),
1707                json_value,
1708            );
1709        }
1710
1711        // encode and check JSON with no explicit nulls:
1712        {
1713            let mut buf = Vec::new();
1714            let json_value: Value = {
1715                // explicit nulls are off by default, so we don't need
1716                // to set that when creating the writer:
1717                let mut writer = ArrayWriter::new(&mut buf);
1718                writer.write(&batch).unwrap();
1719                writer.close().unwrap();
1720                serde_json::from_slice(&buf).unwrap()
1721            };
1722
1723            assert_eq!(
1724                json!([
1725                    {
1726                        "bytes": "4e656420466c616e64657273"
1727                    },
1728                    {}, // empty because nulls are omitted
1729                    {
1730                        "bytes": "54726f79204d63436c757265"
1731                    }
1732                ]),
1733                json_value
1734            );
1735        }
1736    }
1737
    /// Runs the binary hex-encoding round trip for both offset widths.
    #[test]
    fn test_writer_binary() {
        // Binary (i32 offsets):
        binary_encoding_test::<i32>();
        // LargeBinary (i64 offsets):
        binary_encoding_test::<i64>();
    }
1745
1746    #[test]
1747    fn test_writer_fixed_size_binary() {
1748        // set up schema:
1749        let size = 11;
1750        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1751            "bytes",
1752            DataType::FixedSizeBinary(size),
1753            true,
1754        )]));
1755
1756        // build record batch:
1757        let mut builder = FixedSizeBinaryBuilder::new(size);
1758        let values = [Some(b"hello world"), None, Some(b"summer rain")];
1759        for value in values {
1760            match value {
1761                Some(v) => builder.append_value(v).unwrap(),
1762                None => builder.append_null(),
1763            }
1764        }
1765        let array = Arc::new(builder.finish()) as ArrayRef;
1766        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1767
1768        // encode and check JSON with explicit nulls:
1769        {
1770            let mut buf = Vec::new();
1771            let json_value: Value = {
1772                let mut writer = WriterBuilder::new()
1773                    .with_explicit_nulls(true)
1774                    .build::<_, JsonArray>(&mut buf);
1775                writer.write(&batch).unwrap();
1776                writer.close().unwrap();
1777                serde_json::from_slice(&buf).unwrap()
1778            };
1779
1780            assert_eq!(
1781                json!([
1782                    {
1783                        "bytes": "68656c6c6f20776f726c64"
1784                    },
1785                    {
1786                        "bytes": null // the explicit null
1787                    },
1788                    {
1789                        "bytes": "73756d6d6572207261696e"
1790                    }
1791                ]),
1792                json_value,
1793            );
1794        }
1795        // encode and check JSON with no explicit nulls:
1796        {
1797            let mut buf = Vec::new();
1798            let json_value: Value = {
1799                // explicit nulls are off by default, so we don't need
1800                // to set that when creating the writer:
1801                let mut writer = ArrayWriter::new(&mut buf);
1802                writer.write(&batch).unwrap();
1803                writer.close().unwrap();
1804                serde_json::from_slice(&buf).unwrap()
1805            };
1806
1807            assert_eq!(
1808                json!([
1809                    {
1810                        "bytes": "68656c6c6f20776f726c64"
1811                    },
1812                    {}, // empty because nulls are omitted
1813                    {
1814                        "bytes": "73756d6d6572207261696e"
1815                    }
1816                ]),
1817                json_value,
1818            );
1819        }
1820    }
1821
1822    #[test]
1823    fn test_writer_fixed_size_list() {
1824        let size = 3;
1825        let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
1826        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1827            "list",
1828            DataType::FixedSizeList(field, size),
1829            true,
1830        )]));
1831
1832        let values_builder = Int32Builder::new();
1833        let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
1834        let lists = [
1835            Some([Some(1), Some(2), None]),
1836            Some([Some(3), None, Some(4)]),
1837            Some([None, Some(5), Some(6)]),
1838            None,
1839        ];
1840        for list in lists {
1841            match list {
1842                Some(l) => {
1843                    for value in l {
1844                        match value {
1845                            Some(v) => list_builder.values().append_value(v),
1846                            None => list_builder.values().append_null(),
1847                        }
1848                    }
1849                    list_builder.append(true);
1850                }
1851                None => {
1852                    for _ in 0..size {
1853                        list_builder.values().append_null();
1854                    }
1855                    list_builder.append(false);
1856                }
1857            }
1858        }
1859        let array = Arc::new(list_builder.finish()) as ArrayRef;
1860        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1861
1862        //encode and check JSON with explicit nulls:
1863        {
1864            let json_value: Value = {
1865                let mut buf = Vec::new();
1866                let mut writer = WriterBuilder::new()
1867                    .with_explicit_nulls(true)
1868                    .build::<_, JsonArray>(&mut buf);
1869                writer.write(&batch).unwrap();
1870                writer.close().unwrap();
1871                serde_json::from_slice(&buf).unwrap()
1872            };
1873            assert_eq!(
1874                json!([
1875                    {"list": [1, 2, null]},
1876                    {"list": [3, null, 4]},
1877                    {"list": [null, 5, 6]},
1878                    {"list": null},
1879                ]),
1880                json_value
1881            );
1882        }
1883        // encode and check JSON with no explicit nulls:
1884        {
1885            let json_value: Value = {
1886                let mut buf = Vec::new();
1887                let mut writer = ArrayWriter::new(&mut buf);
1888                writer.write(&batch).unwrap();
1889                writer.close().unwrap();
1890                serde_json::from_slice(&buf).unwrap()
1891            };
1892            assert_eq!(
1893                json!([
1894                    {"list": [1, 2, null]},
1895                    {"list": [3, null, 4]},
1896                    {"list": [null, 5, 6]},
1897                    {}, // empty because nulls are omitted
1898                ]),
1899                json_value
1900            );
1901        }
1902    }
1903
1904    #[test]
1905    fn test_writer_null_dict() {
1906        let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
1907        let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
1908        let dict = DictionaryArray::new(keys, values);
1909
1910        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1911            "my_dict",
1912            DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
1913            true,
1914        )]));
1915
1916        let array = Arc::new(dict) as ArrayRef;
1917        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1918
1919        let mut json = Vec::new();
1920        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
1921        let mut writer = write_builder.build::<_, JsonArray>(&mut json);
1922        writer.write(&batch).unwrap();
1923        writer.close().unwrap();
1924
1925        let json_str = str::from_utf8(&json).unwrap();
1926        assert_eq!(
1927            json_str,
1928            r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
1929        )
1930    }
1931
1932    #[test]
1933    fn test_decimal32_encoder() {
1934        let array = Decimal32Array::from_iter_values([1234, 5678, 9012])
1935            .with_precision_and_scale(8, 2)
1936            .unwrap();
1937        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1938        let schema = Schema::new(vec![field]);
1939        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1940
1941        let mut buf = Vec::new();
1942        {
1943            let mut writer = LineDelimitedWriter::new(&mut buf);
1944            writer.write_batches(&[&batch]).unwrap();
1945        }
1946
1947        assert_json_eq(
1948            &buf,
1949            r#"{"decimal":12.34}
1950{"decimal":56.78}
1951{"decimal":90.12}
1952"#,
1953        );
1954    }
1955
1956    #[test]
1957    fn test_decimal64_encoder() {
1958        let array = Decimal64Array::from_iter_values([1234, 5678, 9012])
1959            .with_precision_and_scale(10, 2)
1960            .unwrap();
1961        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1962        let schema = Schema::new(vec![field]);
1963        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1964
1965        let mut buf = Vec::new();
1966        {
1967            let mut writer = LineDelimitedWriter::new(&mut buf);
1968            writer.write_batches(&[&batch]).unwrap();
1969        }
1970
1971        assert_json_eq(
1972            &buf,
1973            r#"{"decimal":12.34}
1974{"decimal":56.78}
1975{"decimal":90.12}
1976"#,
1977        );
1978    }
1979
1980    #[test]
1981    fn test_decimal128_encoder() {
1982        let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
1983            .with_precision_and_scale(10, 2)
1984            .unwrap();
1985        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1986        let schema = Schema::new(vec![field]);
1987        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1988
1989        let mut buf = Vec::new();
1990        {
1991            let mut writer = LineDelimitedWriter::new(&mut buf);
1992            writer.write_batches(&[&batch]).unwrap();
1993        }
1994
1995        assert_json_eq(
1996            &buf,
1997            r#"{"decimal":12.34}
1998{"decimal":56.78}
1999{"decimal":90.12}
2000"#,
2001        );
2002    }
2003
2004    #[test]
2005    fn test_decimal256_encoder() {
2006        let array = Decimal256Array::from_iter_values([
2007            i256::from(123400),
2008            i256::from(567800),
2009            i256::from(901200),
2010        ])
2011        .with_precision_and_scale(10, 4)
2012        .unwrap();
2013        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2014        let schema = Schema::new(vec![field]);
2015        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2016
2017        let mut buf = Vec::new();
2018        {
2019            let mut writer = LineDelimitedWriter::new(&mut buf);
2020            writer.write_batches(&[&batch]).unwrap();
2021        }
2022
2023        assert_json_eq(
2024            &buf,
2025            r#"{"decimal":12.3400}
2026{"decimal":56.7800}
2027{"decimal":90.1200}
2028"#,
2029        );
2030    }
2031
2032    #[test]
2033    fn test_decimal_encoder_with_nulls() {
2034        let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
2035            .with_precision_and_scale(10, 2)
2036            .unwrap();
2037        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2038        let schema = Schema::new(vec![field]);
2039        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2040
2041        let mut buf = Vec::new();
2042        {
2043            let mut writer = LineDelimitedWriter::new(&mut buf);
2044            writer.write_batches(&[&batch]).unwrap();
2045        }
2046
2047        assert_json_eq(
2048            &buf,
2049            r#"{"decimal":12.34}
2050{}
2051{"decimal":56.78}
2052"#,
2053        );
2054    }
2055
    #[test]
    fn write_structs_as_list() {
        // Verifies StructMode::ListOnly: structs serialize as positional JSON
        // arrays (no field names). The same expected output must hold with and
        // without explicit nulls, since ListOnly always emits every slot.
        //
        // Schema: c1 is a struct { c11: Int32?, c12: struct { c121: Utf8 } },
        // c2 is a plain Utf8 column.
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        // Build c1 row-wise: c11 = [1, null, 5]; c12.c121 = ["e", "f", "g"].
        // Field definitions here must match the schema above exactly.
        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        // Each line is [[c11, [c121]], c2]; the null c11 appears positionally.
        let expected = r#"[[1,["e"]],"a"]
[[null,["f"]],"b"]
[[5,["g"]],"c"]
"#;

        // With explicit nulls:
        let mut buf = Vec::new();
        {
            let builder = WriterBuilder::new()
                .with_explicit_nulls(true)
                .with_struct_mode(StructMode::ListOnly);
            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
        assert_json_eq(&buf, expected);

        // Without explicit nulls — output must be identical in ListOnly mode:
        let mut buf = Vec::new();
        {
            let builder = WriterBuilder::new()
                .with_explicit_nulls(false)
                .with_struct_mode(StructMode::ListOnly);
            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
        assert_json_eq(&buf, expected);
    }
2121
    /// Builds a two-column batch (a sparse union + a nullable float) and a
    /// custom [`EncoderFactory`] that knows how to encode the union column.
    ///
    /// The union has rows: Int32(1), Utf8("a"), Null — so tests can observe
    /// how custom-encoded nulls interact with the writer's null handling.
    fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
        // Note: this is not intended to be an efficient implementation.
        // Just a simple example to demonstrate how to implement a custom encoder.

        // Owned, pre-extracted value for one union slot.
        #[derive(Debug)]
        enum UnionValue {
            Int32(i32),
            String(String),
        }

        // Encoder over values materialized up front in `make_default_encoder`.
        #[derive(Debug)]
        struct UnionEncoder {
            array: Vec<Option<UnionValue>>,
        }

        impl Encoder for UnionEncoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                match &self.array[idx] {
                    // Null-typed variants are written literally as `null` by
                    // this encoder, independent of the writer's null settings.
                    None => out.extend_from_slice(b"null"),
                    Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
                    Some(UnionValue::String(v)) => {
                        out.extend_from_slice(format!("\"{v}\"").as_bytes())
                    }
                }
            }
        }

        #[derive(Debug)]
        struct UnionEncoderFactory;

        impl EncoderFactory for UnionEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                // Only handle sparse unions; returning Ok(None) falls back to
                // the writer's default encoders for every other type.
                let data_type = array.data_type();
                let fields = match data_type {
                    DataType::Union(fields, UnionMode::Sparse) => fields,
                    _ => return Ok(None),
                };
                // check that the fields are supported
                let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
                for f in fields.iter() {
                    match f.data_type() {
                        DataType::Null => {}
                        DataType::Int32 => {}
                        DataType::Utf8 => {}
                        _ => return Ok(None),
                    }
                }
                // Eagerly pull every slot's value out of the union's children.
                // Sparse union: each child buffer has one entry per row, so the
                // row index can be used directly on the selected child.
                let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
                let mut values = Vec::with_capacity(type_ids.len());
                for idx in 0..type_ids.len() {
                    let type_id = type_ids[idx];
                    let field = &fields[type_id as usize];
                    let value = match field.data_type() {
                        DataType::Null => None,
                        DataType::Int32 => Some(UnionValue::Int32(
                            buffers[type_id as usize]
                                .as_primitive::<Int32Type>()
                                .value(idx),
                        )),
                        DataType::Utf8 => Some(UnionValue::String(
                            buffers[type_id as usize]
                                .as_string::<i32>()
                                .value(idx)
                                .to_string(),
                        )),
                        // All other types were rejected by the loop above.
                        _ => unreachable!(),
                    };
                    values.push(value);
                }
                let array_encoder =
                    Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
                let nulls = array.nulls().cloned();
                Ok(Some(NullableEncoder::new(array_encoder, nulls)))
            }
        }

        // Sparse-union children: each is 3 rows long; type_ids picks which
        // child supplies each row (row 0 -> A/Int32, 1 -> B/Utf8, 2 -> C/Null).
        let int_array = Int32Array::from(vec![Some(1), None, None]);
        let string_array = StringArray::from(vec![None, Some("a"), None]);
        let null_array = NullArray::new(3);
        let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();

        let union_fields = [
            (0, Arc::new(Field::new("A", DataType::Int32, false))),
            (1, Arc::new(Field::new("B", DataType::Utf8, false))),
            (2, Arc::new(Field::new("C", DataType::Null, false))),
        ]
        .into_iter()
        .collect::<UnionFields>();

        let children = vec![
            Arc::new(int_array) as Arc<dyn Array>,
            Arc::new(string_array),
            Arc::new(null_array),
        ];

        let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();

        // Second column with a null in the middle row, encoded by the default
        // float encoder (the factory returns Ok(None) for Float64).
        let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);

        let fields = vec![
            Field::new(
                "union",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new("float", DataType::Float64, true),
        ];

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        (batch, Arc::new(UnionEncoderFactory))
    }
2245
2246    #[test]
2247    fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
2248        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2249
2250        let mut buf = Vec::new();
2251        {
2252            let mut writer = WriterBuilder::new()
2253                .with_encoder_factory(encoder_factory)
2254                .with_explicit_nulls(false)
2255                .build::<_, LineDelimited>(&mut buf);
2256            writer.write_batches(&[&batch]).unwrap();
2257            writer.finish().unwrap();
2258        }
2259
2260        println!("{}", str::from_utf8(&buf).unwrap());
2261
2262        assert_json_eq(
2263            &buf,
2264            r#"{"union":1,"float":1.0}
2265{"union":"a"}
2266{"union":null,"float":3.4}
2267"#,
2268        );
2269    }
2270
2271    #[test]
2272    fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
2273        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2274
2275        let mut buf = Vec::new();
2276        {
2277            let mut writer = WriterBuilder::new()
2278                .with_encoder_factory(encoder_factory)
2279                .with_explicit_nulls(true)
2280                .build::<_, LineDelimited>(&mut buf);
2281            writer.write_batches(&[&batch]).unwrap();
2282            writer.finish().unwrap();
2283        }
2284
2285        assert_json_eq(
2286            &buf,
2287            r#"{"union":1,"float":1.0}
2288{"union":"a","float":null}
2289{"union":null,"float":3.4}
2290"#,
2291        );
2292    }
2293
2294    #[test]
2295    fn test_fallback_encoder_factory_array_implicit_nulls() {
2296        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2297
2298        let json_value: Value = {
2299            let mut buf = Vec::new();
2300            let mut writer = WriterBuilder::new()
2301                .with_encoder_factory(encoder_factory)
2302                .build::<_, JsonArray>(&mut buf);
2303            writer.write_batches(&[&batch]).unwrap();
2304            writer.finish().unwrap();
2305            serde_json::from_slice(&buf).unwrap()
2306        };
2307
2308        let expected = json!([
2309            {"union":1,"float":1.0},
2310            {"union":"a"},
2311            {"float":3.4,"union":null},
2312        ]);
2313
2314        assert_eq!(json_value, expected);
2315    }
2316
2317    #[test]
2318    fn test_fallback_encoder_factory_array_explicit_nulls() {
2319        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2320
2321        let json_value: Value = {
2322            let mut buf = Vec::new();
2323            let mut writer = WriterBuilder::new()
2324                .with_encoder_factory(encoder_factory)
2325                .with_explicit_nulls(true)
2326                .build::<_, JsonArray>(&mut buf);
2327            writer.write_batches(&[&batch]).unwrap();
2328            writer.finish().unwrap();
2329            serde_json::from_slice(&buf).unwrap()
2330        };
2331
2332        let expected = json!([
2333            {"union":1,"float":1.0},
2334            {"union":"a", "float": null},
2335            {"union":null,"float":3.4},
2336        ]);
2337
2338        assert_eq!(json_value, expected);
2339    }
2340
2341    #[test]
2342    fn test_default_encoder_byte_array() {
2343        struct IntArrayBinaryEncoder<B> {
2344            array: B,
2345        }
2346
2347        impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
2348        where
2349            B: ArrayAccessor<Item = &'a [u8]>,
2350        {
2351            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2352                out.push(b'[');
2353                let child = self.array.value(idx);
2354                for (idx, byte) in child.iter().enumerate() {
2355                    write!(out, "{byte}").unwrap();
2356                    if idx < child.len() - 1 {
2357                        out.push(b',');
2358                    }
2359                }
2360                out.push(b']');
2361            }
2362        }
2363
2364        #[derive(Debug)]
2365        struct IntArayBinaryEncoderFactory;
2366
2367        impl EncoderFactory for IntArayBinaryEncoderFactory {
2368            fn make_default_encoder<'a>(
2369                &self,
2370                _field: &'a FieldRef,
2371                array: &'a dyn Array,
2372                _options: &'a EncoderOptions,
2373            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2374                match array.data_type() {
2375                    DataType::Binary => {
2376                        let array = array.as_binary::<i32>();
2377                        let encoder = IntArrayBinaryEncoder { array };
2378                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2379                        let nulls = array.nulls().cloned();
2380                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2381                    }
2382                    _ => Ok(None),
2383                }
2384            }
2385        }
2386
2387        let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
2388        let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
2389        let fields = vec![
2390            Field::new("bytes", DataType::Binary, true),
2391            Field::new("float", DataType::Float64, true),
2392        ];
2393        let batch = RecordBatch::try_new(
2394            Arc::new(Schema::new(fields)),
2395            vec![
2396                Arc::new(binary_array) as Arc<dyn Array>,
2397                Arc::new(float_array) as Arc<dyn Array>,
2398            ],
2399        )
2400        .unwrap();
2401
2402        let json_value: Value = {
2403            let mut buf = Vec::new();
2404            let mut writer = WriterBuilder::new()
2405                .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory))
2406                .build::<_, JsonArray>(&mut buf);
2407            writer.write_batches(&[&batch]).unwrap();
2408            writer.finish().unwrap();
2409            serde_json::from_slice(&buf).unwrap()
2410        };
2411
2412        let expected = json!([
2413            {"bytes": [97], "float": 1.0},
2414            {"float": 2.3},
2415            {"bytes": [98]},
2416        ]);
2417
2418        assert_eq!(json_value, expected);
2419    }
2420
    #[test]
    fn test_encoder_factory_customize_dictionary() {
        // Test that we can customize the encoding of T even when it shows up as Dictionary<_, T>.

        // No particular reason to choose this example.
        // Just trying to add some variety to the test cases and demonstrate use cases of the encoder factory.

        // Encoder that zero-pads an Int32 to 8 digits and quotes it,
        // e.g. 1 -> "00000001".
        struct PaddedInt32Encoder {
            array: Int32Array,
        }

        impl Encoder for PaddedInt32Encoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                let value = self.array.value(idx);
                write!(out, "\"{value:0>8}\"").unwrap();
            }
        }

        #[derive(Debug)]
        struct CustomEncoderFactory;

        impl EncoderFactory for CustomEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                // The point here is:
                // 1. You can use information from Field to determine how to do the encoding.
                // 2. For dictionary arrays the Field is always the outer field but the array may be the keys or values array
                //    and thus the data type of `field` may not match the data type of `array`.
                // Opt in via field metadata: `"padded": "true"`.
                let padded = field
                    .metadata()
                    .get("padded")
                    .map(|v| v == "true")
                    .unwrap_or_default();
                match (array.data_type(), padded) {
                    (DataType::Int32, true) => {
                        let array = array.as_primitive::<Int32Type>();
                        let nulls = array.nulls().cloned();
                        let encoder = PaddedInt32Encoder {
                            array: array.clone(),
                        };
                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
                    }
                    // Anything else falls back to the default encoders.
                    _ => Ok(None),
                }
            }
        }

        // Helper: write a batch as a JSON array and parse it back for comparison.
        let to_json = |batch| {
            let mut buf = Vec::new();
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(Arc::new(CustomEncoderFactory))
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[batch]).unwrap();
            writer.finish().unwrap();
            serde_json::from_slice::<Value>(&buf).unwrap()
        };

        // Control case: no dictionary wrapping works as expected.
        let array = Int32Array::from(vec![Some(1), None, Some(2)]);
        let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
            HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
        ));
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field.clone()])),
            vec![Arc::new(array)],
        )
        .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000002"},
        ]);

        assert_eq!(json_value, expected);

        // Now make a dictionary batch
        // The field metadata lives on the outer Dictionary field, but the
        // factory must still pad the Int32 values array.
        let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
        array_builder.append_value(1);
        array_builder.append_null();
        array_builder.append_value(1);
        let array = array_builder.finish();
        let field = Field::new(
            "int",
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
            true,
        )
        .with_metadata(HashMap::from_iter(vec![(
            "padded".to_string(),
            "true".to_string(),
        )]));
        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
            .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000001"},
        ]);

        assert_eq!(json_value, expected);
    }
2531}