arrow_json/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! # JSON Writer
19//!
20//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
21//! JSON objects or JSON formatted byte streams.
22//!
23//! ## Writing JSON formatted byte streams
24//!
25//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
26//! [`LineDelimitedWriter`]:
27//!
28//! ```
29//! # use std::sync::Arc;
30//! # use arrow_array::{Int32Array, RecordBatch};
31//! # use arrow_schema::{DataType, Field, Schema};
32//!
33//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
34//! let a = Int32Array::from(vec![1, 2, 3]);
35//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
36//!
37//! // Write the record batch out as JSON
38//! let buf = Vec::new();
39//! let mut writer = arrow_json::LineDelimitedWriter::new(buf);
40//! writer.write_batches(&vec![&batch]).unwrap();
41//! writer.finish().unwrap();
42//!
43//! // Get the underlying buffer back,
44//! let buf = writer.into_inner();
45//! assert_eq!(r#"{"a":1}
46//! {"a":2}
47//! {"a":3}
48//!"#, String::from_utf8(buf).unwrap())
49//! ```
50//!
51//! To serialize [`RecordBatch`]es into a well formed JSON array, use
52//! [`ArrayWriter`]:
53//!
54//! ```
55//! # use std::sync::Arc;
56//! # use arrow_array::{Int32Array, RecordBatch};
57//! use arrow_schema::{DataType, Field, Schema};
58//!
59//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
60//! let a = Int32Array::from(vec![1, 2, 3]);
61//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
62//!
63//! // Write the record batch out as a JSON array
64//! let buf = Vec::new();
65//! let mut writer = arrow_json::ArrayWriter::new(buf);
66//! writer.write_batches(&vec![&batch]).unwrap();
67//! writer.finish().unwrap();
68//!
69//! // Get the underlying buffer back,
70//! let buf = writer.into_inner();
71//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
72//! ```
73//!
74//! [`LineDelimitedWriter`] and [`ArrayWriter`] will omit writing keys with null values.
75//! In order to explicitly write null values for keys, configure a custom [`Writer`] by
76//! using a [`WriterBuilder`] to construct a [`Writer`].
77//!
78//! ## Writing to [serde_json] JSON Objects
79//!
80//! To serialize [`RecordBatch`]es into an array of
81//! [JSON](https://docs.serde.rs/serde_json/) objects you can reparse the resulting JSON string.
82//! Note that this is less efficient than using the `Writer` API.
83//!
84//! ```
85//! # use std::sync::Arc;
86//! # use arrow_array::{Int32Array, RecordBatch};
87//! # use arrow_schema::{DataType, Field, Schema};
88//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
89//! let a = Int32Array::from(vec![1, 2, 3]);
90//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
91//!
92//! // Write the record batch out as json bytes (string)
93//! let buf = Vec::new();
94//! let mut writer = arrow_json::ArrayWriter::new(buf);
95//! writer.write_batches(&vec![&batch]).unwrap();
96//! writer.finish().unwrap();
97//! let json_data = writer.into_inner();
98//!
99//! // Parse the string using serde_json
100//! use serde_json::{Map, Value};
101//! let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(json_data.as_slice()).unwrap();
102//! assert_eq!(
103//!     serde_json::Value::Object(json_rows[1].clone()),
104//!     serde_json::json!({"a": 2}),
105//! );
106//! ```
107mod encoder;
108
109use std::{fmt::Debug, io::Write, sync::Arc};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, NullableEncoder};
116
/// This trait defines how to format a sequence of JSON objects to a
/// byte stream.
pub trait JsonFormat: Debug + Default {
    #[inline]
    /// Write any bytes needed at the start of the file to the writer
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    #[inline]
    /// Write any bytes needed for the start of each row
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
        Ok(())
    }

    #[inline]
    /// Write any bytes needed for the end of each row
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed at the end of the stream (e.g. the closing `]`
    /// for the array format)
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }
}
143
144/// Produces JSON output with one record per line.
145///
146/// For example:
147///
148/// ```json
149/// {"foo":1}
150/// {"bar":1}
151///
152/// ```
153#[derive(Debug, Default)]
154pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157    fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158        writer.write_all(b"\n")?;
159        Ok(())
160    }
161}
162
163/// Produces JSON output as a single JSON array.
164///
165/// For example:
166///
167/// ```json
168/// [{"foo":1},{"bar":1}]
169/// ```
170#[derive(Debug, Default)]
171pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175        writer.write_all(b"[")?;
176        Ok(())
177    }
178
179    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180        if !is_first_row {
181            writer.write_all(b",")?;
182        }
183        Ok(())
184    }
185
186    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187        writer.write_all(b"]")?;
188        Ok(())
189    }
190}
191
/// A JSON writer which serializes [`RecordBatch`]es to newline delimited
/// (NDJSON) JSON objects.
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A JSON writer which serializes [`RecordBatch`]es to a single JSON array.
pub type ArrayWriter<W> = Writer<W, JsonArray>;
197
/// JSON writer builder.
///
/// Wraps a set of [`EncoderOptions`]; construct a configured [`Writer`]
/// with [`WriterBuilder::build`].
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203    /// Create a new builder for configuring JSON writing options.
204    ///
205    /// # Example
206    ///
207    /// ```
208    /// # use arrow_json::{Writer, WriterBuilder};
209    /// # use arrow_json::writer::LineDelimited;
210    /// # use std::fs::File;
211    ///
212    /// fn example() -> Writer<File, LineDelimited> {
213    ///     let file = File::create("target/out.json").unwrap();
214    ///
215    ///     // create a builder that keeps keys with null values
216    ///     let builder = WriterBuilder::new().with_explicit_nulls(true);
217    ///     let writer = builder.build::<_, LineDelimited>(file);
218    ///
219    ///     writer
220    /// }
221    /// ```
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    /// Returns `true` if this writer is configured to keep keys with null values.
227    pub fn explicit_nulls(&self) -> bool {
228        self.0.explicit_nulls()
229    }
230
231    /// Set whether to keep keys with null values, or to omit writing them.
232    ///
233    /// For example, with [`LineDelimited`] format:
234    ///
235    /// Skip nulls (set to `false`):
236    ///
237    /// ```json
238    /// {"foo":1}
239    /// {"foo":1,"bar":2}
240    /// {}
241    /// ```
242    ///
243    /// Keep nulls (set to `true`):
244    ///
245    /// ```json
246    /// {"foo":1,"bar":null}
247    /// {"foo":1,"bar":2}
248    /// {"foo":null,"bar":null}
249    /// ```
250    ///
251    /// Default is to skip nulls (set to `false`). If `struct_mode == ListOnly`,
252    /// nulls will be written explicitly regardless of this setting.
253    pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254        self.0 = self.0.with_explicit_nulls(explicit_nulls);
255        self
256    }
257
258    /// Returns if this writer is configured to write structs as JSON Objects or Arrays.
259    pub fn struct_mode(&self) -> StructMode {
260        self.0.struct_mode()
261    }
262
263    /// Set the [`StructMode`] for the writer, which determines whether structs
264    /// are encoded to JSON as objects or lists. For more details refer to the
265    /// enum documentation. Default is to use `ObjectOnly`. If this is set to
266    /// `ListOnly`, nulls will be written explicitly regardless of the
267    /// `explicit_nulls` setting.
268    pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269        self.0 = self.0.with_struct_mode(struct_mode);
270        self
271    }
272
273    /// Set an encoder factory to use when creating encoders for writing JSON.
274    ///
275    /// This can be used to override how some types are encoded or to provide
276    /// a fallback for types that are not supported by the default encoder.
277    pub fn with_encoder_factory(mut self, factory: Arc<dyn EncoderFactory>) -> Self {
278        self.0 = self.0.with_encoder_factory(factory);
279        self
280    }
281
282    /// Create a new `Writer` with specified `JsonFormat` and builder options.
283    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
284    where
285        W: Write,
286        F: JsonFormat,
287    {
288        Writer {
289            writer,
290            started: false,
291            finished: false,
292            format: F::default(),
293            options: self.0,
294        }
295    }
296}
297
/// A JSON writer which serializes [`RecordBatch`]es to a stream of
/// `u8` encoded JSON objects.
///
/// See the module level documentation for detailed usage and examples.
/// The specific format of the stream is controlled by the [`JsonFormat`]
/// type parameter.
///
/// By default the writer will skip writing keys with null values for
/// backward compatibility. See [`WriterBuilder`] on how to customize
/// this behaviour when creating a new writer.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying writer to use to write bytes
    writer: W,

    /// Has the writer output any records yet?
    ///
    /// Used to decide whether `start_stream` still needs to be emitted
    /// and whether the next row is the first row of the stream.
    started: bool,

    /// Is the writer finished?
    ///
    /// Set once `end_stream` has been written, so `finish` is idempotent.
    finished: bool,

    /// Determines how the byte stream is formatted
    format: F,

    /// Controls how JSON should be encoded, e.g. whether to write explicit nulls or skip them
    options: EncoderOptions,
}
329
impl<W, F> Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Construct a new writer
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options: EncoderOptions::default(),
        }
    }

    /// Serialize `batch` to JSON output
    ///
    /// Empty batches are skipped entirely; no bytes (not even the stream
    /// prelude) are written for them.
    ///
    /// NOTE(review): this method does not check `self.finished`; calling it
    /// after `finish` would append rows past the stream terminator.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // BufWriter uses a buffer size of 8KB
        // We therefore double this and flush once we have more than 8KB
        let mut buffer = Vec::with_capacity(16 * 1024);

        let mut is_first_row = !self.started;
        if !self.started {
            // Emit the stream prelude (e.g. `[` for the array format) exactly once
            self.format.start_stream(&mut buffer)?;
            self.started = true;
        }

        // View the whole batch as a single non-nullable struct column so each
        // row can be encoded by one encoder
        let array = StructArray::from(batch.clone());
        let field = Arc::new(Field::new_struct(
            "",
            batch.schema().fields().clone(),
            false,
        ));

        let mut encoder = make_encoder(&field, &array, &self.options)?;

        // Validate that the root is not nullable
        assert!(!encoder.has_nulls(), "root cannot be nullable");
        for idx in 0..batch.num_rows() {
            self.format.start_row(&mut buffer, is_first_row)?;
            is_first_row = false;

            encoder.encode(idx, &mut buffer);
            // Flush to the underlying writer once the local buffer exceeds 8KB
            if buffer.len() > 8 * 1024 {
                self.writer.write_all(&buffer)?;
                buffer.clear();
            }
            self.format.end_row(&mut buffer)?;
        }

        // Write whatever remains in the local buffer
        if !buffer.is_empty() {
            self.writer.write_all(&buffer)?;
        }

        Ok(())
    }

    /// Serialize `batches` to JSON output
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Finishes the output stream. This function must be called after
    /// all record batches have been produced (e.g. producing the final `']'` if writing
    /// arrays).
    ///
    /// If nothing was ever written, the stream prelude is still emitted so
    /// the output is well formed (e.g. `[]` for the array format).
    /// Idempotent: calling it again after completion is a no-op.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if !self.started {
            self.format.start_stream(&mut self.writer)?;
            self.started = true;
        }
        if !self.finished {
            self.format.end_stream(&mut self.writer)?;
            self.finished = true;
        }

        Ok(())
    }

    /// Unwraps this `Writer<W>`, returning the underlying writer
    pub fn into_inner(self) -> W {
        self.writer
    }
}
421
422impl<W, F> RecordBatchWriter for Writer<W, F>
423where
424    W: Write,
425    F: JsonFormat,
426{
427    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
428        self.write(batch)
429    }
430
431    fn close(mut self) -> Result<(), ArrowError> {
432        self.finish()
433    }
434}
435
436#[cfg(test)]
437mod tests {
438    use core::str;
439    use std::collections::HashMap;
440    use std::fs::{read_to_string, File};
441    use std::io::{BufReader, Seek};
442    use std::sync::Arc;
443
444    use arrow_array::cast::AsArray;
445    use serde_json::{json, Value};
446
447    use super::LineDelimited;
448    use super::{Encoder, WriterBuilder};
449    use arrow_array::builder::*;
450    use arrow_array::types::*;
451    use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, ToByteSlice};
452    use arrow_data::ArrayData;
453
454    use crate::reader::*;
455
456    use super::*;
457
458    /// Asserts that the NDJSON `input` is semantically identical to `expected`
459    fn assert_json_eq(input: &[u8], expected: &str) {
460        let expected: Vec<Option<Value>> = expected
461            .split('\n')
462            .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
463            .collect();
464
465        let actual: Vec<Option<Value>> = input
466            .split(|b| *b == b'\n')
467            .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
468            .collect();
469
470        assert_eq!(actual, expected);
471    }
472
    #[test]
    fn write_simple_rows() {
        // Two nullable primitive columns; keys with null values are omitted
        // from the emitted rows (default writer behaviour).
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ]);

        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Rows 4 and 5 each contain a null, so the corresponding key is absent
        assert_json_eq(
            &buf,
            r#"{"c1":1,"c2":"a"}
{"c1":2,"c2":"b"}
{"c1":3,"c2":"c"}
{"c2":"d"}
{"c1":5}
"#,
        );
    }
501
    #[test]
    fn write_large_utf8_and_utf8_view() {
        // All three string encodings (Utf8, LargeUtf8, Utf8View) should
        // serialize identically to plain JSON strings.
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // The final row is all nulls and therefore an empty object
        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }
536
    #[test]
    fn write_dictionary() {
        // Dictionary-encoded columns (with different key widths) should be
        // written as their resolved values, not as dictionary keys.
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }
576
    #[test]
    fn write_list_of_dictionary() {
        // A LargeList of dictionary-encoded strings: list nulls drop the key,
        // item nulls appear as JSON null inside the list.
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        // Lists of lengths 3, 2, 0, 1 — the third list is null
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
616
    #[test]
    fn write_list_of_dictionary_large_values() {
        // Same as write_list_of_dictionary, but the dictionary values use
        // LargeUtf8 and the dictionary is built from explicit keys/values.
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        // Lists of lengths 3, 2, 0, 1 — the third list is null
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
663
    #[test]
    fn write_timestamps() {
        // Timestamps without a timezone serialize as naive ISO 8601 strings
        // at each unit's precision. The sub-nanosecond digits of the source
        // string are dropped (see the "nanos" value in the expected output).
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );
    }
717
    #[test]
    fn write_timestamps_with_tz() {
        // Same values as write_timestamps, but with a "+00:00" timezone:
        // the serialized strings gain a trailing "Z" suffix.
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );
    }
778
    #[test]
    fn write_dates() {
        // Date32 (days since epoch) serializes as a date-only string;
        // Date64 (millis since epoch) serializes with a time component.
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        let arr_date32 = Date32Array::from(vec![
            // Convert epoch millis to whole days for the Date32 value
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );
    }
825
    #[test]
    fn write_times() {
        // Time arrays serialize as clock strings at their unit's precision:
        // the same raw value 120 means 120s, 120ms, 120us or 120ns here.
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );
    }
868
    #[test]
    fn write_durations() {
        // Duration arrays serialize as ISO 8601 duration strings ("PT...S"),
        // scaled by each array's unit.
        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_durationsec),
                Arc::new(arr_durationmsec),
                Arc::new(arr_durationusec),
                Arc::new(arr_durationnsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
        );
    }
911
    #[test]
    fn write_nested_structs() {
        // Structs nested two levels deep (c1.c12.c121) should serialize as
        // nested JSON objects; a null inner field (c11, row 2) is omitted.
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }
966
    #[test]
    fn write_struct_with_list_field() {
        // "c1" is a required List<Utf8> column; "c2" is a required Int32 column.
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
        // list column rows: ["a", "a1"], ["b"], ["c"], ["d"], ["e"]
        // (offsets delimit each row's slice of `a_values`)
        let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
        // 0b00011111 marks all five list rows as valid.
        let a_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(5)
            .add_buffer(a_value_offsets)
            .add_child_data(a_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00011111])))
            .build()
            .unwrap();
        let a = ListArray::from(a_list_data);

        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":["a","a1"],"c2":1}
{"c1":["b"],"c2":2}
{"c1":["c"],"c2":3}
{"c1":["d"],"c2":4}
{"c1":["e"],"c2":5}
"#,
        );
    }
1009
    #[test]
    fn write_nested_list() {
        // "c1" is a required List<List<Int32>> column; "c2" is a nullable Utf8 column.
        let list_inner_type = Field::new(
            "a",
            DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
            false,
        );
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(list_inner_type.clone())),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        // list column rows: [[1, 2], [3]], [], [[4, 5, 6]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        // Inner offsets [0, 2, 3, 6] carve `a_values` into [1,2], [3], [4,5,6].
        let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
        // Construct a list array from the above two
        // (0b00000111 marks all three inner lists as valid).
        let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
            .len(3)
            .add_buffer(a_value_offsets)
            .null_bit_buffer(Some(Buffer::from([0b00000111])))
            .add_child_data(a_values.into_data())
            .build()
            .unwrap();

        // Outer offsets [0, 2, 2, 3]: row 0 holds two inner lists, row 1 is
        // empty, row 2 holds one inner list.
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(a_list_data)
            .build()
            .unwrap();

        let c1 = ListArray::from(c1_list_data);
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // The null "c2" in the last row is omitted by the default writer.
        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }
1066
    #[test]
    fn write_list_of_struct() {
        // "c1" is a nullable List<Struct<c11: Int32, c12: Struct<c121: Utf8>>>;
        // "c2" is a required Int32 column.
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        // Child struct values shared across the list rows; row 1 has a null "c11".
        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // list column rows (c1):
        // [{"c11": 1, "c12": {"c121": "e"}}, {"c12": {"c121": "f"}}],
        // null,
        // [{"c11": 5, "c12": {"c121": "g"}}]
        // (0b00000101 clears bit 1, so row index 1 is null)
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(struct_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000101])))
            .build()
            .unwrap();
        let c1 = ListArray::from(c1_list_data);

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // The null list row (index 1) is omitted by the default writer.
        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }
1139
1140    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
1141        let file = File::open(test_file).unwrap();
1142        let mut reader = BufReader::new(file);
1143        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1144        reader.rewind().unwrap();
1145
1146        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1147        let mut reader = builder.build(reader).unwrap();
1148        let batch = reader.next().unwrap().unwrap();
1149
1150        let mut buf = Vec::new();
1151        {
1152            if remove_nulls {
1153                let mut writer = LineDelimitedWriter::new(&mut buf);
1154                writer.write_batches(&[&batch]).unwrap();
1155            } else {
1156                let mut writer = WriterBuilder::new()
1157                    .with_explicit_nulls(true)
1158                    .build::<_, LineDelimited>(&mut buf);
1159                writer.write_batches(&[&batch]).unwrap();
1160            }
1161        }
1162
1163        let result = str::from_utf8(&buf).unwrap();
1164        let expected = read_to_string(test_file).unwrap();
1165        for (r, e) in result.lines().zip(expected.lines()) {
1166            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1167            if remove_nulls {
1168                // remove null value from object to make comparison consistent:
1169                if let Value::Object(obj) = expected_json {
1170                    expected_json =
1171                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1172                }
1173            }
1174            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1175        }
1176    }
1177
    #[test]
    fn write_basic_rows() {
        // Round-trip a flat JSON file; nulls are stripped before comparing.
        test_write_for_file("test/data/basic.json", true);
    }
1182
    #[test]
    fn write_arrays() {
        // Round-trip a file containing array-valued columns; nulls stripped.
        test_write_for_file("test/data/arrays.json", true);
    }
1187
    #[test]
    fn write_basic_nulls() {
        // Round-trip a file with null values; nulls stripped before comparing.
        test_write_for_file("test/data/basic_nulls.json", true);
    }
1192
    #[test]
    fn write_nested_with_nulls() {
        // Round-trip with explicit nulls enabled, so output must match the
        // file including its null fields.
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1197
1198    #[test]
1199    fn json_line_writer_empty() {
1200        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1201        writer.finish().unwrap();
1202        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1203    }
1204
1205    #[test]
1206    fn json_array_writer_empty() {
1207        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1208        writer.finish().unwrap();
1209        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1210    }
1211
1212    #[test]
1213    fn json_line_writer_empty_batch() {
1214        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1215
1216        let array = Int32Array::from(Vec::<i32>::new());
1217        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1218        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1219
1220        writer.write(&batch).unwrap();
1221        writer.finish().unwrap();
1222        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1223    }
1224
1225    #[test]
1226    fn json_array_writer_empty_batch() {
1227        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1228
1229        let array = Int32Array::from(Vec::<i32>::new());
1230        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1231        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1232
1233        writer.write(&batch).unwrap();
1234        writer.finish().unwrap();
1235        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1236    }
1237
    #[test]
    fn json_struct_array_nulls() {
        // Shared inner List<Int32> data for two struct columns; column "a"
        // additionally carries a struct-level validity mask.
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]), // masked for a
            Some(vec![Some(4), Some(5)]),
            None, // masked for a
            None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        // 0b01010111 clears bits 3 and 5, so those rows of "a" are null structs.
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            Buffer::from([0b01010111]),
        ));
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Null structs are omitted entirely; a valid struct whose child list
        // is null serializes as an empty object.
        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }
1287
    #[test]
    fn json_writer_map() {
        let keys_array = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
        let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);

        // Map entries are stored as a struct of (keys, values) child arrays.
        let keys = Arc::new(Field::new("keys", DataType::Utf8, false));
        let values = Arc::new(Field::new("values", DataType::Int64, false));
        let entry_struct = StructArray::from(vec![
            (keys, Arc::new(keys_array) as ArrayRef),
            (values, Arc::new(values_array) as ArrayRef),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}, {"quux": 50}, {}]
        // (0b00111101 clears bit 1, making the second map row null)
        let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
        let valid_buffer = Buffer::from([0b00111101]);

        let map_data = ArrayData::builder(map_data_type.clone())
            .len(6)
            .null_bit_buffer(Some(valid_buffer))
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();

        let map = MapArray::from(map_data);

        let map_field = Field::new("map", map_data_type, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // The null map row is omitted; empty maps serialize as {}.
        assert_json_eq(
            &buf,
            r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
        );
    }
1345
1346    #[test]
1347    fn test_write_single_batch() {
1348        let test_file = "test/data/basic.json";
1349        let file = File::open(test_file).unwrap();
1350        let mut reader = BufReader::new(file);
1351        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1352        reader.rewind().unwrap();
1353
1354        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1355        let mut reader = builder.build(reader).unwrap();
1356        let batch = reader.next().unwrap().unwrap();
1357
1358        let mut buf = Vec::new();
1359        {
1360            let mut writer = LineDelimitedWriter::new(&mut buf);
1361            writer.write(&batch).unwrap();
1362        }
1363
1364        let result = str::from_utf8(&buf).unwrap();
1365        let expected = read_to_string(test_file).unwrap();
1366        for (r, e) in result.lines().zip(expected.lines()) {
1367            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1368            // remove null value from object to make comparison consistent:
1369            if let Value::Object(obj) = expected_json {
1370                expected_json =
1371                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1372            }
1373            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1374        }
1375    }
1376
1377    #[test]
1378    fn test_write_multi_batches() {
1379        let test_file = "test/data/basic.json";
1380
1381        let schema = SchemaRef::new(Schema::new(vec![
1382            Field::new("a", DataType::Int64, true),
1383            Field::new("b", DataType::Float64, true),
1384            Field::new("c", DataType::Boolean, true),
1385            Field::new("d", DataType::Utf8, true),
1386            Field::new("e", DataType::Utf8, true),
1387            Field::new("f", DataType::Utf8, true),
1388            Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1389            Field::new("h", DataType::Float16, true),
1390        ]));
1391
1392        let mut reader = ReaderBuilder::new(schema.clone())
1393            .build(BufReader::new(File::open(test_file).unwrap()))
1394            .unwrap();
1395        let batch = reader.next().unwrap().unwrap();
1396
1397        // test batches = an empty batch + 2 same batches, finally result should be eq to 2 same batches
1398        let batches = [&RecordBatch::new_empty(schema), &batch, &batch];
1399
1400        let mut buf = Vec::new();
1401        {
1402            let mut writer = LineDelimitedWriter::new(&mut buf);
1403            writer.write_batches(&batches).unwrap();
1404        }
1405
1406        let result = str::from_utf8(&buf).unwrap();
1407        let expected = read_to_string(test_file).unwrap();
1408        // result is eq to 2 same batches
1409        let expected = format!("{expected}\n{expected}");
1410        for (r, e) in result.lines().zip(expected.lines()) {
1411            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1412            // remove null value from object to make comparison consistent:
1413            if let Value::Object(obj) = expected_json {
1414                expected_json =
1415                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1416            }
1417            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1418        }
1419    }
1420
    #[test]
    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
        // End-to-end check that `with_explicit_nulls(true)` emits `null` for
        // every null value — including inside nested lists, dictionaries,
        // maps and structs — instead of omitting the keys.

        // Nullable List<Int32> used as a struct child.
        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![None, None, None]),
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![None, None, None]),
            ]));
            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
            (array, field)
        }

        // Nullable dictionary-encoded Utf8 used as a struct child.
        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
            let array = Arc::new(DictionaryArray::from_iter(vec![
                Some("cupcakes"),
                None,
                Some("bear"),
                Some("kuma"),
            ]));
            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
            (array, field)
        }

        // Nullable Map<Utf8, Int64> used as a struct child.
        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
            let string_builder = StringBuilder::new();
            let int_builder = Int64Builder::new();
            let mut builder = MapBuilder::new(None, string_builder, int_builder);

            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
            builder.keys().append_value("foo");
            builder.values().append_value(10);
            builder.append(true).unwrap();

            builder.append(false).unwrap();

            builder.append(true).unwrap();

            builder.keys().append_value("bar");
            builder.values().append_value(20);
            builder.keys().append_value("baz");
            builder.values().append_value(30);
            builder.keys().append_value("qux");
            builder.values().append_value(40);
            builder.append(true).unwrap();

            let array = Arc::new(builder.finish());
            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
            (array, field)
        }

        // Top-level nullable List<Struct<utf8, int32>> column.
        fn root_list() -> (Arc<ListArray>, Field) {
            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int32", DataType::Int32, true)),
                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
                ),
            ]);

            let field = Field::new_list(
                "list",
                Field::new("struct", struct_array.data_type().clone(), true),
                true,
            );

            // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{"int32":5,"utf8":null}]},{"list":null}]
            // (0b00000101 marks rows 1 and 3 as null)
            let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
            let data = ArrayData::builder(field.data_type().clone())
                .len(4)
                .add_buffer(entry_offsets)
                .add_child_data(struct_array.into_data())
                .null_bit_buffer(Some([0b00000101].into()))
                .build()
                .unwrap();
            let array = Arc::new(ListArray::from(data));
            (array, field)
        }

        let (nested_list_array, nested_list_field) = nested_list();
        let (nested_dict_array, nested_dict_field) = nested_dict();
        let (nested_map_array, nested_map_field) = nested_map();
        let (root_list_array, root_list_field) = root_list();

        let schema = Schema::new(vec![
            Field::new("date", DataType::Date32, true),
            Field::new("null", DataType::Null, true),
            Field::new_struct(
                "struct",
                vec![
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    nested_list_field.clone(),
                    nested_dict_field.clone(),
                    nested_map_field.clone(),
                ],
                true,
            ),
            root_list_field,
        ]);

        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
        let arr_null = NullArray::new(4);
        let arr_struct = StructArray::from(vec![
            // [{"utf8":"a"},{"utf8":null},{"utf8":null},{"utf8":"b"}]
            (
                Arc::new(Field::new("utf8", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
            ),
            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
            (nested_list_field, nested_list_array as ArrayRef),
            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
            (nested_dict_field, nested_dict_array as ArrayRef),
            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
            (nested_map_field, nested_map_array as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                // [{"date":"1970-01-01"},{"date":null},{"date":"1970-01-02"},{"date":null}]
                Arc::new(arr_date32),
                // [{"null":null},{"null":null},{"null":null},{"null":null}]
                Arc::new(arr_null),
                Arc::new(arr_struct),
                // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{"int32":5,"utf8":null}]},{"list":null}]
                root_list_array,
            ],
        )?;

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch])?;
            writer.finish()?;
        }

        // With explicit nulls, every key appears in every row, nulls included.
        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
        let expected = serde_json::from_value::<Vec<Value>>(json!([
          {
            "date": "1970-01-01",
            "list": [
              {
                "int32": 1,
                "utf8": "a"
              },
              {
                "int32": null,
                "utf8": "b"
              }
            ],
            "null": null,
            "struct": {
              "dict": "cupcakes",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "foo": 10
              },
              "utf8": "a"
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": null,
              "list": [
                1,
                2,
                3
              ],
              "map": null,
              "utf8": null
            }
          },
          {
            "date": "1970-01-02",
            "list": [
              {
                "int32": 5,
                "utf8": null
              }
            ],
            "null": null,
            "struct": {
              "dict": "bear",
              "list": null,
              "map": {},
              "utf8": null
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": "kuma",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "bar": 20,
                "baz": 30,
                "qux": 40
              },
              "utf8": "b"
            }
          }
        ]))
        .unwrap();

        assert_eq!(actual, expected);

        Ok(())
    }
1649
1650    fn binary_encoding_test<O: OffsetSizeTrait>() {
1651        // set up schema
1652        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1653            "bytes",
1654            GenericBinaryType::<O>::DATA_TYPE,
1655            true,
1656        )]));
1657
1658        // build record batch:
1659        let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
1660        let values = [Some(b"Ned Flanders"), None, Some(b"Troy McClure")];
1661        for value in values {
1662            match value {
1663                Some(v) => builder.append_value(v),
1664                None => builder.append_null(),
1665            }
1666        }
1667        let array = Arc::new(builder.finish()) as ArrayRef;
1668        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1669
1670        // encode and check JSON with explicit nulls:
1671        {
1672            let mut buf = Vec::new();
1673            let json_value: Value = {
1674                let mut writer = WriterBuilder::new()
1675                    .with_explicit_nulls(true)
1676                    .build::<_, JsonArray>(&mut buf);
1677                writer.write(&batch).unwrap();
1678                writer.close().unwrap();
1679                serde_json::from_slice(&buf).unwrap()
1680            };
1681
1682            assert_eq!(
1683                json!([
1684                    {
1685                        "bytes": "4e656420466c616e64657273"
1686                    },
1687                    {
1688                        "bytes": null // the explicit null
1689                    },
1690                    {
1691                        "bytes": "54726f79204d63436c757265"
1692                    }
1693                ]),
1694                json_value,
1695            );
1696        }
1697
1698        // encode and check JSON with no explicit nulls:
1699        {
1700            let mut buf = Vec::new();
1701            let json_value: Value = {
1702                // explicit nulls are off by default, so we don't need
1703                // to set that when creating the writer:
1704                let mut writer = ArrayWriter::new(&mut buf);
1705                writer.write(&batch).unwrap();
1706                writer.close().unwrap();
1707                serde_json::from_slice(&buf).unwrap()
1708            };
1709
1710            assert_eq!(
1711                json!([
1712                    {
1713                        "bytes": "4e656420466c616e64657273"
1714                    },
1715                    {}, // empty because nulls are omitted
1716                    {
1717                        "bytes": "54726f79204d63436c757265"
1718                    }
1719                ]),
1720                json_value
1721            );
1722        }
1723    }
1724
    #[test]
    fn test_writer_binary() {
        // Exercise both offset widths of the generic binary encoding test.
        // Binary (i32 offsets):
        binary_encoding_test::<i32>();
        // LargeBinary (i64 offsets):
        binary_encoding_test::<i64>();
    }
1732
1733    #[test]
1734    fn test_writer_fixed_size_binary() {
1735        // set up schema:
1736        let size = 11;
1737        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1738            "bytes",
1739            DataType::FixedSizeBinary(size),
1740            true,
1741        )]));
1742
1743        // build record batch:
1744        let mut builder = FixedSizeBinaryBuilder::new(size);
1745        let values = [Some(b"hello world"), None, Some(b"summer rain")];
1746        for value in values {
1747            match value {
1748                Some(v) => builder.append_value(v).unwrap(),
1749                None => builder.append_null(),
1750            }
1751        }
1752        let array = Arc::new(builder.finish()) as ArrayRef;
1753        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1754
1755        // encode and check JSON with explicit nulls:
1756        {
1757            let mut buf = Vec::new();
1758            let json_value: Value = {
1759                let mut writer = WriterBuilder::new()
1760                    .with_explicit_nulls(true)
1761                    .build::<_, JsonArray>(&mut buf);
1762                writer.write(&batch).unwrap();
1763                writer.close().unwrap();
1764                serde_json::from_slice(&buf).unwrap()
1765            };
1766
1767            assert_eq!(
1768                json!([
1769                    {
1770                        "bytes": "68656c6c6f20776f726c64"
1771                    },
1772                    {
1773                        "bytes": null // the explicit null
1774                    },
1775                    {
1776                        "bytes": "73756d6d6572207261696e"
1777                    }
1778                ]),
1779                json_value,
1780            );
1781        }
1782        // encode and check JSON with no explicit nulls:
1783        {
1784            let mut buf = Vec::new();
1785            let json_value: Value = {
1786                // explicit nulls are off by default, so we don't need
1787                // to set that when creating the writer:
1788                let mut writer = ArrayWriter::new(&mut buf);
1789                writer.write(&batch).unwrap();
1790                writer.close().unwrap();
1791                serde_json::from_slice(&buf).unwrap()
1792            };
1793
1794            assert_eq!(
1795                json!([
1796                    {
1797                        "bytes": "68656c6c6f20776f726c64"
1798                    },
1799                    {}, // empty because nulls are omitted
1800                    {
1801                        "bytes": "73756d6d6572207261696e"
1802                    }
1803                ]),
1804                json_value,
1805            );
1806        }
1807    }
1808
1809    #[test]
1810    fn test_writer_fixed_size_list() {
1811        let size = 3;
1812        let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
1813        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1814            "list",
1815            DataType::FixedSizeList(field, size),
1816            true,
1817        )]));
1818
1819        let values_builder = Int32Builder::new();
1820        let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
1821        let lists = [
1822            Some([Some(1), Some(2), None]),
1823            Some([Some(3), None, Some(4)]),
1824            Some([None, Some(5), Some(6)]),
1825            None,
1826        ];
1827        for list in lists {
1828            match list {
1829                Some(l) => {
1830                    for value in l {
1831                        match value {
1832                            Some(v) => list_builder.values().append_value(v),
1833                            None => list_builder.values().append_null(),
1834                        }
1835                    }
1836                    list_builder.append(true);
1837                }
1838                None => {
1839                    for _ in 0..size {
1840                        list_builder.values().append_null();
1841                    }
1842                    list_builder.append(false);
1843                }
1844            }
1845        }
1846        let array = Arc::new(list_builder.finish()) as ArrayRef;
1847        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1848
1849        //encode and check JSON with explicit nulls:
1850        {
1851            let json_value: Value = {
1852                let mut buf = Vec::new();
1853                let mut writer = WriterBuilder::new()
1854                    .with_explicit_nulls(true)
1855                    .build::<_, JsonArray>(&mut buf);
1856                writer.write(&batch).unwrap();
1857                writer.close().unwrap();
1858                serde_json::from_slice(&buf).unwrap()
1859            };
1860            assert_eq!(
1861                json!([
1862                    {"list": [1, 2, null]},
1863                    {"list": [3, null, 4]},
1864                    {"list": [null, 5, 6]},
1865                    {"list": null},
1866                ]),
1867                json_value
1868            );
1869        }
1870        // encode and check JSON with no explicit nulls:
1871        {
1872            let json_value: Value = {
1873                let mut buf = Vec::new();
1874                let mut writer = ArrayWriter::new(&mut buf);
1875                writer.write(&batch).unwrap();
1876                writer.close().unwrap();
1877                serde_json::from_slice(&buf).unwrap()
1878            };
1879            assert_eq!(
1880                json!([
1881                    {"list": [1, 2, null]},
1882                    {"list": [3, null, 4]},
1883                    {"list": [null, 5, 6]},
1884                    {}, // empty because nulls are omitted
1885                ]),
1886                json_value
1887            );
1888        }
1889    }
1890
1891    #[test]
1892    fn test_writer_null_dict() {
1893        let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
1894        let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
1895        let dict = DictionaryArray::new(keys, values);
1896
1897        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1898            "my_dict",
1899            DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
1900            true,
1901        )]));
1902
1903        let array = Arc::new(dict) as ArrayRef;
1904        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1905
1906        let mut json = Vec::new();
1907        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
1908        let mut writer = write_builder.build::<_, JsonArray>(&mut json);
1909        writer.write(&batch).unwrap();
1910        writer.close().unwrap();
1911
1912        let json_str = str::from_utf8(&json).unwrap();
1913        assert_eq!(
1914            json_str,
1915            r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
1916        )
1917    }
1918
1919    #[test]
1920    fn test_decimal128_encoder() {
1921        let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
1922            .with_precision_and_scale(10, 2)
1923            .unwrap();
1924        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1925        let schema = Schema::new(vec![field]);
1926        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1927
1928        let mut buf = Vec::new();
1929        {
1930            let mut writer = LineDelimitedWriter::new(&mut buf);
1931            writer.write_batches(&[&batch]).unwrap();
1932        }
1933
1934        assert_json_eq(
1935            &buf,
1936            r#"{"decimal":12.34}
1937{"decimal":56.78}
1938{"decimal":90.12}
1939"#,
1940        );
1941    }
1942
1943    #[test]
1944    fn test_decimal256_encoder() {
1945        let array = Decimal256Array::from_iter_values([
1946            i256::from(123400),
1947            i256::from(567800),
1948            i256::from(901200),
1949        ])
1950        .with_precision_and_scale(10, 4)
1951        .unwrap();
1952        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1953        let schema = Schema::new(vec![field]);
1954        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1955
1956        let mut buf = Vec::new();
1957        {
1958            let mut writer = LineDelimitedWriter::new(&mut buf);
1959            writer.write_batches(&[&batch]).unwrap();
1960        }
1961
1962        assert_json_eq(
1963            &buf,
1964            r#"{"decimal":12.3400}
1965{"decimal":56.7800}
1966{"decimal":90.1200}
1967"#,
1968        );
1969    }
1970
1971    #[test]
1972    fn test_decimal_encoder_with_nulls() {
1973        let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
1974            .with_precision_and_scale(10, 2)
1975            .unwrap();
1976        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1977        let schema = Schema::new(vec![field]);
1978        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1979
1980        let mut buf = Vec::new();
1981        {
1982            let mut writer = LineDelimitedWriter::new(&mut buf);
1983            writer.write_batches(&[&batch]).unwrap();
1984        }
1985
1986        assert_json_eq(
1987            &buf,
1988            r#"{"decimal":12.34}
1989{}
1990{"decimal":56.78}
1991"#,
1992        );
1993    }
1994
1995    #[test]
1996    fn write_structs_as_list() {
1997        let schema = Schema::new(vec![
1998            Field::new(
1999                "c1",
2000                DataType::Struct(Fields::from(vec![
2001                    Field::new("c11", DataType::Int32, true),
2002                    Field::new(
2003                        "c12",
2004                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2005                        false,
2006                    ),
2007                ])),
2008                false,
2009            ),
2010            Field::new("c2", DataType::Utf8, false),
2011        ]);
2012
2013        let c1 = StructArray::from(vec![
2014            (
2015                Arc::new(Field::new("c11", DataType::Int32, true)),
2016                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
2017            ),
2018            (
2019                Arc::new(Field::new(
2020                    "c12",
2021                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2022                    false,
2023                )),
2024                Arc::new(StructArray::from(vec![(
2025                    Arc::new(Field::new("c121", DataType::Utf8, false)),
2026                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
2027                )])) as ArrayRef,
2028            ),
2029        ]);
2030        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
2031
2032        let batch =
2033            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
2034
2035        let expected = r#"[[1,["e"]],"a"]
2036[[null,["f"]],"b"]
2037[[5,["g"]],"c"]
2038"#;
2039
2040        let mut buf = Vec::new();
2041        {
2042            let builder = WriterBuilder::new()
2043                .with_explicit_nulls(true)
2044                .with_struct_mode(StructMode::ListOnly);
2045            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2046            writer.write_batches(&[&batch]).unwrap();
2047        }
2048        assert_json_eq(&buf, expected);
2049
2050        let mut buf = Vec::new();
2051        {
2052            let builder = WriterBuilder::new()
2053                .with_explicit_nulls(false)
2054                .with_struct_mode(StructMode::ListOnly);
2055            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2056            writer.write_batches(&[&batch]).unwrap();
2057        }
2058        assert_json_eq(&buf, expected);
2059    }
2060
    /// Builds a three-row batch with a sparse-union column ("union") and a
    /// Float64 column ("float"), plus an [`EncoderFactory`] whose encoder
    /// handles the union type.
    ///
    /// Rows (per the type_ids below): {union: Int32 1, float: 1.0},
    /// {union: Utf8 "a", float: null}, {union: Null child, float: 3.4}.
    fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
        // Note: this is not intended to be an efficient implementation.
        // Just a simple example to demonstrate how to implement a custom encoder.
        #[derive(Debug)]
        enum UnionValue {
            Int32(i32),
            String(String),
        }

        // Encodes pre-extracted union values; `None` is written as JSON null.
        #[derive(Debug)]
        struct UnionEncoder {
            array: Vec<Option<UnionValue>>,
        }

        impl Encoder for UnionEncoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                match &self.array[idx] {
                    None => out.extend_from_slice(b"null"),
                    Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
                    Some(UnionValue::String(v)) => {
                        out.extend_from_slice(format!("\"{}\"", v).as_bytes())
                    }
                }
            }
        }

        #[derive(Debug)]
        struct UnionEncoderFactory;

        impl EncoderFactory for UnionEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                let data_type = array.data_type();
                // Only sparse unions are handled; returning Ok(None) defers
                // to the built-in encoders for every other type.
                let fields = match data_type {
                    DataType::Union(fields, UnionMode::Sparse) => fields,
                    _ => return Ok(None),
                };
                // check that the fields are supported
                let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
                for f in fields.iter() {
                    match f.data_type() {
                        DataType::Null => {}
                        DataType::Int32 => {}
                        DataType::Utf8 => {}
                        _ => return Ok(None),
                    }
                }
                // Eagerly materialize every row into a UnionValue; fine for a
                // test, though a real encoder would read the array lazily.
                let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
                let mut values = Vec::with_capacity(type_ids.len());
                for idx in 0..type_ids.len() {
                    let type_id = type_ids[idx];
                    // NOTE(review): `fields`/`buffers` are indexed by type_id,
                    // which assumes type ids equal child ordinals — true for
                    // the union built below (ids 0, 1, 2), but not for unions
                    // in general.
                    let field = &fields[type_id as usize];
                    let value = match field.data_type() {
                        DataType::Null => None,
                        DataType::Int32 => Some(UnionValue::Int32(
                            buffers[type_id as usize]
                                .as_primitive::<Int32Type>()
                                .value(idx),
                        )),
                        DataType::Utf8 => Some(UnionValue::String(
                            buffers[type_id as usize]
                                .as_string::<i32>()
                                .value(idx)
                                .to_string(),
                        )),
                        // Field types were validated above.
                        _ => unreachable!(),
                    };
                    values.push(value);
                }
                let array_encoder =
                    Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
                let nulls = array.nulls().cloned();
                Ok(Some(NullableEncoder::new(array_encoder, nulls)))
            }
        }

        // Sparse-union children are all the same length; type_ids pick which
        // child supplies each row: row 0 -> Int32, row 1 -> Utf8, row 2 -> Null.
        let int_array = Int32Array::from(vec![Some(1), None, None]);
        let string_array = StringArray::from(vec![None, Some("a"), None]);
        let null_array = NullArray::new(3);
        let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();

        let union_fields = [
            (0, Arc::new(Field::new("A", DataType::Int32, false))),
            (1, Arc::new(Field::new("B", DataType::Utf8, false))),
            (2, Arc::new(Field::new("C", DataType::Null, false))),
        ]
        .into_iter()
        .collect::<UnionFields>();

        let children = vec![
            Arc::new(int_array) as Arc<dyn Array>,
            Arc::new(string_array),
            Arc::new(null_array),
        ];

        let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();

        let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);

        let fields = vec![
            Field::new(
                "union",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new("float", DataType::Float64, true),
        ];

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        (batch, Arc::new(UnionEncoderFactory))
    }
2184
2185    #[test]
2186    fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
2187        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2188
2189        let mut buf = Vec::new();
2190        {
2191            let mut writer = WriterBuilder::new()
2192                .with_encoder_factory(encoder_factory)
2193                .with_explicit_nulls(false)
2194                .build::<_, LineDelimited>(&mut buf);
2195            writer.write_batches(&[&batch]).unwrap();
2196            writer.finish().unwrap();
2197        }
2198
2199        println!("{}", str::from_utf8(&buf).unwrap());
2200
2201        assert_json_eq(
2202            &buf,
2203            r#"{"union":1,"float":1.0}
2204{"union":"a"}
2205{"union":null,"float":3.4}
2206"#,
2207        );
2208    }
2209
2210    #[test]
2211    fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
2212        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2213
2214        let mut buf = Vec::new();
2215        {
2216            let mut writer = WriterBuilder::new()
2217                .with_encoder_factory(encoder_factory)
2218                .with_explicit_nulls(true)
2219                .build::<_, LineDelimited>(&mut buf);
2220            writer.write_batches(&[&batch]).unwrap();
2221            writer.finish().unwrap();
2222        }
2223
2224        assert_json_eq(
2225            &buf,
2226            r#"{"union":1,"float":1.0}
2227{"union":"a","float":null}
2228{"union":null,"float":3.4}
2229"#,
2230        );
2231    }
2232
2233    #[test]
2234    fn test_fallback_encoder_factory_array_implicit_nulls() {
2235        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2236
2237        let json_value: Value = {
2238            let mut buf = Vec::new();
2239            let mut writer = WriterBuilder::new()
2240                .with_encoder_factory(encoder_factory)
2241                .build::<_, JsonArray>(&mut buf);
2242            writer.write_batches(&[&batch]).unwrap();
2243            writer.finish().unwrap();
2244            serde_json::from_slice(&buf).unwrap()
2245        };
2246
2247        let expected = json!([
2248            {"union":1,"float":1.0},
2249            {"union":"a"},
2250            {"float":3.4,"union":null},
2251        ]);
2252
2253        assert_eq!(json_value, expected);
2254    }
2255
2256    #[test]
2257    fn test_fallback_encoder_factory_array_explicit_nulls() {
2258        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2259
2260        let json_value: Value = {
2261            let mut buf = Vec::new();
2262            let mut writer = WriterBuilder::new()
2263                .with_encoder_factory(encoder_factory)
2264                .with_explicit_nulls(true)
2265                .build::<_, JsonArray>(&mut buf);
2266            writer.write_batches(&[&batch]).unwrap();
2267            writer.finish().unwrap();
2268            serde_json::from_slice(&buf).unwrap()
2269        };
2270
2271        let expected = json!([
2272            {"union":1,"float":1.0},
2273            {"union":"a", "float": null},
2274            {"union":null,"float":3.4},
2275        ]);
2276
2277        assert_eq!(json_value, expected);
2278    }
2279
2280    #[test]
2281    fn test_default_encoder_byte_array() {
2282        struct IntArrayBinaryEncoder<B> {
2283            array: B,
2284        }
2285
2286        impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
2287        where
2288            B: ArrayAccessor<Item = &'a [u8]>,
2289        {
2290            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2291                out.push(b'[');
2292                let child = self.array.value(idx);
2293                for (idx, byte) in child.iter().enumerate() {
2294                    write!(out, "{byte}").unwrap();
2295                    if idx < child.len() - 1 {
2296                        out.push(b',');
2297                    }
2298                }
2299                out.push(b']');
2300            }
2301        }
2302
2303        #[derive(Debug)]
2304        struct IntArayBinaryEncoderFactory;
2305
2306        impl EncoderFactory for IntArayBinaryEncoderFactory {
2307            fn make_default_encoder<'a>(
2308                &self,
2309                _field: &'a FieldRef,
2310                array: &'a dyn Array,
2311                _options: &'a EncoderOptions,
2312            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2313                match array.data_type() {
2314                    DataType::Binary => {
2315                        let array = array.as_binary::<i32>();
2316                        let encoder = IntArrayBinaryEncoder { array };
2317                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2318                        let nulls = array.nulls().cloned();
2319                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2320                    }
2321                    _ => Ok(None),
2322                }
2323            }
2324        }
2325
2326        let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
2327        let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
2328        let fields = vec![
2329            Field::new("bytes", DataType::Binary, true),
2330            Field::new("float", DataType::Float64, true),
2331        ];
2332        let batch = RecordBatch::try_new(
2333            Arc::new(Schema::new(fields)),
2334            vec![
2335                Arc::new(binary_array) as Arc<dyn Array>,
2336                Arc::new(float_array) as Arc<dyn Array>,
2337            ],
2338        )
2339        .unwrap();
2340
2341        let json_value: Value = {
2342            let mut buf = Vec::new();
2343            let mut writer = WriterBuilder::new()
2344                .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory))
2345                .build::<_, JsonArray>(&mut buf);
2346            writer.write_batches(&[&batch]).unwrap();
2347            writer.finish().unwrap();
2348            serde_json::from_slice(&buf).unwrap()
2349        };
2350
2351        let expected = json!([
2352            {"bytes": [97], "float": 1.0},
2353            {"float": 2.3},
2354            {"bytes": [98]},
2355        ]);
2356
2357        assert_eq!(json_value, expected);
2358    }
2359
    /// A custom encoder applies to `T` even when the column is
    /// `Dictionary<_, T>`: the factory receives the dictionary's *values*
    /// array while `field` remains the outer dictionary field.
    #[test]
    fn test_encoder_factory_customize_dictionary() {
        // Test that we can customize the encoding of T even when it shows up as Dictionary<_, T>.

        // No particular reason to choose this example.
        // Just trying to add some variety to the test cases and demonstrate use cases of the encoder factory.
        // Renders an Int32 value as a quoted string, zero-padded to width 8.
        struct PaddedInt32Encoder {
            array: Int32Array,
        }

        impl Encoder for PaddedInt32Encoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                let value = self.array.value(idx);
                // `{value:0>8}` pads with '0' on the left to width 8.
                write!(out, "\"{value:0>8}\"").unwrap();
            }
        }

        #[derive(Debug)]
        struct CustomEncoderFactory;

        impl EncoderFactory for CustomEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                // The point here is:
                // 1. You can use information from Field to determine how to do the encoding.
                // 2. For dictionary arrays the Field is always the outer field but the array may be the keys or values array
                //    and thus the data type of `field` may not match the data type of `array`.
                let padded = field
                    .metadata()
                    .get("padded")
                    .map(|v| v == "true")
                    .unwrap_or_default();
                match (array.data_type(), padded) {
                    (DataType::Int32, true) => {
                        let array = array.as_primitive::<Int32Type>();
                        let nulls = array.nulls().cloned();
                        let encoder = PaddedInt32Encoder {
                            array: array.clone(),
                        };
                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
                    }
                    // Anything else keeps the built-in encoding.
                    _ => Ok(None),
                }
            }
        }

        // Serializes a batch with the custom factory and parses the result.
        let to_json = |batch| {
            let mut buf = Vec::new();
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(Arc::new(CustomEncoderFactory))
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[batch]).unwrap();
            writer.finish().unwrap();
            serde_json::from_slice::<Value>(&buf).unwrap()
        };

        // Control case: no dictionary wrapping works as expected.
        let array = Int32Array::from(vec![Some(1), None, Some(2)]);
        let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
            HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
        ));
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field.clone()])),
            vec![Arc::new(array)],
        )
        .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000002"},
        ]);

        assert_eq!(json_value, expected);

        // Now make a dictionary batch
        let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
        array_builder.append_value(1);
        array_builder.append_null();
        array_builder.append_value(1);
        let array = array_builder.finish();
        // The "padded" metadata lives on the outer Dictionary field; the
        // factory still honors it when encoding the Int32 values array.
        let field = Field::new(
            "int",
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
            true,
        )
        .with_metadata(HashMap::from_iter(vec![(
            "padded".to_string(),
            "true".to_string(),
        )]));
        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
            .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000001"},
        ]);

        assert_eq!(json_value, expected);
    }
2470}