1mod encoder;
108
109use std::{fmt::Debug, io::Write, sync::Arc};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, NullableEncoder};
116
/// Controls how the JSON [`Writer`] delimits its output stream and rows.
///
/// Implemented by [`LineDelimited`] (newline-delimited JSON) and
/// [`JsonArray`] (a single JSON array). All methods default to writing
/// nothing, so implementations override only the delimiters they need.
pub trait JsonFormat: Debug + Default {
    /// Write any bytes needed at the start of the stream (e.g. `[`).
    #[inline]
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed before a row, e.g. a `,` separator.
    /// `_is_first_row` is `true` only for the first row of the stream.
    #[inline]
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed after a row (e.g. a newline).
    #[inline]
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed at the end of the stream (e.g. `]`).
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }
}
143
144#[derive(Debug, Default)]
154pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157 fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158 writer.write_all(b"\n")?;
159 Ok(())
160 }
161}
162
163#[derive(Debug, Default)]
171pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174 fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175 writer.write_all(b"[")?;
176 Ok(())
177 }
178
179 fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180 if !is_first_row {
181 writer.write_all(b",")?;
182 }
183 Ok(())
184 }
185
186 fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187 writer.write_all(b"]")?;
188 Ok(())
189 }
190}
191
/// A [`Writer`] that produces newline-delimited JSON (one object per line).
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A [`Writer`] that produces a single JSON array of objects.
pub type ArrayWriter<W> = Writer<W, JsonArray>;
198#[derive(Debug, Clone, Default)]
200pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203 pub fn new() -> Self {
223 Self::default()
224 }
225
226 pub fn explicit_nulls(&self) -> bool {
228 self.0.explicit_nulls()
229 }
230
231 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254 self.0 = self.0.with_explicit_nulls(explicit_nulls);
255 self
256 }
257
258 pub fn struct_mode(&self) -> StructMode {
260 self.0.struct_mode()
261 }
262
263 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269 self.0 = self.0.with_struct_mode(struct_mode);
270 self
271 }
272
273 pub fn with_encoder_factory(mut self, factory: Arc<dyn EncoderFactory>) -> Self {
278 self.0 = self.0.with_encoder_factory(factory);
279 self
280 }
281
282 pub fn build<W, F>(self, writer: W) -> Writer<W, F>
284 where
285 W: Write,
286 F: JsonFormat,
287 {
288 Writer {
289 writer,
290 started: false,
291 finished: false,
292 format: F::default(),
293 options: self.0,
294 }
295 }
296}
297
/// A JSON writer serializing [`RecordBatch`]es to an output [`Write`],
/// with rows delimited according to the [`JsonFormat`] `F`.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying writer to serialize to
    writer: W,

    /// Whether the stream prefix has been written yet (set on first write
    /// or on `finish`)
    started: bool,

    /// Whether the stream suffix has been written (set by `finish`)
    finished: bool,

    /// Determines how the stream and rows are delimited
    format: F,

    /// Controls value encoding (explicit nulls, struct mode, custom encoders)
    options: EncoderOptions,
}
329
impl<W, F> Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Construct a new writer over `writer` with default [`EncoderOptions`].
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options: EncoderOptions::default(),
        }
    }

    /// Serialize `batch` to the output, encoding rows into an in-memory
    /// buffer that is flushed to the underlying writer in chunks.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Empty batches write nothing; the stream prefix is deferred until
        // there is at least one row (or until `finish` for empty streams).
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Rows are encoded into this buffer first, amortizing calls to the
        // underlying writer.
        let mut buffer = Vec::with_capacity(16 * 1024);

        let mut is_first_row = !self.started;
        if !self.started {
            self.format.start_stream(&mut buffer)?;
            self.started = true;
        }

        // Treat the batch as a single non-nullable struct column so one
        // encoder can produce each row as a JSON object/list.
        let array = StructArray::from(batch.clone());
        let field = Arc::new(Field::new_struct(
            "",
            batch.schema().fields().clone(),
            false,
        ));

        let mut encoder = make_encoder(&field, &array, &self.options)?;

        // The synthetic root field above is non-nullable, so the encoder
        // must not report nulls; a violation is a bug, hence assert.
        assert!(!encoder.has_nulls(), "root cannot be nullable");
        for idx in 0..batch.num_rows() {
            self.format.start_row(&mut buffer, is_first_row)?;
            is_first_row = false;

            encoder.encode(idx, &mut buffer);
            // Flush once the buffer exceeds 8KiB to bound memory usage.
            if buffer.len() > 8 * 1024 {
                self.writer.write_all(&buffer)?;
                buffer.clear();
            }
            self.format.end_row(&mut buffer)?;
        }

        // Flush whatever remains after the final row.
        if !buffer.is_empty() {
            self.writer.write_all(&buffer)?;
        }

        Ok(())
    }

    /// Serialize each of `batches` in order (see [`Writer::write`]).
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Finalize the stream, writing the prefix/suffix if not yet emitted
    /// (e.g. producing `[]` for an empty [`JsonArray`] stream). Calling
    /// this more than once is a no-op after the first call.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if !self.started {
            self.format.start_stream(&mut self.writer)?;
            self.started = true;
        }
        if !self.finished {
            self.format.end_stream(&mut self.writer)?;
            self.finished = true;
        }

        Ok(())
    }

    /// Unwrap this writer, returning the underlying [`Write`].
    ///
    /// Note this does NOT call [`Writer::finish`] first.
    pub fn into_inner(self) -> W {
        self.writer
    }
}
421
impl<W, F> RecordBatchWriter for Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Delegates to [`Writer::write`].
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    /// Finalizes the stream via [`Writer::finish`], consuming the writer.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
435
436#[cfg(test)]
437mod tests {
438 use core::str;
439 use std::collections::HashMap;
440 use std::fs::{read_to_string, File};
441 use std::io::{BufReader, Seek};
442 use std::sync::Arc;
443
444 use arrow_array::cast::AsArray;
445 use serde_json::{json, Value};
446
447 use super::LineDelimited;
448 use super::{Encoder, WriterBuilder};
449 use arrow_array::builder::*;
450 use arrow_array::types::*;
451 use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, ToByteSlice};
452 use arrow_data::ArrayData;
453
454 use crate::reader::*;
455
456 use super::*;
457
458 fn assert_json_eq(input: &[u8], expected: &str) {
460 let expected: Vec<Option<Value>> = expected
461 .split('\n')
462 .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
463 .collect();
464
465 let actual: Vec<Option<Value>> = input
466 .split(|b| *b == b'\n')
467 .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
468 .collect();
469
470 assert_eq!(actual, expected);
471 }
472
    // Round-trips a simple Int32 + Utf8 batch through the line-delimited
    // writer; null values are omitted from the output objects by default.
    #[test]
    fn write_simple_rows() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ]);

        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            // Scoped so the writer's borrow of `buf` ends before we read it.
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":1,"c2":"a"}
{"c1":2,"c2":"b"}
{"c1":3,"c2":"c"}
{"c2":"d"}
{"c1":5}
"#,
        );
    }

    // All three string layouts (Utf8, LargeUtf8, Utf8View) encode as plain
    // JSON strings, with nulls omitted.
    #[test]
    fn write_large_utf8_and_utf8_view() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }
536
    // Dictionary-encoded columns are written as their resolved values, not
    // as keys; null keys are omitted.
    #[test]
    fn write_dictionary() {
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }

    // Large lists of dictionary values resolve keys to values; a null list
    // slot omits the field, while null elements appear as JSON null.
    #[test]
    fn write_list_of_dictionary() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }

    // As above, but with explicitly constructed keys and LargeUtf8 values.
    #[test]
    fn write_list_of_dictionary_large_values() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
663
    // Timestamps without a timezone serialize as naive ISO-8601 strings at
    // each unit's precision.
    #[test]
    fn write_timestamps() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );
    }

    // Timestamps with a timezone serialize as UTC ISO-8601 strings with a
    // trailing `Z` suffix.
    #[test]
    fn write_timestamps_with_tz() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );
    }
778
    // Date32 serializes as a calendar date; Date64 as a full datetime string.
    #[test]
    fn write_dates() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        let arr_date32 = Date32Array::from(vec![
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );
    }

    // Time32/Time64 serialize as HH:MM:SS strings at each unit's precision.
    #[test]
    fn write_times() {
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );
    }

    // Durations serialize as ISO-8601 duration strings (e.g. "PT120S").
    #[test]
    fn write_durations() {
        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_durationsec),
                Arc::new(arr_durationmsec),
                Arc::new(arr_durationusec),
                Arc::new(arr_durationnsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
        );
    }
911
    // Nested structs serialize as nested JSON objects; null struct fields
    // are omitted.
    #[test]
    fn write_nested_structs() {
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }

    // A list column built from raw ArrayData serializes as JSON arrays of
    // its child values.
    #[test]
    fn write_struct_with_list_field() {
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
        // Offsets [0,2,3,4,5,6] partition the six values into five lists.
        let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
        let a_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(5)
            .add_buffer(a_value_offsets)
            .add_child_data(a_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00011111])))
            .build()
            .unwrap();
        let a = ListArray::from(a_list_data);

        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":["a","a1"],"c2":1}
{"c1":["b"],"c2":2}
{"c1":["c"],"c2":3}
{"c1":["d"],"c2":4}
{"c1":["e"],"c2":5}
"#,
        );
    }

    // Lists of lists serialize as nested JSON arrays; an empty inner list
    // produces `[]`.
    #[test]
    fn write_nested_list() {
        let list_inner_type = Field::new(
            "a",
            DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
            false,
        );
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(list_inner_type.clone())),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        // Inner lists: [1,2], [3], [4,5,6]
        let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
        let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
            .len(3)
            .add_buffer(a_value_offsets)
            .null_bit_buffer(Some(Buffer::from([0b00000111])))
            .add_child_data(a_values.into_data())
            .build()
            .unwrap();

        // Outer lists: [[1,2],[3]], [], [[4,5,6]]
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(a_list_data)
            .build()
            .unwrap();

        let c1 = ListArray::from(c1_list_data);
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }

    // Lists of structs serialize as arrays of objects; a null list entry
    // omits the field entirely.
    #[test]
    fn write_list_of_struct() {
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // Lists: [s0, s1], null (masked by 0b00000101), [s2]
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(struct_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000101])))
            .build()
            .unwrap();
        let c1 = ListArray::from(c1_list_data);

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }
1139
    // Round-trips `test_file` through the JSON reader and the line-delimited
    // writer, comparing the output against the original file line-by-line.
    //
    // When `remove_nulls` is true the default writer (which omits null
    // fields) is used and null-valued keys are stripped from the expected
    // JSON; otherwise explicit nulls are enabled so output should match the
    // file verbatim.
    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
        let file = File::open(test_file).unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
        let mut reader = builder.build(reader).unwrap();
        let batch = reader.next().unwrap().unwrap();

        let mut buf = Vec::new();
        {
            if remove_nulls {
                let mut writer = LineDelimitedWriter::new(&mut buf);
                writer.write_batches(&[&batch]).unwrap();
            } else {
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, LineDelimited>(&mut buf);
                writer.write_batches(&[&batch]).unwrap();
            }
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            if remove_nulls {
                // Drop null-valued keys: the default writer omits them.
                if let Value::Object(obj) = expected_json {
                    expected_json =
                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
                }
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }

    #[test]
    fn write_basic_rows() {
        test_write_for_file("test/data/basic.json", true);
    }

    #[test]
    fn write_arrays() {
        test_write_for_file("test/data/arrays.json", true);
    }

    #[test]
    fn write_basic_nulls() {
        test_write_for_file("test/data/basic_nulls.json", true);
    }

    #[test]
    fn write_nested_with_nulls() {
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1197
    // `finish` on an empty line-delimited stream writes nothing at all.
    #[test]
    fn json_line_writer_empty() {
        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
    }

    // `finish` on an empty array stream still emits the `[]` delimiters.
    #[test]
    fn json_array_writer_empty() {
        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
    }

    // Writing a zero-row batch contributes no output for line-delimited JSON.
    #[test]
    fn json_line_writer_empty_batch() {
        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);

        let array = Int32Array::from(Vec::<i32>::new());
        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();

        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
    }

    // Writing a zero-row batch still yields `[]` for the array format.
    #[test]
    fn json_array_writer_empty_batch() {
        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);

        let array = Int32Array::from(Vec::<i32>::new());
        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();

        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
    }
1237
    // Struct columns with a validity mask omit masked rows' fields, while
    // the same child data without a mask is emitted for every row.
    #[test]
    fn json_struct_array_nulls() {
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]),
            Some(vec![Some(4), Some(5)]),
            None,
            None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        // `a` carries a struct-level validity mask (rows 3, 5 masked out);
        // `b` shares the same child data but has no mask.
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            Buffer::from([0b01010111]),
        ));
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }

    // MapArray serializes as a JSON object per row; null map entries omit
    // the field, empty maps serialize as `{}`.
    #[test]
    fn json_writer_map() {
        let keys_array = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
        let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);

        let keys = Arc::new(Field::new("keys", DataType::Utf8, false));
        let values = Arc::new(Field::new("values", DataType::Int64, false));
        let entry_struct = StructArray::from(vec![
            (keys, Arc::new(keys_array) as ArrayRef),
            (values, Arc::new(values_array) as ArrayRef),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // Offsets delimit each row's entries; bit 1 of the validity buffer
        // is clear, making row 1 a null map.
        let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
        let valid_buffer = Buffer::from([0b00111101]);

        let map_data = ArrayData::builder(map_data_type.clone())
            .len(6)
            .null_bit_buffer(Some(valid_buffer))
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();

        let map = MapArray::from(map_data);

        let map_field = Field::new("map", map_data_type, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
        );
    }
1345
1346 #[test]
1347 fn test_write_single_batch() {
1348 let test_file = "test/data/basic.json";
1349 let file = File::open(test_file).unwrap();
1350 let mut reader = BufReader::new(file);
1351 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1352 reader.rewind().unwrap();
1353
1354 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1355 let mut reader = builder.build(reader).unwrap();
1356 let batch = reader.next().unwrap().unwrap();
1357
1358 let mut buf = Vec::new();
1359 {
1360 let mut writer = LineDelimitedWriter::new(&mut buf);
1361 writer.write(&batch).unwrap();
1362 }
1363
1364 let result = str::from_utf8(&buf).unwrap();
1365 let expected = read_to_string(test_file).unwrap();
1366 for (r, e) in result.lines().zip(expected.lines()) {
1367 let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1368 if let Value::Object(obj) = expected_json {
1370 expected_json =
1371 Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1372 }
1373 assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1374 }
1375 }
1376
    #[test]
    fn test_write_multi_batches() {
        // Writing several batches (including an empty one) through a single
        // writer should concatenate their rows in order.
        let test_file = "test/data/basic.json";

        let schema = SchemaRef::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float64, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Utf8, true),
            Field::new("e", DataType::Utf8, true),
            Field::new("f", DataType::Utf8, true),
            Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("h", DataType::Float16, true),
        ]));

        let mut reader = ReaderBuilder::new(schema.clone())
            .build(BufReader::new(File::open(test_file).unwrap()))
            .unwrap();
        let batch = reader.next().unwrap().unwrap();

        // The empty batch contributes no rows; the real batch is written twice.
        let batches = [&RecordBatch::new_empty(schema), &batch, &batch];

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&batches).unwrap();
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        // Each source row must therefore appear twice in the output.
        let expected = format!("{expected}\n{expected}");
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            // The writer omits nulls by default, so strip them from the
            // expected JSON before comparing.
            if let Value::Object(obj) = expected_json {
                expected_json =
                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }
1420
    #[test]
    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
        // End-to-end test of `with_explicit_nulls(true)`: nulls at every
        // nesting level (top-level columns, struct members, list items,
        // dictionary values and map entries) must be written as JSON `null`
        // rather than omitted.

        // List<Int32> with all-null items and one null list (row 2).
        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![None, None, None]),
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![None, None, None]),
            ]));
            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
            (array, field)
        }

        // Dictionary<Int32, Utf8> with a null entry at row 1.
        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
            let array = Arc::new(DictionaryArray::from_iter(vec![
                Some("cupcakes"),
                None,
                Some("bear"),
                Some("kuma"),
            ]));
            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
            (array, field)
        }

        // Map<Utf8, Int64>: one entry, then a null map, an empty map, and a
        // three-entry map.
        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
            let string_builder = StringBuilder::new();
            let int_builder = Int64Builder::new();
            let mut builder = MapBuilder::new(None, string_builder, int_builder);

            // {"foo": 10}
            builder.keys().append_value("foo");
            builder.values().append_value(10);
            builder.append(true).unwrap();

            // null map
            builder.append(false).unwrap();

            // {} — valid but empty
            builder.append(true).unwrap();

            // {"bar": 20, "baz": 30, "qux": 40}
            builder.keys().append_value("bar");
            builder.values().append_value(20);
            builder.keys().append_value("baz");
            builder.values().append_value(30);
            builder.keys().append_value("qux");
            builder.values().append_value(40);
            builder.append(true).unwrap();

            let array = Arc::new(builder.finish());
            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
            (array, field)
        }

        // Top-level List<Struct<utf8, int32>> built manually: offsets
        // [0,2,2,3,3] and validity 0b0101 make rows 1 and 3 null lists.
        fn root_list() -> (Arc<ListArray>, Field) {
            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int32", DataType::Int32, true)),
                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
                ),
            ]);

            let field = Field::new_list(
                "list",
                Field::new("struct", struct_array.data_type().clone(), true),
                true,
            );

            // Row 0 spans structs [0,2); row 2 spans [2,3); rows 1 and 3 empty.
            let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
            let data = ArrayData::builder(field.data_type().clone())
                .len(4)
                .add_buffer(entry_offsets)
                .add_child_data(struct_array.into_data())
                .null_bit_buffer(Some([0b00000101].into()))
                .build()
                .unwrap();
            let array = Arc::new(ListArray::from(data));
            (array, field)
        }

        let (nested_list_array, nested_list_field) = nested_list();
        let (nested_dict_array, nested_dict_field) = nested_dict();
        let (nested_map_array, nested_map_field) = nested_map();
        let (root_list_array, root_list_field) = root_list();

        let schema = Schema::new(vec![
            Field::new("date", DataType::Date32, true),
            Field::new("null", DataType::Null, true),
            Field::new_struct(
                "struct",
                vec![
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    nested_list_field.clone(),
                    nested_dict_field.clone(),
                    nested_map_field.clone(),
                ],
                true,
            ),
            root_list_field,
        ]);

        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
        let arr_null = NullArray::new(4);
        let arr_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("utf8", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
            ),
            (nested_list_field, nested_list_array as ArrayRef),
            (nested_dict_field, nested_dict_array as ArrayRef),
            (nested_map_field, nested_map_array as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_null),
                Arc::new(arr_struct),
                root_list_array,
            ],
        )?;

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch])?;
            writer.finish()?;
        }

        // Every null, at any depth, must appear explicitly in the output.
        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
        let expected = serde_json::from_value::<Vec<Value>>(json!([
            {
                "date": "1970-01-01",
                "list": [
                    {
                        "int32": 1,
                        "utf8": "a"
                    },
                    {
                        "int32": null,
                        "utf8": "b"
                    }
                ],
                "null": null,
                "struct": {
                    "dict": "cupcakes",
                    "list": [
                        null,
                        null,
                        null
                    ],
                    "map": {
                        "foo": 10
                    },
                    "utf8": "a"
                }
            },
            {
                "date": null,
                "list": null,
                "null": null,
                "struct": {
                    "dict": null,
                    "list": [
                        1,
                        2,
                        3
                    ],
                    "map": null,
                    "utf8": null
                }
            },
            {
                "date": "1970-01-02",
                "list": [
                    {
                        "int32": 5,
                        "utf8": null
                    }
                ],
                "null": null,
                "struct": {
                    "dict": "bear",
                    "list": null,
                    "map": {},
                    "utf8": null
                }
            },
            {
                "date": null,
                "list": null,
                "null": null,
                "struct": {
                    "dict": "kuma",
                    "list": [
                        null,
                        null,
                        null
                    ],
                    "map": {
                        "bar": 20,
                        "baz": 30,
                        "qux": 40
                    },
                    "utf8": "b"
                }
            }
        ]))
        .unwrap();

        assert_eq!(actual, expected);

        Ok(())
    }
1649
    /// Shared driver for Binary/LargeBinary: bytes have no native JSON
    /// representation, so the writer hex-encodes them. Checks both
    /// explicit-null and implicit-null (default) output.
    fn binary_encoding_test<O: OffsetSizeTrait>() {
        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "bytes",
            GenericBinaryType::<O>::DATA_TYPE,
            true,
        )]));

        let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
        let values = [Some(b"Ned Flanders"), None, Some(b"Troy McClure")];
        for value in values {
            match value {
                Some(v) => builder.append_value(v),
                None => builder.append_null(),
            }
        }
        let array = Arc::new(builder.finish()) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        // Explicit nulls: the null row is rendered as {"bytes": null}.
        {
            let mut buf = Vec::new();
            let json_value: Value = {
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, JsonArray>(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };

            assert_eq!(
                json!([
                    {
                        "bytes": "4e656420466c616e64657273"
                    },
                    {
                        "bytes": null },
                    {
                        "bytes": "54726f79204d63436c757265"
                    }
                ]),
                json_value,
            );
        }

        // Implicit nulls (default): the null row collapses to an empty object.
        {
            let mut buf = Vec::new();
            let json_value: Value = {
                let mut writer = ArrayWriter::new(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };

            assert_eq!(
                json!([
                    {
                        "bytes": "4e656420466c616e64657273"
                    },
                    {}, {
                        "bytes": "54726f79204d63436c757265"
                    }
                ]),
                json_value
            );
        }
    }
1724
    #[test]
    fn test_writer_binary() {
        // Cover both Binary (i32 offsets) and LargeBinary (i64 offsets).
        binary_encoding_test::<i32>();
        binary_encoding_test::<i64>();
    }
1732
    #[test]
    fn test_writer_fixed_size_binary() {
        // FixedSizeBinary is hex-encoded just like variable-length binary.
        let size = 11;
        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "bytes",
            DataType::FixedSizeBinary(size),
            true,
        )]));

        let mut builder = FixedSizeBinaryBuilder::new(size);
        // Both literals are exactly `size` (11) bytes long.
        let values = [Some(b"hello world"), None, Some(b"summer rain")];
        for value in values {
            match value {
                Some(v) => builder.append_value(v).unwrap(),
                None => builder.append_null(),
            }
        }
        let array = Arc::new(builder.finish()) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        // Explicit nulls: the null row is rendered as {"bytes": null}.
        {
            let mut buf = Vec::new();
            let json_value: Value = {
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, JsonArray>(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };

            assert_eq!(
                json!([
                    {
                        "bytes": "68656c6c6f20776f726c64"
                    },
                    {
                        "bytes": null },
                    {
                        "bytes": "73756d6d6572207261696e"
                    }
                ]),
                json_value,
            );
        }
        // Implicit nulls (default): the null row collapses to an empty object.
        {
            let mut buf = Vec::new();
            let json_value: Value = {
                let mut writer = ArrayWriter::new(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };

            assert_eq!(
                json!([
                    {
                        "bytes": "68656c6c6f20776f726c64"
                    },
                    {}, {
                        "bytes": "73756d6d6572207261696e"
                    }
                ]),
                json_value,
            );
        }
    }
1808
    #[test]
    fn test_writer_fixed_size_list() {
        // FixedSizeList<Int32, 3> with null items and one fully-null list.
        let size = 3;
        let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "list",
            DataType::FixedSizeList(field, size),
            true,
        )]));

        let values_builder = Int32Builder::new();
        let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
        let lists = [
            Some([Some(1), Some(2), None]),
            Some([Some(3), None, Some(4)]),
            Some([None, Some(5), Some(6)]),
            None,
        ];
        for list in lists {
            match list {
                Some(l) => {
                    for value in l {
                        match value {
                            Some(v) => list_builder.values().append_value(v),
                            None => list_builder.values().append_null(),
                        }
                    }
                    list_builder.append(true);
                }
                None => {
                    // A null list still occupies `size` child slots.
                    for _ in 0..size {
                        list_builder.values().append_null();
                    }
                    list_builder.append(false);
                }
            }
        }
        let array = Arc::new(list_builder.finish()) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        // Explicit nulls: the null list is rendered as {"list": null}.
        {
            let json_value: Value = {
                let mut buf = Vec::new();
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, JsonArray>(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };
            assert_eq!(
                json!([
                    {"list": [1, 2, null]},
                    {"list": [3, null, 4]},
                    {"list": [null, 5, 6]},
                    {"list": null},
                ]),
                json_value
            );
        }
        // Implicit nulls (default): the null-list row becomes an empty object.
        {
            let json_value: Value = {
                let mut buf = Vec::new();
                let mut writer = ArrayWriter::new(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };
            assert_eq!(
                json!([
                    {"list": [1, 2, null]},
                    {"list": [3, null, 4]},
                    {"list": [null, 5, 6]},
                    {}, ]),
                json_value
            );
        }
    }
1890
1891 #[test]
1892 fn test_writer_null_dict() {
1893 let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
1894 let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
1895 let dict = DictionaryArray::new(keys, values);
1896
1897 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1898 "my_dict",
1899 DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
1900 true,
1901 )]));
1902
1903 let array = Arc::new(dict) as ArrayRef;
1904 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1905
1906 let mut json = Vec::new();
1907 let write_builder = WriterBuilder::new().with_explicit_nulls(true);
1908 let mut writer = write_builder.build::<_, JsonArray>(&mut json);
1909 writer.write(&batch).unwrap();
1910 writer.close().unwrap();
1911
1912 let json_str = str::from_utf8(&json).unwrap();
1913 assert_eq!(
1914 json_str,
1915 r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
1916 )
1917 }
1918
1919 #[test]
1920 fn test_decimal128_encoder() {
1921 let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
1922 .with_precision_and_scale(10, 2)
1923 .unwrap();
1924 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1925 let schema = Schema::new(vec![field]);
1926 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1927
1928 let mut buf = Vec::new();
1929 {
1930 let mut writer = LineDelimitedWriter::new(&mut buf);
1931 writer.write_batches(&[&batch]).unwrap();
1932 }
1933
1934 assert_json_eq(
1935 &buf,
1936 r#"{"decimal":12.34}
1937{"decimal":56.78}
1938{"decimal":90.12}
1939"#,
1940 );
1941 }
1942
1943 #[test]
1944 fn test_decimal256_encoder() {
1945 let array = Decimal256Array::from_iter_values([
1946 i256::from(123400),
1947 i256::from(567800),
1948 i256::from(901200),
1949 ])
1950 .with_precision_and_scale(10, 4)
1951 .unwrap();
1952 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1953 let schema = Schema::new(vec![field]);
1954 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1955
1956 let mut buf = Vec::new();
1957 {
1958 let mut writer = LineDelimitedWriter::new(&mut buf);
1959 writer.write_batches(&[&batch]).unwrap();
1960 }
1961
1962 assert_json_eq(
1963 &buf,
1964 r#"{"decimal":12.3400}
1965{"decimal":56.7800}
1966{"decimal":90.1200}
1967"#,
1968 );
1969 }
1970
1971 #[test]
1972 fn test_decimal_encoder_with_nulls() {
1973 let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
1974 .with_precision_and_scale(10, 2)
1975 .unwrap();
1976 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1977 let schema = Schema::new(vec![field]);
1978 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1979
1980 let mut buf = Vec::new();
1981 {
1982 let mut writer = LineDelimitedWriter::new(&mut buf);
1983 writer.write_batches(&[&batch]).unwrap();
1984 }
1985
1986 assert_json_eq(
1987 &buf,
1988 r#"{"decimal":12.34}
1989{}
1990{"decimal":56.78}
1991"#,
1992 );
1993 }
1994
    #[test]
    fn write_structs_as_list() {
        // StructMode::ListOnly renders structs as positional JSON arrays
        // rather than key/value objects. Since array slots are positional,
        // null handling must not change the output — verified below by
        // writing with explicit nulls both on and off.
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        // Each row has the shape [[c11, [c121]], c2].
        let expected = r#"[[1,["e"]],"a"]
[[null,["f"]],"b"]
[[5,["g"]],"c"]
"#;

        // Explicit nulls enabled.
        let mut buf = Vec::new();
        {
            let builder = WriterBuilder::new()
                .with_explicit_nulls(true)
                .with_struct_mode(StructMode::ListOnly);
            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
        assert_json_eq(&buf, expected);

        // Explicit nulls disabled: identical output expected in list mode.
        let mut buf = Vec::new();
        {
            let builder = WriterBuilder::new()
                .with_explicit_nulls(false)
                .with_struct_mode(StructMode::ListOnly);
            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
        assert_json_eq(&buf, expected);
    }
2060
    /// Builds a two-column batch (a sparse union + a Float64 column) plus an
    /// [`EncoderFactory`] that supplies a custom encoder for the union
    /// column, which the default writer has no encoder for. Shared fixture
    /// for the `test_fallback_encoder_factory_*` tests.
    fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
        // Materialized value of a single union slot.
        #[derive(Debug)]
        enum UnionValue {
            Int32(i32),
            String(String),
        }

        // Encoder over eagerly-extracted union values.
        #[derive(Debug)]
        struct UnionEncoder {
            array: Vec<Option<UnionValue>>,
        }

        impl Encoder for UnionEncoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                match &self.array[idx] {
                    None => out.extend_from_slice(b"null"),
                    Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
                    Some(UnionValue::String(v)) => {
                        out.extend_from_slice(format!("\"{}\"", v).as_bytes())
                    }
                }
            }
        }

        #[derive(Debug)]
        struct UnionEncoderFactory;

        impl EncoderFactory for UnionEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                let data_type = array.data_type();
                // Only handle sparse unions; returning Ok(None) falls back
                // to the writer's default encoders for everything else.
                let fields = match data_type {
                    DataType::Union(fields, UnionMode::Sparse) => fields,
                    _ => return Ok(None),
                };
                let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
                // Only Null/Int32/Utf8 child types are supported here.
                for f in fields.iter() {
                    match f.data_type() {
                        DataType::Null => {}
                        DataType::Int32 => {}
                        DataType::Utf8 => {}
                        _ => return Ok(None),
                    }
                }
                // Eagerly pull each slot's value out of the child selected
                // by its type id (sparse union: children share row indices).
                let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
                let mut values = Vec::with_capacity(type_ids.len());
                for idx in 0..type_ids.len() {
                    let type_id = type_ids[idx];
                    let field = &fields[type_id as usize];
                    let value = match field.data_type() {
                        DataType::Null => None,
                        DataType::Int32 => Some(UnionValue::Int32(
                            buffers[type_id as usize]
                                .as_primitive::<Int32Type>()
                                .value(idx),
                        )),
                        DataType::Utf8 => Some(UnionValue::String(
                            buffers[type_id as usize]
                                .as_string::<i32>()
                                .value(idx)
                                .to_string(),
                        )),
                        // Child types were validated above.
                        _ => unreachable!(),
                    };
                    values.push(value);
                }
                let array_encoder =
                    Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
                let nulls = array.nulls().cloned();
                Ok(Some(NullableEncoder::new(array_encoder, nulls)))
            }
        }

        // Union rows: 1 (Int32 child), "a" (Utf8 child), null (Null child).
        let int_array = Int32Array::from(vec![Some(1), None, None]);
        let string_array = StringArray::from(vec![None, Some("a"), None]);
        let null_array = NullArray::new(3);
        let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();

        let union_fields = [
            (0, Arc::new(Field::new("A", DataType::Int32, false))),
            (1, Arc::new(Field::new("B", DataType::Utf8, false))),
            (2, Arc::new(Field::new("C", DataType::Null, false))),
        ]
        .into_iter()
        .collect::<UnionFields>();

        let children = vec![
            Arc::new(int_array) as Arc<dyn Array>,
            Arc::new(string_array),
            Arc::new(null_array),
        ];

        let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();

        // Plain column handled by the built-in encoders (null at row 1).
        let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);

        let fields = vec![
            Field::new(
                "union",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new("float", DataType::Float64, true),
        ];

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        (batch, Arc::new(UnionEncoderFactory))
    }
2184
2185 #[test]
2186 fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
2187 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2188
2189 let mut buf = Vec::new();
2190 {
2191 let mut writer = WriterBuilder::new()
2192 .with_encoder_factory(encoder_factory)
2193 .with_explicit_nulls(false)
2194 .build::<_, LineDelimited>(&mut buf);
2195 writer.write_batches(&[&batch]).unwrap();
2196 writer.finish().unwrap();
2197 }
2198
2199 println!("{}", str::from_utf8(&buf).unwrap());
2200
2201 assert_json_eq(
2202 &buf,
2203 r#"{"union":1,"float":1.0}
2204{"union":"a"}
2205{"union":null,"float":3.4}
2206"#,
2207 );
2208 }
2209
2210 #[test]
2211 fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
2212 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2213
2214 let mut buf = Vec::new();
2215 {
2216 let mut writer = WriterBuilder::new()
2217 .with_encoder_factory(encoder_factory)
2218 .with_explicit_nulls(true)
2219 .build::<_, LineDelimited>(&mut buf);
2220 writer.write_batches(&[&batch]).unwrap();
2221 writer.finish().unwrap();
2222 }
2223
2224 assert_json_eq(
2225 &buf,
2226 r#"{"union":1,"float":1.0}
2227{"union":"a","float":null}
2228{"union":null,"float":3.4}
2229"#,
2230 );
2231 }
2232
2233 #[test]
2234 fn test_fallback_encoder_factory_array_implicit_nulls() {
2235 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2236
2237 let json_value: Value = {
2238 let mut buf = Vec::new();
2239 let mut writer = WriterBuilder::new()
2240 .with_encoder_factory(encoder_factory)
2241 .build::<_, JsonArray>(&mut buf);
2242 writer.write_batches(&[&batch]).unwrap();
2243 writer.finish().unwrap();
2244 serde_json::from_slice(&buf).unwrap()
2245 };
2246
2247 let expected = json!([
2248 {"union":1,"float":1.0},
2249 {"union":"a"},
2250 {"float":3.4,"union":null},
2251 ]);
2252
2253 assert_eq!(json_value, expected);
2254 }
2255
2256 #[test]
2257 fn test_fallback_encoder_factory_array_explicit_nulls() {
2258 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2259
2260 let json_value: Value = {
2261 let mut buf = Vec::new();
2262 let mut writer = WriterBuilder::new()
2263 .with_encoder_factory(encoder_factory)
2264 .with_explicit_nulls(true)
2265 .build::<_, JsonArray>(&mut buf);
2266 writer.write_batches(&[&batch]).unwrap();
2267 writer.finish().unwrap();
2268 serde_json::from_slice(&buf).unwrap()
2269 };
2270
2271 let expected = json!([
2272 {"union":1,"float":1.0},
2273 {"union":"a", "float": null},
2274 {"union":null,"float":3.4},
2275 ]);
2276
2277 assert_eq!(json_value, expected);
2278 }
2279
2280 #[test]
2281 fn test_default_encoder_byte_array() {
2282 struct IntArrayBinaryEncoder<B> {
2283 array: B,
2284 }
2285
2286 impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
2287 where
2288 B: ArrayAccessor<Item = &'a [u8]>,
2289 {
2290 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2291 out.push(b'[');
2292 let child = self.array.value(idx);
2293 for (idx, byte) in child.iter().enumerate() {
2294 write!(out, "{byte}").unwrap();
2295 if idx < child.len() - 1 {
2296 out.push(b',');
2297 }
2298 }
2299 out.push(b']');
2300 }
2301 }
2302
2303 #[derive(Debug)]
2304 struct IntArayBinaryEncoderFactory;
2305
2306 impl EncoderFactory for IntArayBinaryEncoderFactory {
2307 fn make_default_encoder<'a>(
2308 &self,
2309 _field: &'a FieldRef,
2310 array: &'a dyn Array,
2311 _options: &'a EncoderOptions,
2312 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2313 match array.data_type() {
2314 DataType::Binary => {
2315 let array = array.as_binary::<i32>();
2316 let encoder = IntArrayBinaryEncoder { array };
2317 let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2318 let nulls = array.nulls().cloned();
2319 Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2320 }
2321 _ => Ok(None),
2322 }
2323 }
2324 }
2325
2326 let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
2327 let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
2328 let fields = vec![
2329 Field::new("bytes", DataType::Binary, true),
2330 Field::new("float", DataType::Float64, true),
2331 ];
2332 let batch = RecordBatch::try_new(
2333 Arc::new(Schema::new(fields)),
2334 vec![
2335 Arc::new(binary_array) as Arc<dyn Array>,
2336 Arc::new(float_array) as Arc<dyn Array>,
2337 ],
2338 )
2339 .unwrap();
2340
2341 let json_value: Value = {
2342 let mut buf = Vec::new();
2343 let mut writer = WriterBuilder::new()
2344 .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory))
2345 .build::<_, JsonArray>(&mut buf);
2346 writer.write_batches(&[&batch]).unwrap();
2347 writer.finish().unwrap();
2348 serde_json::from_slice(&buf).unwrap()
2349 };
2350
2351 let expected = json!([
2352 {"bytes": [97], "float": 1.0},
2353 {"float": 2.3},
2354 {"bytes": [98]},
2355 ]);
2356
2357 assert_eq!(json_value, expected);
2358 }
2359
    #[test]
    fn test_encoder_factory_customize_dictionary() {
        // Encoder that zero-pads Int32 values to eight digits (1 ->
        // "00000001"). Used to verify that a metadata-selected custom
        // encoder is also applied to the values of a dictionary column.
        struct PaddedInt32Encoder {
            array: Int32Array,
        }

        impl Encoder for PaddedInt32Encoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                let value = self.array.value(idx);
                write!(out, "\"{value:0>8}\"").unwrap();
            }
        }

        #[derive(Debug)]
        struct CustomEncoderFactory;

        impl EncoderFactory for CustomEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                // Columns opt in via the field metadata key "padded"; all
                // others fall back to the default encoders (Ok(None)).
                let padded = field
                    .metadata()
                    .get("padded")
                    .map(|v| v == "true")
                    .unwrap_or_default();
                match (array.data_type(), padded) {
                    (DataType::Int32, true) => {
                        let array = array.as_primitive::<Int32Type>();
                        let nulls = array.nulls().cloned();
                        let encoder = PaddedInt32Encoder {
                            array: array.clone(),
                        };
                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
                    }
                    _ => Ok(None),
                }
            }
        }

        // Helper: serialize one batch to a JSON array using the factory.
        let to_json = |batch| {
            let mut buf = Vec::new();
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(Arc::new(CustomEncoderFactory))
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[batch]).unwrap();
            writer.finish().unwrap();
            serde_json::from_slice::<Value>(&buf).unwrap()
        };

        // Case 1: plain Int32 column carrying the "padded" metadata flag.
        let array = Int32Array::from(vec![Some(1), None, Some(2)]);
        let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
            HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
        ));
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field.clone()])),
            vec![Arc::new(array)],
        )
        .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000002"},
        ]);

        assert_eq!(json_value, expected);

        // Case 2: Dictionary<UInt16, Int32> column — the custom encoder
        // must also be applied to the dictionary's values.
        let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
        array_builder.append_value(1);
        array_builder.append_null();
        array_builder.append_value(1);
        let array = array_builder.finish();
        let field = Field::new(
            "int",
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
            true,
        )
        .with_metadata(HashMap::from_iter(vec![(
            "padded".to_string(),
            "true".to_string(),
        )]));
        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
            .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000001"},
        ]);

        assert_eq!(json_value, expected);
    }
2470}