// Source: arrow_json/writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! # JSON Writer
19//!
20//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
21//! JSON objects or JSON formatted byte streams.
22//!
23//! ## Writing JSON formatted byte streams
24//!
25//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
26//! [`LineDelimitedWriter`]:
27//!
28//! ```
29//! # use std::sync::Arc;
30//! # use arrow_array::{Int32Array, RecordBatch};
31//! # use arrow_schema::{DataType, Field, Schema};
32//!
33//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
34//! let a = Int32Array::from(vec![1, 2, 3]);
35//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
36//!
37//! // Write the record batch out as JSON
38//! let buf = Vec::new();
39//! let mut writer = arrow_json::LineDelimitedWriter::new(buf);
40//! writer.write_batches(&vec![&batch]).unwrap();
41//! writer.finish().unwrap();
42//!
43//! // Get the underlying buffer back,
44//! let buf = writer.into_inner();
45//! assert_eq!(r#"{"a":1}
46//! {"a":2}
47//! {"a":3}
48//!"#, String::from_utf8(buf).unwrap())
49//! ```
50//!
51//! To serialize [`RecordBatch`]es into a well formed JSON array, use
52//! [`ArrayWriter`]:
53//!
54//! ```
55//! # use std::sync::Arc;
56//! # use arrow_array::{Int32Array, RecordBatch};
57//! use arrow_schema::{DataType, Field, Schema};
58//!
59//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
60//! let a = Int32Array::from(vec![1, 2, 3]);
61//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
62//!
63//! // Write the record batch out as a JSON array
64//! let buf = Vec::new();
65//! let mut writer = arrow_json::ArrayWriter::new(buf);
66//! writer.write_batches(&vec![&batch]).unwrap();
67//! writer.finish().unwrap();
68//!
69//! // Get the underlying buffer back,
70//! let buf = writer.into_inner();
71//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
72//! ```
73//!
74//! [`LineDelimitedWriter`] and [`ArrayWriter`] will omit writing keys with null values.
75//! In order to explicitly write null values for keys, configure a custom [`Writer`] by
76//! using a [`WriterBuilder`] to construct a [`Writer`].
77//!
78//! ## Writing to [serde_json] JSON Objects
79//!
80//! To serialize [`RecordBatch`]es into an array of
81//! [JSON](https://docs.serde.rs/serde_json/) objects you can reparse the resulting JSON string.
82//! Note that this is less efficient than using the `Writer` API.
83//!
84//! ```
85//! # use std::sync::Arc;
86//! # use arrow_array::{Int32Array, RecordBatch};
87//! # use arrow_schema::{DataType, Field, Schema};
88//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
89//! let a = Int32Array::from(vec![1, 2, 3]);
90//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
91//!
92//! // Write the record batch out as json bytes (string)
93//! let buf = Vec::new();
94//! let mut writer = arrow_json::ArrayWriter::new(buf);
95//! writer.write_batches(&vec![&batch]).unwrap();
96//! writer.finish().unwrap();
97//! let json_data = writer.into_inner();
98//!
99//! // Parse the string using serde_json
100//! use serde_json::{Map, Value};
101//! let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(json_data.as_slice()).unwrap();
102//! assert_eq!(
103//!     serde_json::Value::Object(json_rows[1].clone()),
104//!     serde_json::json!({"a": 2}),
105//! );
106//! ```
107mod encoder;
108
109use std::{fmt::Debug, io::Write, sync::Arc};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115pub use encoder::{Encoder, EncoderFactory, EncoderOptions, NullableEncoder, make_encoder};
116
/// This trait defines how to format a sequence of JSON objects to a
/// byte stream.
pub trait JsonFormat: Debug + Default {
    /// Write any bytes needed at the start of the file to the writer
    #[inline]
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed for the start of each row
    #[inline]
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed for the end of each row
    #[inline]
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed at the end of the stream (e.g. a closing `]`)
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }
}
143
144/// Produces JSON output with one record per line.
145///
146/// For example:
147///
148/// ```json
149/// {"foo":1}
150/// {"bar":1}
151///
152/// ```
153#[derive(Debug, Default)]
154pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157    fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158        writer.write_all(b"\n")?;
159        Ok(())
160    }
161}
162
163/// Produces JSON output as a single JSON array.
164///
165/// For example:
166///
167/// ```json
168/// [{"foo":1},{"bar":1}]
169/// ```
170#[derive(Debug, Default)]
171pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175        writer.write_all(b"[")?;
176        Ok(())
177    }
178
179    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180        if !is_first_row {
181            writer.write_all(b",")?;
182        }
183        Ok(())
184    }
185
186    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187        writer.write_all(b"]")?;
188        Ok(())
189    }
190}
191
/// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON objects.
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays.
pub type ArrayWriter<W> = Writer<W, JsonArray>;

/// JSON writer builder.
///
/// A thin newtype over [`EncoderOptions`] providing fluent setters;
/// finalize with [`WriterBuilder::build`] to obtain a [`Writer`].
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203    /// Create a new builder for configuring JSON writing options.
204    ///
205    /// # Example
206    ///
207    /// ```
208    /// # use arrow_json::{Writer, WriterBuilder};
209    /// # use arrow_json::writer::LineDelimited;
210    /// # use std::fs::File;
211    ///
212    /// fn example() -> Writer<File, LineDelimited> {
213    ///     let file = File::create("target/out.json").unwrap();
214    ///
215    ///     // create a builder that keeps keys with null values
216    ///     let builder = WriterBuilder::new().with_explicit_nulls(true);
217    ///     let writer = builder.build::<_, LineDelimited>(file);
218    ///
219    ///     writer
220    /// }
221    /// ```
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    /// Returns `true` if this writer is configured to keep keys with null values.
227    pub fn explicit_nulls(&self) -> bool {
228        self.0.explicit_nulls()
229    }
230
231    /// Set whether to keep keys with null values, or to omit writing them.
232    ///
233    /// For example, with [`LineDelimited`] format:
234    ///
235    /// Skip nulls (set to `false`):
236    ///
237    /// ```json
238    /// {"foo":1}
239    /// {"foo":1,"bar":2}
240    /// {}
241    /// ```
242    ///
243    /// Keep nulls (set to `true`):
244    ///
245    /// ```json
246    /// {"foo":1,"bar":null}
247    /// {"foo":1,"bar":2}
248    /// {"foo":null,"bar":null}
249    /// ```
250    ///
251    /// Default is to skip nulls (set to `false`). If `struct_mode == ListOnly`,
252    /// nulls will be written explicitly regardless of this setting.
253    pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254        self.0 = self.0.with_explicit_nulls(explicit_nulls);
255        self
256    }
257
258    /// Returns if this writer is configured to write structs as JSON Objects or Arrays.
259    pub fn struct_mode(&self) -> StructMode {
260        self.0.struct_mode()
261    }
262
263    /// Set the [`StructMode`] for the writer, which determines whether structs
264    /// are encoded to JSON as objects or lists. For more details refer to the
265    /// enum documentation. Default is to use `ObjectOnly`. If this is set to
266    /// `ListOnly`, nulls will be written explicitly regardless of the
267    /// `explicit_nulls` setting.
268    pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269        self.0 = self.0.with_struct_mode(struct_mode);
270        self
271    }
272
273    /// Set an encoder factory to use when creating encoders for writing JSON.
274    ///
275    /// This can be used to override how some types are encoded or to provide
276    /// a fallback for types that are not supported by the default encoder.
277    pub fn with_encoder_factory(mut self, factory: Arc<dyn EncoderFactory>) -> Self {
278        self.0 = self.0.with_encoder_factory(factory);
279        self
280    }
281
282    /// Set the JSON file's date format
283    pub fn with_date_format(mut self, format: String) -> Self {
284        self.0 = self.0.with_date_format(format);
285        self
286    }
287
288    /// Set the JSON file's datetime format
289    pub fn with_datetime_format(mut self, format: String) -> Self {
290        self.0 = self.0.with_datetime_format(format);
291        self
292    }
293
294    /// Set the JSON file's time format
295    pub fn with_time_format(mut self, format: String) -> Self {
296        self.0 = self.0.with_time_format(format);
297        self
298    }
299
300    /// Set the JSON file's timestamp format
301    pub fn with_timestamp_format(mut self, format: String) -> Self {
302        self.0 = self.0.with_timestamp_format(format);
303        self
304    }
305
306    /// Set the JSON file's timestamp tz format
307    pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
308        self.0 = self.0.with_timestamp_tz_format(tz_format);
309        self
310    }
311
312    /// Create a new `Writer` with specified `JsonFormat` and builder options.
313    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
314    where
315        W: Write,
316        F: JsonFormat,
317    {
318        Writer {
319            writer,
320            started: false,
321            finished: false,
322            format: F::default(),
323            options: self.0,
324        }
325    }
326}
327
/// A JSON writer which serializes [`RecordBatch`]es to a stream of
/// `u8` encoded JSON objects.
///
/// See the module level documentation for detailed usage and examples.
/// The specific format of the stream is controlled by the [`JsonFormat`]
/// type parameter.
///
/// By default the writer will skip writing keys with null values for
/// backward compatibility. See [`WriterBuilder`] on how to customize
/// this behaviour when creating a new writer.
///
/// NOTE(review): no `Drop` impl is visible in this module, so callers appear
/// responsible for calling [`Writer::finish`] before dropping — confirm.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying writer to use to write bytes
    writer: W,

    /// Has the writer output any records yet?
    /// Used to decide whether `start_stream` still needs to be emitted.
    started: bool,

    /// Is the writer finished?
    /// Set once `end_stream` has been written by [`Writer::finish`].
    finished: bool,

    /// Determines how the byte stream is formatted
    format: F,

    /// Controls how JSON should be encoded, e.g. whether to write explicit nulls or skip them
    options: EncoderOptions,
}
359
360impl<W, F> Writer<W, F>
361where
362    W: Write,
363    F: JsonFormat,
364{
365    /// Construct a new writer
366    pub fn new(writer: W) -> Self {
367        Self {
368            writer,
369            started: false,
370            finished: false,
371            format: F::default(),
372            options: EncoderOptions::default(),
373        }
374    }
375
376    /// Serialize `batch` to JSON output
377    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
378        if batch.num_rows() == 0 {
379            return Ok(());
380        }
381
382        // BufWriter uses a buffer size of 8KB
383        // We therefore double this and flush once we have more than 8KB
384        let mut buffer = Vec::with_capacity(16 * 1024);
385
386        let mut is_first_row = !self.started;
387        if !self.started {
388            self.format.start_stream(&mut buffer)?;
389            self.started = true;
390        }
391
392        let array = StructArray::from(batch.clone());
393        let field = Arc::new(Field::new_struct(
394            "",
395            batch.schema().fields().clone(),
396            false,
397        ));
398
399        let mut encoder = make_encoder(&field, &array, &self.options)?;
400
401        // Validate that the root is not nullable
402        assert!(!encoder.has_nulls(), "root cannot be nullable");
403        for idx in 0..batch.num_rows() {
404            self.format.start_row(&mut buffer, is_first_row)?;
405            is_first_row = false;
406
407            encoder.encode(idx, &mut buffer);
408            if buffer.len() > 8 * 1024 {
409                self.writer.write_all(&buffer)?;
410                buffer.clear();
411            }
412            self.format.end_row(&mut buffer)?;
413        }
414
415        if !buffer.is_empty() {
416            self.writer.write_all(&buffer)?;
417        }
418
419        Ok(())
420    }
421
422    /// Serialize `batches` to JSON output
423    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
424        for b in batches {
425            self.write(b)?;
426        }
427        Ok(())
428    }
429
430    /// Finishes the output stream. This function must be called after
431    /// all record batches have been produced. (e.g. producing the final `']'` if writing
432    /// arrays.
433    pub fn finish(&mut self) -> Result<(), ArrowError> {
434        if !self.started {
435            self.format.start_stream(&mut self.writer)?;
436            self.started = true;
437        }
438        if !self.finished {
439            self.format.end_stream(&mut self.writer)?;
440            self.finished = true;
441        }
442
443        Ok(())
444    }
445
446    /// Gets a reference to the underlying writer.
447    pub fn get_ref(&self) -> &W {
448        &self.writer
449    }
450
451    /// Gets a mutable reference to the underlying writer.
452    ///
453    /// Writing to the underlying writer must be done with care
454    /// to avoid corrupting the output JSON.
455    pub fn get_mut(&mut self) -> &mut W {
456        &mut self.writer
457    }
458
459    /// Unwraps this `Writer<W>`, returning the underlying writer
460    pub fn into_inner(self) -> W {
461        self.writer
462    }
463}
464
impl<W, F> RecordBatchWriter for Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Delegates to the inherent [`Writer::write`].
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    /// Finalizes the stream via [`Writer::finish`] and consumes the writer.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
478
479#[cfg(test)]
480mod tests {
481    use core::str;
482    use std::collections::HashMap;
483    use std::fs::{File, read_to_string};
484    use std::io::{BufReader, Seek};
485    use std::sync::Arc;
486
487    use arrow_array::cast::AsArray;
488    use serde_json::{Value, json};
489
490    use super::LineDelimited;
491    use super::{Encoder, WriterBuilder};
492    use arrow_array::builder::*;
493    use arrow_array::types::*;
494    use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, ToByteSlice, i256};
495    use arrow_data::ArrayData;
496
497    use crate::reader::*;
498
499    use super::*;
500
501    /// Asserts that the NDJSON `input` is semantically identical to `expected`
502    fn assert_json_eq(input: &[u8], expected: &str) {
503        let expected: Vec<Option<Value>> = expected
504            .split('\n')
505            .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
506            .collect();
507
508        let actual: Vec<Option<Value>> = input
509            .split(|b| *b == b'\n')
510            .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
511            .collect();
512
513        assert_eq!(actual, expected);
514    }
515
516    #[test]
517    fn write_simple_rows() {
518        let schema = Schema::new(vec![
519            Field::new("c1", DataType::Int32, true),
520            Field::new("c2", DataType::Utf8, true),
521        ]);
522
523        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
524        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);
525
526        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
527
528        let mut buf = Vec::new();
529        {
530            let mut writer = LineDelimitedWriter::new(&mut buf);
531            writer.write_batches(&[&batch]).unwrap();
532        }
533
534        assert_json_eq(
535            &buf,
536            r#"{"c1":1,"c2":"a"}
537{"c1":2,"c2":"b"}
538{"c1":3,"c2":"c"}
539{"c2":"d"}
540{"c1":5}
541"#,
542        );
543    }
544
    /// All three string encodings (Utf8, LargeUtf8, Utf8View) serialize identically.
    #[test]
    fn write_large_utf8_and_utf8_view() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Last row is all-null, so it serializes to an empty object.
        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }
579
    /// Dictionary-encoded columns (with different key widths) are written as
    /// their decoded string values.
    #[test]
    fn write_dictionary() {
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }
619
    /// Large lists of dictionary-encoded values: null list entries serialize as
    /// `null` inside the array, while a null list drops the whole key.
    #[test]
    fn write_list_of_dictionary() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        // Four lists of lengths 3, 2, 0, 1; the third list itself is null.
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
659
    /// Same shape as `write_list_of_dictionary`, but the dictionary values use
    /// `LargeUtf8` and the dictionary is built from explicit keys/values.
    #[test]
    fn write_list_of_dictionary_large_values() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        // Four lists of lengths 3, 2, 0, 1; the third list itself is null.
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
706
    /// Timestamps at all four resolutions render as ISO-8601 by default, and
    /// honor a custom format set via `WriterBuilder::with_timestamp_format`.
    #[test]
    fn write_timestamps() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        // Derive coarser resolutions by truncation from the same instant.
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );

        // Same batch, but with a custom strftime-style timestamp format.
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_timestamp_format("%m-%d-%Y".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"nanos":"11-13-2018","micros":"11-13-2018","millis":"11-13-2018","secs":"11-13-2018","name":"a"}
{"name":"b"}
"#,
        );
    }
775
    /// Timezone-aware timestamps render with a `Z`/offset suffix by default,
    /// and honor `WriterBuilder::with_timestamp_tz_format`.
    #[test]
    fn write_timestamps_with_tz() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        // Derive coarser resolutions by truncation from the same instant.
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        // Attach a UTC offset so the tz-aware formatting paths are exercised.
        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );

        // Same batch, but with a custom tz-aware format including the offset.
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_timestamp_tz_format("%m-%d-%Y %Z".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"nanos":"11-13-2018 +00:00","micros":"11-13-2018 +00:00","millis":"11-13-2018 +00:00","secs":"11-13-2018 +00:00","name":"a"}
{"name":"b"}
"#,
        );
    }
851
    /// Date32 (days) and Date64 (milliseconds) render as dates/datetimes by
    /// default, and honor custom date/datetime formats from the builder.
    #[test]
    fn write_dates() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        // Date32 stores whole days since the epoch.
        let arr_date32 = Date32Array::from(vec![
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );

        // Same batch, with custom date and datetime formats.
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_date_format("%m-%d-%Y".to_string())
                .with_datetime_format("%m-%d-%Y %Mmin %Ssec %Hhour".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"11-13-2018","date64":"11-13-2018 11min 10sec 17hour","name":"a"}
{"name":"b"}
"#,
        );
    }
914
    /// Time arrays at all four resolutions render as `HH:MM:SS[.frac]` by
    /// default, and honor `WriterBuilder::with_time_format`.
    #[test]
    fn write_times() {
        // The same raw value 120 means 2 minutes / 120ms / 120us / 120ns
        // depending on each array's resolution.
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );

        // Same batch, with a custom time format including nanoseconds (%f).
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_time_format("%H-%M-%S %f".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00-02-00 000000000","time32msec":"00-00-00 120000000","time64usec":"00-00-00 000120000","time64nsec":"00-00-00 000000120","name":"a"}
{"name":"b"}
"#,
        );
    }
972
973    #[test]
974    fn write_durations() {
975        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
976        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
977        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
978        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
979        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
980
981        let schema = Schema::new(vec![
982            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
983            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
984            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
985            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
986            Field::new("name", arr_names.data_type().clone(), true),
987        ]);
988        let schema = Arc::new(schema);
989
990        let batch = RecordBatch::try_new(
991            schema,
992            vec![
993                Arc::new(arr_durationsec),
994                Arc::new(arr_durationmsec),
995                Arc::new(arr_durationusec),
996                Arc::new(arr_durationnsec),
997                Arc::new(arr_names),
998            ],
999        )
1000        .unwrap();
1001
1002        let mut buf = Vec::new();
1003        {
1004            let mut writer = LineDelimitedWriter::new(&mut buf);
1005            writer.write_batches(&[&batch]).unwrap();
1006        }
1007
1008        assert_json_eq(
1009            &buf,
1010            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
1011{"name":"b"}
1012"#,
1013        );
1014    }
1015
    #[test]
    fn write_nested_structs() {
        // Structs nested inside structs serialize as nested JSON objects;
        // a null leaf field (c11 in the second row) is omitted from its
        // enclosing object rather than written as null.
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        // c1 rows: {"c11":1,"c12":{"c121":"e"}}, {"c12":{"c121":"f"}},
        //          {"c11":5,"c12":{"c121":"g"}}
        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }
1070
    #[test]
    fn write_struct_with_list_field() {
        // A list-of-strings column alongside a primitive column serializes
        // as a JSON array per row next to the scalar value.
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
        // list column rows: ["a", "a1"], ["b"], ["c"], ["d"], ["e"]
        let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
        // Validity bitmap 0b00011111 marks all five list rows as valid.
        let a_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(5)
            .add_buffer(a_value_offsets)
            .add_child_data(a_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00011111])))
            .build()
            .unwrap();
        let a = ListArray::from(a_list_data);

        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":["a","a1"],"c2":1}
{"c1":["b"],"c2":2}
{"c1":["c"],"c2":3}
{"c1":["d"],"c2":4}
{"c1":["e"],"c2":5}
"#,
        );
    }
1113
    #[test]
    fn write_nested_list() {
        // Lists of lists serialize as nested JSON arrays; an empty outer row
        // stays "[]" and a null string cell (last c2 row) is omitted.
        let list_inner_type = Field::new(
            "a",
            DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
            false,
        );
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(list_inner_type.clone())),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        // list column rows: [[1, 2], [3]], [], [[4, 5, 6]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
        // Construct a list array from the above two
        // (validity bitmap 0b00000111 marks all three inner rows valid)
        let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
            .len(3)
            .add_buffer(a_value_offsets)
            .null_bit_buffer(Some(Buffer::from([0b00000111])))
            .add_child_data(a_values.into_data())
            .build()
            .unwrap();

        // Outer offsets [0, 2, 2, 3]: row 0 has two inner lists, row 1 is
        // empty, row 2 has one inner list.
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(a_list_data)
            .build()
            .unwrap();

        let c1 = ListArray::from(c1_list_data);
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }
1170
    #[test]
    fn write_list_of_struct() {
        // A list whose items are (nested) structs serializes as a JSON array
        // of objects; a null list row (row 2, masked by the validity bitmap)
        // is omitted from its record entirely.
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // list column rows (c1):
        // [{"c11": 1, "c12": {"c121": "e"}}, {"c12": {"c121": "f"}}],
        // null,
        // [{"c11": 5, "c12": {"c121": "g"}}]
        // (validity bitmap 0b00000101: rows 0 and 2 valid, row 1 null)
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(struct_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000101])))
            .build()
            .unwrap();
        let c1 = ListArray::from(c1_list_data);

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }
1243
    /// Builds a `GenericListViewArray` with the given offset width `O`
    /// (i32 or i64) — including a null row and offsets that revisit the
    /// start of the values buffer — and checks its line-delimited JSON
    /// output. Shared by `write_list_view` for both offset sizes.
    fn assert_write_list_view<O: OffsetSizeTrait>() {
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
        let schema = Schema::new(vec![Field::new("lv", data_type, true)]);

        // rows: [1, 2, 3], [4, null], null, [6]
        let values = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), None, Some(6)]);
        // Unlike plain lists, list views carry independent (offset, size)
        // pairs per row, so offsets need not be monotonic.
        let offsets = [0, 3, 0, 5]
            .iter()
            .map(|&v| O::from_usize(v).unwrap())
            .collect::<Vec<_>>();
        let sizes = [3, 2, 0, 1]
            .iter()
            .map(|&v| O::from_usize(v).unwrap())
            .collect::<Vec<_>>();
        let list_view = GenericListViewArray::<O>::try_new(
            field,
            ScalarBuffer::from(offsets),
            ScalarBuffer::from(sizes),
            Arc::new(values),
            // Third row is null and should be omitted from its record.
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"lv":[1,2,3]}
{"lv":[4,null]}
{}
{"lv":[6]}
"#,
        );
    }
1285
    #[test]
    fn write_list_view() {
        // Exercise both 32-bit (ListView) and 64-bit (LargeListView) offsets.
        assert_write_list_view::<i32>();
        assert_write_list_view::<i64>();
    }
1291
1292    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
1293        let file = File::open(test_file).unwrap();
1294        let mut reader = BufReader::new(file);
1295        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1296        reader.rewind().unwrap();
1297
1298        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1299        let mut reader = builder.build(reader).unwrap();
1300        let batch = reader.next().unwrap().unwrap();
1301
1302        let mut buf = Vec::new();
1303        {
1304            if remove_nulls {
1305                let mut writer = LineDelimitedWriter::new(&mut buf);
1306                writer.write_batches(&[&batch]).unwrap();
1307            } else {
1308                let mut writer = WriterBuilder::new()
1309                    .with_explicit_nulls(true)
1310                    .build::<_, LineDelimited>(&mut buf);
1311                writer.write_batches(&[&batch]).unwrap();
1312            }
1313        }
1314
1315        let result = str::from_utf8(&buf).unwrap();
1316        let expected = read_to_string(test_file).unwrap();
1317        for (r, e) in result.lines().zip(expected.lines()) {
1318            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1319            if remove_nulls {
1320                // remove null value from object to make comparison consistent:
1321                if let Value::Object(obj) = expected_json {
1322                    expected_json =
1323                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1324                }
1325            }
1326            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1327        }
1328    }
1329
    #[test]
    fn write_basic_rows() {
        // Flat rows of primitive columns; nulls omitted by the writer.
        test_write_for_file("test/data/basic.json", true);
    }
1334
    #[test]
    fn write_arrays() {
        // Rows containing list-valued columns; nulls omitted by the writer.
        test_write_for_file("test/data/arrays.json", true);
    }
1339
    #[test]
    fn write_basic_nulls() {
        // Flat rows with many null cells; nulls omitted by the writer.
        test_write_for_file("test/data/basic_nulls.json", true);
    }
1344
    #[test]
    fn write_nested_with_nulls() {
        // Nested rows written with explicit nulls (remove_nulls = false),
        // so the output must reproduce the file's null fields verbatim.
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1349
1350    #[test]
1351    fn json_line_writer_empty() {
1352        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1353        writer.finish().unwrap();
1354        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1355    }
1356
1357    #[test]
1358    fn json_array_writer_empty() {
1359        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1360        writer.finish().unwrap();
1361        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1362    }
1363
1364    #[test]
1365    fn json_line_writer_empty_batch() {
1366        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1367
1368        let array = Int32Array::from(Vec::<i32>::new());
1369        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1370        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1371
1372        writer.write(&batch).unwrap();
1373        writer.finish().unwrap();
1374        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1375    }
1376
1377    #[test]
1378    fn json_array_writer_empty_batch() {
1379        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1380
1381        let array = Int32Array::from(Vec::<i32>::new());
1382        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1383        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1384
1385        writer.write(&batch).unwrap();
1386        writer.finish().unwrap();
1387        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1388    }
1389
    #[test]
    fn json_struct_array_nulls() {
        // Two struct columns share the same list child, but "a" carries a
        // validity bitmap (0b01010111: rows 3 and 5 are null) while "b" does
        // not. A null struct row is dropped from its record; a valid struct
        // row over a null child serializes as an empty object "{}".
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]), // masked for a
            Some(vec![Some(4), Some(5)]),
            None, // masked for a
            None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            Buffer::from([0b01010111]),
        ));
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }
1439
    /// Builds a map column from `keys_array` (paired with fixed Int64
    /// values) and checks its line-delimited JSON output. Shared by
    /// `json_writer_map` so each string key encoding produces identical
    /// output: map rows become JSON objects, and a null map row is dropped
    /// from its record (leaving "{}").
    fn run_json_writer_map_with_keys(keys_array: ArrayRef) {
        let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);

        let keys_field = Arc::new(Field::new("keys", keys_array.data_type().clone(), false));
        let values_field = Arc::new(Field::new("values", DataType::Int64, false));
        let entry_struct = StructArray::from(vec![
            (keys_field, keys_array.clone()),
            (values_field, Arc::new(values_array) as ArrayRef),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}, {"quux": 50}, {}]
        // (validity bitmap 0b00111101: only row 1 is null)
        let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
        let valid_buffer = Buffer::from([0b00111101]);

        let map_data = ArrayData::builder(map_data_type.clone())
            .len(6)
            .null_bit_buffer(Some(valid_buffer))
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();

        let map = MapArray::from(map_data);

        let map_field = Field::new("map", map_data_type, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
        );
    }
1495
1496    #[test]
1497    fn json_writer_map() {
1498        // Utf8 (StringArray)
1499        let keys_utf8 = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1500        run_json_writer_map_with_keys(Arc::new(keys_utf8) as ArrayRef);
1501
1502        // LargeUtf8 (LargeStringArray)
1503        let keys_large = super::LargeStringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1504        run_json_writer_map_with_keys(Arc::new(keys_large) as ArrayRef);
1505
1506        // Utf8View (StringViewArray)
1507        let keys_view = super::StringViewArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1508        run_json_writer_map_with_keys(Arc::new(keys_view) as ArrayRef);
1509    }
1510
1511    #[test]
1512    fn test_write_single_batch() {
1513        let test_file = "test/data/basic.json";
1514        let file = File::open(test_file).unwrap();
1515        let mut reader = BufReader::new(file);
1516        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1517        reader.rewind().unwrap();
1518
1519        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1520        let mut reader = builder.build(reader).unwrap();
1521        let batch = reader.next().unwrap().unwrap();
1522
1523        let mut buf = Vec::new();
1524        {
1525            let mut writer = LineDelimitedWriter::new(&mut buf);
1526            writer.write(&batch).unwrap();
1527        }
1528
1529        let result = str::from_utf8(&buf).unwrap();
1530        let expected = read_to_string(test_file).unwrap();
1531        for (r, e) in result.lines().zip(expected.lines()) {
1532            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1533            // remove null value from object to make comparison consistent:
1534            if let Value::Object(obj) = expected_json {
1535                expected_json =
1536                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1537            }
1538            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1539        }
1540    }
1541
1542    #[test]
1543    fn test_write_multi_batches() {
1544        let test_file = "test/data/basic.json";
1545
1546        let schema = SchemaRef::new(Schema::new(vec![
1547            Field::new("a", DataType::Int64, true),
1548            Field::new("b", DataType::Float64, true),
1549            Field::new("c", DataType::Boolean, true),
1550            Field::new("d", DataType::Utf8, true),
1551            Field::new("e", DataType::Utf8, true),
1552            Field::new("f", DataType::Utf8, true),
1553            Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1554            Field::new("h", DataType::Float16, true),
1555        ]));
1556
1557        let mut reader = ReaderBuilder::new(schema.clone())
1558            .build(BufReader::new(File::open(test_file).unwrap()))
1559            .unwrap();
1560        let batch = reader.next().unwrap().unwrap();
1561
1562        // test batches = an empty batch + 2 same batches, finally result should be eq to 2 same batches
1563        let batches = [&RecordBatch::new_empty(schema), &batch, &batch];
1564
1565        let mut buf = Vec::new();
1566        {
1567            let mut writer = LineDelimitedWriter::new(&mut buf);
1568            writer.write_batches(&batches).unwrap();
1569        }
1570
1571        let result = str::from_utf8(&buf).unwrap();
1572        let expected = read_to_string(test_file).unwrap();
1573        // result is eq to 2 same batches
1574        let expected = format!("{expected}\n{expected}");
1575        for (r, e) in result.lines().zip(expected.lines()) {
1576            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1577            // remove null value from object to make comparison consistent:
1578            if let Value::Object(obj) = expected_json {
1579                expected_json =
1580                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1581            }
1582            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1583        }
1584    }
1585
1586    #[test]
1587    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
1588        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
1589            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1590                Some(vec![None, None, None]),
1591                Some(vec![Some(1), Some(2), Some(3)]),
1592                None,
1593                Some(vec![None, None, None]),
1594            ]));
1595            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
1596            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
1597            (array, field)
1598        }
1599
1600        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
1601            let array = Arc::new(DictionaryArray::from_iter(vec![
1602                Some("cupcakes"),
1603                None,
1604                Some("bear"),
1605                Some("kuma"),
1606            ]));
1607            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
1608            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
1609            (array, field)
1610        }
1611
1612        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
1613            let string_builder = StringBuilder::new();
1614            let int_builder = Int64Builder::new();
1615            let mut builder = MapBuilder::new(None, string_builder, int_builder);
1616
1617            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
1618            builder.keys().append_value("foo");
1619            builder.values().append_value(10);
1620            builder.append(true).unwrap();
1621
1622            builder.append(false).unwrap();
1623
1624            builder.append(true).unwrap();
1625
1626            builder.keys().append_value("bar");
1627            builder.values().append_value(20);
1628            builder.keys().append_value("baz");
1629            builder.values().append_value(30);
1630            builder.keys().append_value("qux");
1631            builder.values().append_value(40);
1632            builder.append(true).unwrap();
1633
1634            let array = Arc::new(builder.finish());
1635            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
1636            (array, field)
1637        }
1638
1639        fn root_list() -> (Arc<ListArray>, Field) {
1640            let struct_array = StructArray::from(vec![
1641                (
1642                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
1643                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
1644                ),
1645                (
1646                    Arc::new(Field::new("int32", DataType::Int32, true)),
1647                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
1648                ),
1649            ]);
1650
1651            let field = Field::new_list(
1652                "list",
1653                Field::new("struct", struct_array.data_type().clone(), true),
1654                true,
1655            );
1656
1657            // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{int32":5,"utf8":null}]},{"list":null}]
1658            let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
1659            let data = ArrayData::builder(field.data_type().clone())
1660                .len(4)
1661                .add_buffer(entry_offsets)
1662                .add_child_data(struct_array.into_data())
1663                .null_bit_buffer(Some([0b00000101].into()))
1664                .build()
1665                .unwrap();
1666            let array = Arc::new(ListArray::from(data));
1667            (array, field)
1668        }
1669
1670        let (nested_list_array, nested_list_field) = nested_list();
1671        let (nested_dict_array, nested_dict_field) = nested_dict();
1672        let (nested_map_array, nested_map_field) = nested_map();
1673        let (root_list_array, root_list_field) = root_list();
1674
1675        let schema = Schema::new(vec![
1676            Field::new("date", DataType::Date32, true),
1677            Field::new("null", DataType::Null, true),
1678            Field::new_struct(
1679                "struct",
1680                vec![
1681                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
1682                    nested_list_field.clone(),
1683                    nested_dict_field.clone(),
1684                    nested_map_field.clone(),
1685                ],
1686                true,
1687            ),
1688            root_list_field,
1689        ]);
1690
1691        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
1692        let arr_null = NullArray::new(4);
1693        let arr_struct = StructArray::from(vec![
1694            // [{"utf8":"a"},{"utf8":null},{"utf8":null},{"utf8":"b"}]
1695            (
1696                Arc::new(Field::new("utf8", DataType::Utf8, true)),
1697                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
1698            ),
1699            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
1700            (nested_list_field, nested_list_array as ArrayRef),
1701            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
1702            (nested_dict_field, nested_dict_array as ArrayRef),
1703            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
1704            (nested_map_field, nested_map_array as ArrayRef),
1705        ]);
1706
1707        let batch = RecordBatch::try_new(
1708            Arc::new(schema),
1709            vec![
1710                // [{"date":"1970-01-01"},{"date":null},{"date":"1970-01-02"},{"date":null}]
1711                Arc::new(arr_date32),
1712                // [{"null":null},{"null":null},{"null":null},{"null":null}]
1713                Arc::new(arr_null),
1714                Arc::new(arr_struct),
1715                // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{int32":5,"utf8":null}]},{"list":null}]
1716                root_list_array,
1717            ],
1718        )?;
1719
1720        let mut buf = Vec::new();
1721        {
1722            let mut writer = WriterBuilder::new()
1723                .with_explicit_nulls(true)
1724                .build::<_, JsonArray>(&mut buf);
1725            writer.write_batches(&[&batch])?;
1726            writer.finish()?;
1727        }
1728
1729        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
1730        let expected = serde_json::from_value::<Vec<Value>>(json!([
1731          {
1732            "date": "1970-01-01",
1733            "list": [
1734              {
1735                "int32": 1,
1736                "utf8": "a"
1737              },
1738              {
1739                "int32": null,
1740                "utf8": "b"
1741              }
1742            ],
1743            "null": null,
1744            "struct": {
1745              "dict": "cupcakes",
1746              "list": [
1747                null,
1748                null,
1749                null
1750              ],
1751              "map": {
1752                "foo": 10
1753              },
1754              "utf8": "a"
1755            }
1756          },
1757          {
1758            "date": null,
1759            "list": null,
1760            "null": null,
1761            "struct": {
1762              "dict": null,
1763              "list": [
1764                1,
1765                2,
1766                3
1767              ],
1768              "map": null,
1769              "utf8": null
1770            }
1771          },
1772          {
1773            "date": "1970-01-02",
1774            "list": [
1775              {
1776                "int32": 5,
1777                "utf8": null
1778              }
1779            ],
1780            "null": null,
1781            "struct": {
1782              "dict": "bear",
1783              "list": null,
1784              "map": {},
1785              "utf8": null
1786            }
1787          },
1788          {
1789            "date": null,
1790            "list": null,
1791            "null": null,
1792            "struct": {
1793              "dict": "kuma",
1794              "list": [
1795                null,
1796                null,
1797                null
1798              ],
1799              "map": {
1800                "bar": 20,
1801                "baz": 30,
1802                "qux": 40
1803              },
1804              "utf8": "b"
1805            }
1806          }
1807        ]))
1808        .unwrap();
1809
1810        assert_eq!(actual, expected);
1811
1812        Ok(())
1813    }
1814
1815    fn build_array_binary<O: OffsetSizeTrait>(values: &[Option<&[u8]>]) -> RecordBatch {
1816        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1817            "bytes",
1818            GenericBinaryType::<O>::DATA_TYPE,
1819            true,
1820        )]));
1821        let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
1822        for value in values {
1823            match value {
1824                Some(v) => builder.append_value(v),
1825                None => builder.append_null(),
1826            }
1827        }
1828        let array = Arc::new(builder.finish()) as ArrayRef;
1829        RecordBatch::try_new(schema, vec![array]).unwrap()
1830    }
1831
1832    fn build_array_binary_view(values: &[Option<&[u8]>]) -> RecordBatch {
1833        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1834            "bytes",
1835            DataType::BinaryView,
1836            true,
1837        )]));
1838        let mut builder = BinaryViewBuilder::new();
1839        for value in values {
1840            match value {
1841                Some(v) => builder.append_value(v),
1842                None => builder.append_null(),
1843            }
1844        }
1845        let array = Arc::new(builder.finish()) as ArrayRef;
1846        RecordBatch::try_new(schema, vec![array]).unwrap()
1847    }
1848
1849    fn assert_binary_json(batch: &RecordBatch) {
1850        // encode and check JSON with explicit nulls:
1851        {
1852            let mut buf = Vec::new();
1853            let json_value: Value = {
1854                let mut writer = WriterBuilder::new()
1855                    .with_explicit_nulls(true)
1856                    .build::<_, JsonArray>(&mut buf);
1857                writer.write(batch).unwrap();
1858                writer.close().unwrap();
1859                serde_json::from_slice(&buf).unwrap()
1860            };
1861
1862            assert_eq!(
1863                json!([
1864                    {
1865                        "bytes": "4e656420466c616e64657273"
1866                    },
1867                    {
1868                        "bytes": null // the explicit null
1869                    },
1870                    {
1871                        "bytes": "54726f79204d63436c757265"
1872                    }
1873                ]),
1874                json_value,
1875            );
1876        }
1877
1878        // encode and check JSON with no explicit nulls:
1879        {
1880            let mut buf = Vec::new();
1881            let json_value: Value = {
1882                // explicit nulls are off by default, so we don't need
1883                // to set that when creating the writer:
1884                let mut writer = ArrayWriter::new(&mut buf);
1885                writer.write(batch).unwrap();
1886                writer.close().unwrap();
1887                serde_json::from_slice(&buf).unwrap()
1888            };
1889
1890            assert_eq!(
1891                json!([
1892                    { "bytes": "4e656420466c616e64657273" },
1893                    {},
1894                    { "bytes": "54726f79204d63436c757265" }
1895                ]),
1896                json_value
1897            );
1898        }
1899    }
1900
1901    #[test]
1902    fn test_writer_binary() {
1903        let values: [Option<&[u8]>; 3] = [
1904            Some(b"Ned Flanders" as &[u8]),
1905            None,
1906            Some(b"Troy McClure" as &[u8]),
1907        ];
1908        // Binary:
1909        {
1910            let batch = build_array_binary::<i32>(&values);
1911            assert_binary_json(&batch);
1912        }
1913        // LargeBinary:
1914        {
1915            let batch = build_array_binary::<i64>(&values);
1916            assert_binary_json(&batch);
1917        }
1918        {
1919            let batch = build_array_binary_view(&values);
1920            assert_binary_json(&batch);
1921        }
1922    }
1923
1924    #[test]
1925    fn test_writer_fixed_size_binary() {
1926        // set up schema:
1927        let size = 11;
1928        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1929            "bytes",
1930            DataType::FixedSizeBinary(size),
1931            true,
1932        )]));
1933
1934        // build record batch:
1935        let mut builder = FixedSizeBinaryBuilder::new(size);
1936        let values = [Some(b"hello world"), None, Some(b"summer rain")];
1937        for value in values {
1938            match value {
1939                Some(v) => builder.append_value(v).unwrap(),
1940                None => builder.append_null(),
1941            }
1942        }
1943        let array = Arc::new(builder.finish()) as ArrayRef;
1944        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1945
1946        // encode and check JSON with explicit nulls:
1947        {
1948            let mut buf = Vec::new();
1949            let json_value: Value = {
1950                let mut writer = WriterBuilder::new()
1951                    .with_explicit_nulls(true)
1952                    .build::<_, JsonArray>(&mut buf);
1953                writer.write(&batch).unwrap();
1954                writer.close().unwrap();
1955                serde_json::from_slice(&buf).unwrap()
1956            };
1957
1958            assert_eq!(
1959                json!([
1960                    {
1961                        "bytes": "68656c6c6f20776f726c64"
1962                    },
1963                    {
1964                        "bytes": null // the explicit null
1965                    },
1966                    {
1967                        "bytes": "73756d6d6572207261696e"
1968                    }
1969                ]),
1970                json_value,
1971            );
1972        }
1973        // encode and check JSON with no explicit nulls:
1974        {
1975            let mut buf = Vec::new();
1976            let json_value: Value = {
1977                // explicit nulls are off by default, so we don't need
1978                // to set that when creating the writer:
1979                let mut writer = ArrayWriter::new(&mut buf);
1980                writer.write(&batch).unwrap();
1981                writer.close().unwrap();
1982                serde_json::from_slice(&buf).unwrap()
1983            };
1984
1985            assert_eq!(
1986                json!([
1987                    {
1988                        "bytes": "68656c6c6f20776f726c64"
1989                    },
1990                    {}, // empty because nulls are omitted
1991                    {
1992                        "bytes": "73756d6d6572207261696e"
1993                    }
1994                ]),
1995                json_value,
1996            );
1997        }
1998    }
1999
    #[test]
    fn test_writer_fixed_size_list() {
        // Single nullable FixedSizeList<Int32, 3> column named "list".
        let size = 3;
        let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "list",
            DataType::FixedSizeList(field, size),
            true,
        )]));

        // Three lists with interior nulls, plus one null list entry.
        let values_builder = Int32Builder::new();
        let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
        let lists = [
            Some([Some(1), Some(2), None]),
            Some([Some(3), None, Some(4)]),
            Some([None, Some(5), Some(6)]),
            None,
        ];
        for list in lists {
            match list {
                Some(l) => {
                    for value in l {
                        match value {
                            Some(v) => list_builder.values().append_value(v),
                            None => list_builder.values().append_null(),
                        }
                    }
                    list_builder.append(true);
                }
                None => {
                    // A null list entry must still fill all `size` child
                    // slots before the (null) list itself is appended.
                    for _ in 0..size {
                        list_builder.values().append_null();
                    }
                    list_builder.append(false);
                }
            }
        }
        let array = Arc::new(list_builder.finish()) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        // Encode and check JSON with explicit nulls: the null list row is
        // written out as {"list": null}.
        {
            let json_value: Value = {
                let mut buf = Vec::new();
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, JsonArray>(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };
            assert_eq!(
                json!([
                    {"list": [1, 2, null]},
                    {"list": [3, null, 4]},
                    {"list": [null, 5, 6]},
                    {"list": null},
                ]),
                json_value
            );
        }
        // Encode and check JSON with no explicit nulls: the null list row
        // collapses to an empty object.
        {
            let json_value: Value = {
                let mut buf = Vec::new();
                let mut writer = ArrayWriter::new(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };
            assert_eq!(
                json!([
                    {"list": [1, 2, null]},
                    {"list": [3, null, 4]},
                    {"list": [null, 5, 6]},
                    {}, // empty because nulls are omitted
                ]),
                json_value
            );
        }
    }
2081
2082    #[test]
2083    fn test_writer_null_dict() {
2084        let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
2085        let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
2086        let dict = DictionaryArray::new(keys, values);
2087
2088        let schema = SchemaRef::new(Schema::new(vec![Field::new(
2089            "my_dict",
2090            DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
2091            true,
2092        )]));
2093
2094        let array = Arc::new(dict) as ArrayRef;
2095        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2096
2097        let mut json = Vec::new();
2098        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
2099        let mut writer = write_builder.build::<_, JsonArray>(&mut json);
2100        writer.write(&batch).unwrap();
2101        writer.close().unwrap();
2102
2103        let json_str = str::from_utf8(&json).unwrap();
2104        assert_eq!(
2105            json_str,
2106            r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
2107        )
2108    }
2109
2110    #[test]
2111    fn test_decimal32_encoder() {
2112        let array = Decimal32Array::from_iter_values([1234, 5678, 9012])
2113            .with_precision_and_scale(8, 2)
2114            .unwrap();
2115        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2116        let schema = Schema::new(vec![field]);
2117        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2118
2119        let mut buf = Vec::new();
2120        {
2121            let mut writer = LineDelimitedWriter::new(&mut buf);
2122            writer.write_batches(&[&batch]).unwrap();
2123        }
2124
2125        assert_json_eq(
2126            &buf,
2127            r#"{"decimal":12.34}
2128{"decimal":56.78}
2129{"decimal":90.12}
2130"#,
2131        );
2132    }
2133
2134    #[test]
2135    fn test_decimal64_encoder() {
2136        let array = Decimal64Array::from_iter_values([1234, 5678, 9012])
2137            .with_precision_and_scale(10, 2)
2138            .unwrap();
2139        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2140        let schema = Schema::new(vec![field]);
2141        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2142
2143        let mut buf = Vec::new();
2144        {
2145            let mut writer = LineDelimitedWriter::new(&mut buf);
2146            writer.write_batches(&[&batch]).unwrap();
2147        }
2148
2149        assert_json_eq(
2150            &buf,
2151            r#"{"decimal":12.34}
2152{"decimal":56.78}
2153{"decimal":90.12}
2154"#,
2155        );
2156    }
2157
2158    #[test]
2159    fn test_decimal128_encoder() {
2160        let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
2161            .with_precision_and_scale(10, 2)
2162            .unwrap();
2163        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2164        let schema = Schema::new(vec![field]);
2165        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2166
2167        let mut buf = Vec::new();
2168        {
2169            let mut writer = LineDelimitedWriter::new(&mut buf);
2170            writer.write_batches(&[&batch]).unwrap();
2171        }
2172
2173        assert_json_eq(
2174            &buf,
2175            r#"{"decimal":12.34}
2176{"decimal":56.78}
2177{"decimal":90.12}
2178"#,
2179        );
2180    }
2181
2182    #[test]
2183    fn test_decimal256_encoder() {
2184        let array = Decimal256Array::from_iter_values([
2185            i256::from(123400),
2186            i256::from(567800),
2187            i256::from(901200),
2188        ])
2189        .with_precision_and_scale(10, 4)
2190        .unwrap();
2191        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2192        let schema = Schema::new(vec![field]);
2193        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2194
2195        let mut buf = Vec::new();
2196        {
2197            let mut writer = LineDelimitedWriter::new(&mut buf);
2198            writer.write_batches(&[&batch]).unwrap();
2199        }
2200
2201        assert_json_eq(
2202            &buf,
2203            r#"{"decimal":12.3400}
2204{"decimal":56.7800}
2205{"decimal":90.1200}
2206"#,
2207        );
2208    }
2209
2210    #[test]
2211    fn test_decimal_encoder_with_nulls() {
2212        let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
2213            .with_precision_and_scale(10, 2)
2214            .unwrap();
2215        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2216        let schema = Schema::new(vec![field]);
2217        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2218
2219        let mut buf = Vec::new();
2220        {
2221            let mut writer = LineDelimitedWriter::new(&mut buf);
2222            writer.write_batches(&[&batch]).unwrap();
2223        }
2224
2225        assert_json_eq(
2226            &buf,
2227            r#"{"decimal":12.34}
2228{}
2229{"decimal":56.78}
2230"#,
2231        );
2232    }
2233
    #[test]
    fn write_structs_as_list() {
        // Nested struct schema: c1 is a struct with c11 (nullable int) and
        // c12 (itself a struct with one utf8 child); c2 is a plain string.
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        // In ListOnly mode each struct serializes as a positional JSON array
        // instead of an object keyed by field name; nested nulls keep their
        // slot (see row 2's `null`).
        let expected = r#"[[1,["e"]],"a"]
[[null,["f"]],"b"]
[[5,["g"]],"c"]
"#;

        // The same `expected` is asserted for both null modes below, i.e.
        // ListOnly output is identical with and without explicit nulls.
        let mut buf = Vec::new();
        {
            let builder = WriterBuilder::new()
                .with_explicit_nulls(true)
                .with_struct_mode(StructMode::ListOnly);
            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
        assert_json_eq(&buf, expected);

        let mut buf = Vec::new();
        {
            let builder = WriterBuilder::new()
                .with_explicit_nulls(false)
                .with_struct_mode(StructMode::ListOnly);
            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
        assert_json_eq(&buf, expected);
    }
2299
    /// Builds a RecordBatch with a sparse-union column plus a float column,
    /// together with an `EncoderFactory` that supplies a custom encoder for
    /// the union type.
    fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
        // Note: this is not intended to be an efficient implementation.
        // Just a simple example to demonstrate how to implement a custom encoder.
        #[derive(Debug)]
        enum UnionValue {
            Int32(i32),
            String(String),
        }

        // Encodes pre-extracted union values: `null`, a bare number, or a
        // double-quoted string.
        #[derive(Debug)]
        struct UnionEncoder {
            array: Vec<Option<UnionValue>>,
        }

        impl Encoder for UnionEncoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                match &self.array[idx] {
                    None => out.extend_from_slice(b"null"),
                    Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
                    Some(UnionValue::String(v)) => {
                        // NOTE(review): no JSON escaping is applied — fine for
                        // the fixed test data here, not for arbitrary strings.
                        out.extend_from_slice(format!("\"{v}\"").as_bytes())
                    }
                }
            }
        }

        #[derive(Debug)]
        struct UnionEncoderFactory;

        impl EncoderFactory for UnionEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                let data_type = array.data_type();
                // Only sparse unions are handled; returning Ok(None) defers
                // any other type to the writer's built-in encoders.
                let fields = match data_type {
                    DataType::Union(fields, UnionMode::Sparse) => fields,
                    _ => return Ok(None),
                };
                // check that the child field types are ones we can encode
                let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
                for f in fields.iter() {
                    match f.data_type() {
                        DataType::Null => {}
                        DataType::Int32 => {}
                        DataType::Utf8 => {}
                        _ => return Ok(None),
                    }
                }
                // Eagerly materialize one UnionValue per row, selecting the
                // child buffer by that row's type id.
                let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
                let mut values = Vec::with_capacity(type_ids.len());
                for idx in 0..type_ids.len() {
                    let type_id = type_ids[idx];
                    let field = &fields[type_id as usize];
                    let value = match field.data_type() {
                        DataType::Null => None,
                        DataType::Int32 => Some(UnionValue::Int32(
                            buffers[type_id as usize]
                                .as_primitive::<Int32Type>()
                                .value(idx),
                        )),
                        DataType::Utf8 => Some(UnionValue::String(
                            buffers[type_id as usize]
                                .as_string::<i32>()
                                .value(idx)
                                .to_string(),
                        )),
                        // All other child types were rejected above.
                        _ => unreachable!(),
                    };
                    values.push(value);
                }
                let array_encoder =
                    Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
                let nulls = array.nulls().cloned();
                Ok(Some(NullableEncoder::new(array_encoder, nulls)))
            }
        }

        // One row per union variant: row 0 uses child A (Int32 = 1), row 1
        // child B (Utf8 = "a"), row 2 child C (Null).
        let int_array = Int32Array::from(vec![Some(1), None, None]);
        let string_array = StringArray::from(vec![None, Some("a"), None]);
        let null_array = NullArray::new(3);
        let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();

        let union_fields = [
            (0, Arc::new(Field::new("A", DataType::Int32, false))),
            (1, Arc::new(Field::new("B", DataType::Utf8, false))),
            (2, Arc::new(Field::new("C", DataType::Null, false))),
        ]
        .into_iter()
        .collect::<UnionFields>();

        let children = vec![
            Arc::new(int_array) as Arc<dyn Array>,
            Arc::new(string_array),
            Arc::new(null_array),
        ];

        let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();

        // A second, default-encoded column (with a null in row 1) so the
        // tests can observe null handling alongside the custom encoder.
        let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);

        let fields = vec![
            Field::new(
                "union",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new("float", DataType::Float64, true),
        ];

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        (batch, Arc::new(UnionEncoderFactory))
    }
2423
2424    #[test]
2425    fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
2426        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2427
2428        let mut buf = Vec::new();
2429        {
2430            let mut writer = WriterBuilder::new()
2431                .with_encoder_factory(encoder_factory)
2432                .with_explicit_nulls(false)
2433                .build::<_, LineDelimited>(&mut buf);
2434            writer.write_batches(&[&batch]).unwrap();
2435            writer.finish().unwrap();
2436        }
2437
2438        println!("{}", str::from_utf8(&buf).unwrap());
2439
2440        assert_json_eq(
2441            &buf,
2442            r#"{"union":1,"float":1.0}
2443{"union":"a"}
2444{"union":null,"float":3.4}
2445"#,
2446        );
2447    }
2448
2449    #[test]
2450    fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
2451        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2452
2453        let mut buf = Vec::new();
2454        {
2455            let mut writer = WriterBuilder::new()
2456                .with_encoder_factory(encoder_factory)
2457                .with_explicit_nulls(true)
2458                .build::<_, LineDelimited>(&mut buf);
2459            writer.write_batches(&[&batch]).unwrap();
2460            writer.finish().unwrap();
2461        }
2462
2463        assert_json_eq(
2464            &buf,
2465            r#"{"union":1,"float":1.0}
2466{"union":"a","float":null}
2467{"union":null,"float":3.4}
2468"#,
2469        );
2470    }
2471
2472    #[test]
2473    fn test_fallback_encoder_factory_array_implicit_nulls() {
2474        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2475
2476        let json_value: Value = {
2477            let mut buf = Vec::new();
2478            let mut writer = WriterBuilder::new()
2479                .with_encoder_factory(encoder_factory)
2480                .build::<_, JsonArray>(&mut buf);
2481            writer.write_batches(&[&batch]).unwrap();
2482            writer.finish().unwrap();
2483            serde_json::from_slice(&buf).unwrap()
2484        };
2485
2486        let expected = json!([
2487            {"union":1,"float":1.0},
2488            {"union":"a"},
2489            {"float":3.4,"union":null},
2490        ]);
2491
2492        assert_eq!(json_value, expected);
2493    }
2494
2495    #[test]
2496    fn test_fallback_encoder_factory_array_explicit_nulls() {
2497        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2498
2499        let json_value: Value = {
2500            let mut buf = Vec::new();
2501            let mut writer = WriterBuilder::new()
2502                .with_encoder_factory(encoder_factory)
2503                .with_explicit_nulls(true)
2504                .build::<_, JsonArray>(&mut buf);
2505            writer.write_batches(&[&batch]).unwrap();
2506            writer.finish().unwrap();
2507            serde_json::from_slice(&buf).unwrap()
2508        };
2509
2510        let expected = json!([
2511            {"union":1,"float":1.0},
2512            {"union":"a", "float": null},
2513            {"union":null,"float":3.4},
2514        ]);
2515
2516        assert_eq!(json_value, expected);
2517    }
2518
2519    #[test]
2520    fn test_default_encoder_byte_array() {
2521        struct IntArrayBinaryEncoder<B> {
2522            array: B,
2523        }
2524
2525        impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
2526        where
2527            B: ArrayAccessor<Item = &'a [u8]>,
2528        {
2529            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2530                out.push(b'[');
2531                let child = self.array.value(idx);
2532                for (idx, byte) in child.iter().enumerate() {
2533                    write!(out, "{byte}").unwrap();
2534                    if idx < child.len() - 1 {
2535                        out.push(b',');
2536                    }
2537                }
2538                out.push(b']');
2539            }
2540        }
2541
2542        #[derive(Debug)]
2543        struct IntArayBinaryEncoderFactory;
2544
2545        impl EncoderFactory for IntArayBinaryEncoderFactory {
2546            fn make_default_encoder<'a>(
2547                &self,
2548                _field: &'a FieldRef,
2549                array: &'a dyn Array,
2550                _options: &'a EncoderOptions,
2551            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2552                match array.data_type() {
2553                    DataType::Binary => {
2554                        let array = array.as_binary::<i32>();
2555                        let encoder = IntArrayBinaryEncoder { array };
2556                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2557                        let nulls = array.nulls().cloned();
2558                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2559                    }
2560                    _ => Ok(None),
2561                }
2562            }
2563        }
2564
2565        let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
2566        let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
2567        let fields = vec![
2568            Field::new("bytes", DataType::Binary, true),
2569            Field::new("float", DataType::Float64, true),
2570        ];
2571        let batch = RecordBatch::try_new(
2572            Arc::new(Schema::new(fields)),
2573            vec![
2574                Arc::new(binary_array) as Arc<dyn Array>,
2575                Arc::new(float_array) as Arc<dyn Array>,
2576            ],
2577        )
2578        .unwrap();
2579
2580        let json_value: Value = {
2581            let mut buf = Vec::new();
2582            let mut writer = WriterBuilder::new()
2583                .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory))
2584                .build::<_, JsonArray>(&mut buf);
2585            writer.write_batches(&[&batch]).unwrap();
2586            writer.finish().unwrap();
2587            serde_json::from_slice(&buf).unwrap()
2588        };
2589
2590        let expected = json!([
2591            {"bytes": [97], "float": 1.0},
2592            {"float": 2.3},
2593            {"bytes": [98]},
2594        ]);
2595
2596        assert_eq!(json_value, expected);
2597    }
2598
2599    #[test]
2600    fn test_encoder_factory_customize_dictionary() {
2601        // Test that we can customize the encoding of T even when it shows up as Dictionary<_, T>.
2602
2603        // No particular reason to choose this example.
2604        // Just trying to add some variety to the test cases and demonstrate use cases of the encoder factory.
2605        struct PaddedInt32Encoder {
2606            array: Int32Array,
2607        }
2608
2609        impl Encoder for PaddedInt32Encoder {
2610            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2611                let value = self.array.value(idx);
2612                write!(out, "\"{value:0>8}\"").unwrap();
2613            }
2614        }
2615
2616        #[derive(Debug)]
2617        struct CustomEncoderFactory;
2618
2619        impl EncoderFactory for CustomEncoderFactory {
2620            fn make_default_encoder<'a>(
2621                &self,
2622                field: &'a FieldRef,
2623                array: &'a dyn Array,
2624                _options: &'a EncoderOptions,
2625            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2626                // The point here is:
2627                // 1. You can use information from Field to determine how to do the encoding.
2628                // 2. For dictionary arrays the Field is always the outer field but the array may be the keys or values array
2629                //    and thus the data type of `field` may not match the data type of `array`.
2630                let padded = field
2631                    .metadata()
2632                    .get("padded")
2633                    .map(|v| v == "true")
2634                    .unwrap_or_default();
2635                match (array.data_type(), padded) {
2636                    (DataType::Int32, true) => {
2637                        let array = array.as_primitive::<Int32Type>();
2638                        let nulls = array.nulls().cloned();
2639                        let encoder = PaddedInt32Encoder {
2640                            array: array.clone(),
2641                        };
2642                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2643                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2644                    }
2645                    _ => Ok(None),
2646                }
2647            }
2648        }
2649
2650        let to_json = |batch| {
2651            let mut buf = Vec::new();
2652            let mut writer = WriterBuilder::new()
2653                .with_encoder_factory(Arc::new(CustomEncoderFactory))
2654                .build::<_, JsonArray>(&mut buf);
2655            writer.write_batches(&[batch]).unwrap();
2656            writer.finish().unwrap();
2657            serde_json::from_slice::<Value>(&buf).unwrap()
2658        };
2659
2660        // Control case: no dictionary wrapping works as expected.
2661        let array = Int32Array::from(vec![Some(1), None, Some(2)]);
2662        let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
2663            HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
2664        ));
2665        let batch = RecordBatch::try_new(
2666            Arc::new(Schema::new(vec![field.clone()])),
2667            vec![Arc::new(array)],
2668        )
2669        .unwrap();
2670
2671        let json_value = to_json(&batch);
2672
2673        let expected = json!([
2674            {"int": "00000001"},
2675            {},
2676            {"int": "00000002"},
2677        ]);
2678
2679        assert_eq!(json_value, expected);
2680
2681        // Now make a dictionary batch
2682        let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
2683        array_builder.append_value(1);
2684        array_builder.append_null();
2685        array_builder.append_value(1);
2686        let array = array_builder.finish();
2687        let field = Field::new(
2688            "int",
2689            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
2690            true,
2691        )
2692        .with_metadata(HashMap::from_iter(vec![(
2693            "padded".to_string(),
2694            "true".to_string(),
2695        )]));
2696        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
2697            .unwrap();
2698
2699        let json_value = to_json(&batch);
2700
2701        let expected = json!([
2702            {"int": "00000001"},
2703            {},
2704            {"int": "00000001"},
2705        ]);
2706
2707        assert_eq!(json_value, expected);
2708    }
2709
2710    #[test]
2711    fn test_write_run_end_encoded() {
2712        let run_ends = Int32Array::from(vec![2, 5, 6]);
2713        let values = StringArray::from(vec![Some("a"), Some("b"), None]);
2714        let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2715
2716        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2717            "c1",
2718            ree.data_type().clone(),
2719            true,
2720        )]));
2721
2722        let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
2723
2724        let mut buf = Vec::new();
2725        {
2726            let mut writer = LineDelimitedWriter::new(&mut buf);
2727            writer.write_batches(&[&batch]).unwrap();
2728        }
2729
2730        assert_json_eq(
2731            &buf,
2732            r#"{"c1":"a"}
2733{"c1":"a"}
2734{"c1":"b"}
2735{"c1":"b"}
2736{"c1":"b"}
2737{}
2738"#,
2739        );
2740    }
2741
2742    #[test]
2743    fn test_write_run_end_encoded_int_values() {
2744        let run_ends = Int32Array::from(vec![3, 5]);
2745        let values = Int32Array::from(vec![10, 20]);
2746        let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2747
2748        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2749            "n",
2750            ree.data_type().clone(),
2751            true,
2752        )]));
2753
2754        let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
2755
2756        let json_value: Value = {
2757            let mut buf = Vec::new();
2758            let mut writer = WriterBuilder::new().build::<_, JsonArray>(&mut buf);
2759            writer.write_batches(&[&batch]).unwrap();
2760            writer.finish().unwrap();
2761            serde_json::from_slice(&buf).unwrap()
2762        };
2763
2764        let expected = json!([
2765            {"n": 10},
2766            {"n": 10},
2767            {"n": 10},
2768            {"n": 20},
2769            {"n": 20},
2770        ]);
2771
2772        assert_eq!(json_value, expected);
2773    }
2774
2775    #[test]
2776    fn test_run_end_encoded_roundtrip() {
2777        let run_ends = Int32Array::from(vec![3, 5, 7]);
2778        let values = StringArray::from(vec![Some("a"), None, Some("b")]);
2779        let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2780
2781        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2782            "c",
2783            ree.data_type().clone(),
2784            true,
2785        )]));
2786        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ree)]).unwrap();
2787
2788        let mut buf = Vec::new();
2789        {
2790            let mut writer = super::LineDelimitedWriter::new(&mut buf);
2791            writer.write_batches(&[&batch]).unwrap();
2792        }
2793
2794        let batches: Vec<RecordBatch> = ReaderBuilder::new(schema)
2795            .with_batch_size(1024)
2796            .build(std::io::Cursor::new(&buf))
2797            .unwrap()
2798            .collect::<Result<Vec<_>, _>>()
2799            .unwrap();
2800        assert_eq!(batches.len(), 1);
2801
2802        let col = batches[0].column(0);
2803        let run_array = col.as_run::<Int32Type>();
2804
2805        assert_eq!(run_array.len(), 7);
2806        assert_eq!(run_array.run_ends().values(), &[3, 5, 7]);
2807
2808        let values = run_array.values().as_string::<i32>();
2809        assert_eq!(values.len(), 3);
2810        assert_eq!(values.value(0), "a");
2811        assert!(values.is_null(1));
2812        assert_eq!(values.value(2), "b");
2813    }
2814}