1mod encoder;
108
109use std::{fmt::Debug, io::Write, sync::Arc};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, NullableEncoder};
116
/// Controls how a sequence of records is framed as JSON output.
///
/// Implementations write the (possibly empty) delimiters around the stream
/// and around each row; the row contents themselves are produced by the
/// encoders. All hooks default to writing nothing.
pub trait JsonFormat: Debug + Default {
    /// Write any bytes needed once at the start of the output (e.g. `[`).
    #[inline]
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed before a row (e.g. a `,` separator);
    /// `_is_first_row` is true for the first row of the whole stream.
    #[inline]
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed after a row (e.g. a newline).
    #[inline]
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed once at the end of the output (e.g. `]`).
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }
}
143
144#[derive(Debug, Default)]
154pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157 fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158 writer.write_all(b"\n")?;
159 Ok(())
160 }
161}
162
163#[derive(Debug, Default)]
171pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174 fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175 writer.write_all(b"[")?;
176 Ok(())
177 }
178
179 fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180 if !is_first_row {
181 writer.write_all(b",")?;
182 }
183 Ok(())
184 }
185
186 fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187 writer.write_all(b"]")?;
188 Ok(())
189 }
190}
191
/// A [`Writer`] producing newline-delimited JSON (one object per line).
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A [`Writer`] producing a single top-level JSON array of objects.
pub type ArrayWriter<W> = Writer<W, JsonArray>;
197
/// Builder for configuring encoder options and constructing a JSON [`Writer`].
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203 pub fn new() -> Self {
223 Self::default()
224 }
225
226 pub fn explicit_nulls(&self) -> bool {
228 self.0.explicit_nulls()
229 }
230
231 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254 self.0 = self.0.with_explicit_nulls(explicit_nulls);
255 self
256 }
257
258 pub fn struct_mode(&self) -> StructMode {
260 self.0.struct_mode()
261 }
262
263 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269 self.0 = self.0.with_struct_mode(struct_mode);
270 self
271 }
272
273 pub fn with_encoder_factory(mut self, factory: Arc<dyn EncoderFactory>) -> Self {
278 self.0 = self.0.with_encoder_factory(factory);
279 self
280 }
281
282 pub fn build<W, F>(self, writer: W) -> Writer<W, F>
284 where
285 W: Write,
286 F: JsonFormat,
287 {
288 Writer {
289 writer,
290 started: false,
291 finished: false,
292 format: F::default(),
293 options: self.0,
294 }
295 }
296}
297
/// A JSON writer serializing [`RecordBatch`]es to a [`Write`] sink, framed
/// according to the [`JsonFormat`] `F`.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying output sink
    writer: W,

    /// Whether the stream has been started (any output produced)
    started: bool,

    /// Whether `finish` has terminated the stream
    finished: bool,

    /// Framing of rows (line-delimited vs JSON array)
    format: F,

    /// Encoder options (null handling, struct mode, custom encoder factory)
    options: EncoderOptions,
}
329
impl<W, F> Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Construct a new writer over `writer` with default encoder options.
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options: EncoderOptions::default(),
        }
    }

    /// Serialize `batch` to the output, one JSON object per row.
    ///
    /// Empty batches produce no output. Errors from the underlying writer or
    /// from encoder construction are propagated.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Rows are encoded into this local buffer and flushed in chunks,
        // bounding memory while avoiding a write syscall per row
        let mut buffer = Vec::with_capacity(16 * 1024);

        let mut is_first_row = !self.started;
        if !self.started {
            // Stream delimiters also go through the buffer
            self.format.start_stream(&mut buffer)?;
            self.started = true;
        }

        // Wrap the batch in a single non-nullable struct column so one
        // encoder serializes each row as a JSON object
        let array = StructArray::from(batch.clone());
        let field = Arc::new(Field::new_struct(
            "",
            batch.schema().fields().clone(),
            false,
        ));

        let mut encoder = make_encoder(&field, &array, &self.options)?;

        // The root struct is constructed non-nullable above, so a nullable
        // encoder here would be a bug
        assert!(!encoder.has_nulls(), "root cannot be nullable");
        for idx in 0..batch.num_rows() {
            self.format.start_row(&mut buffer, is_first_row)?;
            is_first_row = false;

            encoder.encode(idx, &mut buffer);
            // Flush once the buffer exceeds 8 KiB to bound its growth
            if buffer.len() > 8 * 1024 {
                self.writer.write_all(&buffer)?;
                buffer.clear();
            }
            self.format.end_row(&mut buffer)?;
        }

        // Flush whatever remains buffered
        if !buffer.is_empty() {
            self.writer.write_all(&buffer)?;
        }

        Ok(())
    }

    /// Serialize each batch in `batches` in order.
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Terminate the stream (e.g. write the closing `]` for [`JsonArray`]).
    ///
    /// If nothing was written yet, the stream start is emitted first so an
    /// empty array writer still produces `[]`. Calling this more than once
    /// writes the terminator only once.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if !self.started {
            self.format.start_stream(&mut self.writer)?;
            self.started = true;
        }
        if !self.finished {
            self.format.end_stream(&mut self.writer)?;
            self.finished = true;
        }

        Ok(())
    }

    /// Borrow the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Mutably borrow the underlying writer.
    ///
    /// Writing directly to it may interleave with buffered JSON output.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Consume this writer, returning the underlying writer.
    ///
    /// Note: does not call `finish`; terminate the stream first if needed.
    pub fn into_inner(self) -> W {
        self.writer
    }
}
434
impl<W, F> RecordBatchWriter for Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Resolves to the inherent `Writer::write` (inherent methods take
        // precedence over trait methods), so this does not recurse
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        // Terminate the stream before the writer is dropped
        self.finish()
    }
}
448
449#[cfg(test)]
450mod tests {
451 use core::str;
452 use std::collections::HashMap;
453 use std::fs::{read_to_string, File};
454 use std::io::{BufReader, Seek};
455 use std::sync::Arc;
456
457 use arrow_array::cast::AsArray;
458 use serde_json::{json, Value};
459
460 use super::LineDelimited;
461 use super::{Encoder, WriterBuilder};
462 use arrow_array::builder::*;
463 use arrow_array::types::*;
464 use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, ToByteSlice};
465 use arrow_data::ArrayData;
466
467 use crate::reader::*;
468
469 use super::*;
470
    /// Asserts that `input` (newline-delimited JSON bytes) parses to the same
    /// sequence of JSON values as the newline-delimited `expected` string.
    /// Empty segments (e.g. the trailing newline) map to `None` on both sides.
    fn assert_json_eq(input: &[u8], expected: &str) {
        let expected: Vec<Option<Value>> = expected
            .split('\n')
            .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
            .collect();

        let actual: Vec<Option<Value>> = input
            .split(|b| *b == b'\n')
            .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
            .collect();

        assert_eq!(actual, expected);
    }
485
    /// Int32 + Utf8 columns serialize to line-delimited JSON; null values
    /// omit their keys (explicit nulls are off by default).
    #[test]
    fn write_simple_rows() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ]);

        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":1,"c2":"a"}
{"c1":2,"c2":"b"}
{"c1":3,"c2":"c"}
{"c2":"d"}
{"c1":5}
"#,
        );
    }

    /// Utf8, LargeUtf8 and Utf8View columns all serialize as plain JSON strings.
    #[test]
    fn write_large_utf8_and_utf8_view() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }

    /// Dictionary-encoded columns serialize as their decoded values.
    #[test]
    fn write_dictionary() {
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }
589
    /// A large list of dictionary-encoded strings decodes to plain JSON
    /// arrays; a null list row omits its key entirely, a null element
    /// serializes as `null` inside the array.
    #[test]
    fn write_list_of_dictionary() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        let list_array = LargeListArray::try_new(
            dict_field,
            // Rows of lengths 3, 2, 0, 1; third row marked null below
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }

    /// Same as `write_list_of_dictionary`, but with LargeUtf8 dictionary
    /// values and explicitly constructed keys.
    #[test]
    fn write_list_of_dictionary_large_values() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
676
    /// Timezone-naive timestamps serialize as ISO 8601 strings without an
    /// offset, truncated to each column's time unit.
    #[test]
    fn write_timestamps() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );
    }

    /// Timestamps carrying a timezone serialize with a `Z` (UTC) suffix.
    #[test]
    fn write_timestamps_with_tz() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );
    }
791
    /// Date32 serializes as a calendar date, Date64 as a full datetime string.
    #[test]
    fn write_dates() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        let arr_date32 = Date32Array::from(vec![
            // Days since the epoch
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );
    }

    /// Time32/Time64 columns serialize as `HH:MM:SS` strings with fractional
    /// precision matching the column's unit.
    #[test]
    fn write_times() {
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );
    }

    /// Duration columns serialize as ISO 8601 duration strings (`PT...S`).
    #[test]
    fn write_durations() {
        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_durationsec),
                Arc::new(arr_durationmsec),
                Arc::new(arr_durationusec),
                Arc::new(arr_durationnsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
        );
    }
924
    /// Nested structs serialize as nested JSON objects; a null inner value
    /// omits its key from the nested object.
    #[test]
    fn write_nested_structs() {
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }

    /// A list column built from raw offsets/validity serializes each row as a
    /// JSON array alongside sibling columns.
    #[test]
    fn write_struct_with_list_field() {
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
        // Offsets [0,2,3,4,5,6] => list lengths 2,1,1,1,1
        let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
        let a_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(5)
            .add_buffer(a_value_offsets)
            .add_child_data(a_values.into_data())
            // All five rows valid
            .null_bit_buffer(Some(Buffer::from([0b00011111])))
            .build()
            .unwrap();
        let a = ListArray::from(a_list_data);

        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":["a","a1"],"c2":1}
{"c1":["b"],"c2":2}
{"c1":["c"],"c2":3}
{"c1":["d"],"c2":4}
{"c1":["e"],"c2":5}
"#,
        );
    }

    /// Lists of lists serialize as nested JSON arrays; an empty inner list
    /// row produces `[]`.
    #[test]
    fn write_nested_list() {
        let list_inner_type = Field::new(
            "a",
            DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
            false,
        );
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(list_inner_type.clone())),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        // Inner lists: [1,2], [3], [4,5,6]
        let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
        let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
            .len(3)
            .add_buffer(a_value_offsets)
            .null_bit_buffer(Some(Buffer::from([0b00000111])))
            .add_child_data(a_values.into_data())
            .build()
            .unwrap();

        // Outer lists: [[1,2],[3]], [], [[4,5,6]]
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(a_list_data)
            .build()
            .unwrap();

        let c1 = ListArray::from(c1_list_data);
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }
1079
    /// A list of structs serializes as an array of objects; a null list row
    /// omits the key.
    #[test]
    fn write_list_of_struct() {
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // Rows: [s0, s1], null (0b...0_ at bit 1), [s2]
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(struct_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000101])))
            .build()
            .unwrap();
        let c1 = ListArray::from(c1_list_data);

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }
1152
    /// Reads `test_file` (newline-delimited JSON), writes it back out through
    /// the writer, and compares line-by-line. When `remove_nulls` is true the
    /// writer omits null keys, so nulls are filtered from the expected JSON
    /// before comparison; otherwise explicit nulls are enabled on the writer.
    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
        let file = File::open(test_file).unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
        let mut reader = builder.build(reader).unwrap();
        let batch = reader.next().unwrap().unwrap();

        let mut buf = Vec::new();
        {
            if remove_nulls {
                let mut writer = LineDelimitedWriter::new(&mut buf);
                writer.write_batches(&[&batch]).unwrap();
            } else {
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, LineDelimited>(&mut buf);
                writer.write_batches(&[&batch]).unwrap();
            }
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            if remove_nulls {
                // Drop null-valued keys from the expected object to match
                // the writer's default output
                if let Value::Object(obj) = expected_json {
                    expected_json =
                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
                }
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }

    /// Round-trips the basic fixture with nulls omitted.
    #[test]
    fn write_basic_rows() {
        test_write_for_file("test/data/basic.json", true);
    }

    /// Round-trips the arrays fixture with nulls omitted.
    #[test]
    fn write_arrays() {
        test_write_for_file("test/data/arrays.json", true);
    }

    /// Round-trips the basic-nulls fixture with nulls omitted.
    #[test]
    fn write_basic_nulls() {
        test_write_for_file("test/data/basic_nulls.json", true);
    }

    /// Round-trips the nested-nulls fixture with explicit nulls enabled.
    #[test]
    fn write_nested_with_nulls() {
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1210
1211 #[test]
1212 fn json_line_writer_empty() {
1213 let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1214 writer.finish().unwrap();
1215 assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1216 }
1217
1218 #[test]
1219 fn json_array_writer_empty() {
1220 let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1221 writer.finish().unwrap();
1222 assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1223 }
1224
1225 #[test]
1226 fn json_line_writer_empty_batch() {
1227 let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1228
1229 let array = Int32Array::from(Vec::<i32>::new());
1230 let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1231 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1232
1233 writer.write(&batch).unwrap();
1234 writer.finish().unwrap();
1235 assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1236 }
1237
1238 #[test]
1239 fn json_array_writer_empty_batch() {
1240 let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1241
1242 let array = Int32Array::from(Vec::<i32>::new());
1243 let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1244 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1245
1246 writer.write(&batch).unwrap();
1247 writer.finish().unwrap();
1248 assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1249 }
1250
    /// Struct columns with and without a validity bitmap: a null struct row
    /// omits the key, while a valid struct whose child is null serializes as
    /// an empty object.
    #[test]
    fn json_struct_array_nulls() {
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]),
            Some(vec![Some(4), Some(5)]),
            None,
            None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        // `a` carries validity 0b01010111: rows 3 and 5 are null structs
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            Buffer::from([0b01010111]),
        ));
        // `b` has no struct-level validity; only the child list can be null
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }
1300
    /// Map columns serialize as JSON objects; a null map row omits its key
    /// while an empty map serializes as `{}`.
    #[test]
    fn json_writer_map() {
        let keys_array = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
        let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);

        let keys = Arc::new(Field::new("keys", DataType::Utf8, false));
        let values = Arc::new(Field::new("values", DataType::Int64, false));
        let entry_struct = StructArray::from(vec![
            (keys, Arc::new(keys_array) as ArrayRef),
            (values, Arc::new(values_array) as ArrayRef),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // Entry counts per row: 1, 0, 0, 3, 1, 0
        let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
        // Row 1 is a null map (bit 1 clear in 0b00111101)
        let valid_buffer = Buffer::from([0b00111101]);

        let map_data = ArrayData::builder(map_data_type.clone())
            .len(6)
            .null_bit_buffer(Some(valid_buffer))
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();

        let map = MapArray::from(map_data);

        let map_field = Field::new("map", map_data_type, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
        );
    }
1358
    /// Writing a single inferred batch round-trips the basic fixture
    /// (nulls omitted by default, so they are filtered from the expectation).
    #[test]
    fn test_write_single_batch() {
        let test_file = "test/data/basic.json";
        let file = File::open(test_file).unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
        let mut reader = builder.build(reader).unwrap();
        let batch = reader.next().unwrap().unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write(&batch).unwrap();
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            // Writer omits null keys by default, so drop them here too
            if let Value::Object(obj) = expected_json {
                expected_json =
                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }
1389
    /// Writes multiple batches (including an empty one) with the line-delimited
    /// writer and verifies the output is the input file's rows twice over —
    /// the empty batch must contribute no output at all.
    #[test]
    fn test_write_multi_batches() {
        let test_file = "test/data/basic.json";

        let schema = SchemaRef::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float64, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Utf8, true),
            Field::new("e", DataType::Utf8, true),
            Field::new("f", DataType::Utf8, true),
            Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("h", DataType::Float16, true),
        ]));

        let mut reader = ReaderBuilder::new(schema.clone())
            .build(BufReader::new(File::open(test_file).unwrap()))
            .unwrap();
        let batch = reader.next().unwrap().unwrap();

        // The leading empty batch should produce no lines in the output.
        let batches = [&RecordBatch::new_empty(schema), &batch, &batch];

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&batches).unwrap();
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        // The batch was written twice, so expect the file content doubled.
        let expected = format!("{expected}\n{expected}");
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            // Drop null-valued keys: the writer omits nulls by default.
            if let Value::Object(obj) = expected_json {
                expected_json =
                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }
1433
    /// Exercises `with_explicit_nulls(true)` across a deeply nested schema:
    /// date, null, struct (containing utf8/list/dict/map children) and a root
    /// list of structs. Every null slot must appear as an explicit JSON `null`.
    #[test]
    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
        // List with rows: [null,null,null], [1,2,3], null, [null,null,null].
        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![None, None, None]),
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![None, None, None]),
            ]));
            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
            (array, field)
        }

        // Dictionary with rows: "cupcakes", null, "bear", "kuma".
        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
            let array = Arc::new(DictionaryArray::from_iter(vec![
                Some("cupcakes"),
                None,
                Some("bear"),
                Some("kuma"),
            ]));
            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
            (array, field)
        }

        // Map with rows: {"foo":10}, null, {} (empty), {"bar":20,"baz":30,"qux":40}.
        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
            let string_builder = StringBuilder::new();
            let int_builder = Int64Builder::new();
            let mut builder = MapBuilder::new(None, string_builder, int_builder);

            builder.keys().append_value("foo");
            builder.values().append_value(10);
            builder.append(true).unwrap();

            // Null map entry.
            builder.append(false).unwrap();

            // Valid but empty map entry.
            builder.append(true).unwrap();

            builder.keys().append_value("bar");
            builder.values().append_value(20);
            builder.keys().append_value("baz");
            builder.values().append_value(30);
            builder.keys().append_value("qux");
            builder.values().append_value(40);
            builder.append(true).unwrap();

            let array = Arc::new(builder.finish());
            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
            (array, field)
        }

        // Top-level list-of-structs column, built manually via ArrayData so the
        // null bitmap (0b0101 => rows 0 and 2 valid) can be set explicitly.
        fn root_list() -> (Arc<ListArray>, Field) {
            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int32", DataType::Int32, true)),
                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
                ),
            ]);

            let field = Field::new_list(
                "list",
                Field::new("struct", struct_array.data_type().clone(), true),
                true,
            );

            // Offsets [0,2,2,3,3]: row 0 has 2 structs, row 2 has 1, rows 1/3 none.
            let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
            let data = ArrayData::builder(field.data_type().clone())
                .len(4)
                .add_buffer(entry_offsets)
                .add_child_data(struct_array.into_data())
                .null_bit_buffer(Some([0b00000101].into()))
                .build()
                .unwrap();
            let array = Arc::new(ListArray::from(data));
            (array, field)
        }

        let (nested_list_array, nested_list_field) = nested_list();
        let (nested_dict_array, nested_dict_field) = nested_dict();
        let (nested_map_array, nested_map_field) = nested_map();
        let (root_list_array, root_list_field) = root_list();

        let schema = Schema::new(vec![
            Field::new("date", DataType::Date32, true),
            Field::new("null", DataType::Null, true),
            Field::new_struct(
                "struct",
                vec![
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    nested_list_field.clone(),
                    nested_dict_field.clone(),
                    nested_map_field.clone(),
                ],
                true,
            ),
            root_list_field,
        ]);

        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
        let arr_null = NullArray::new(4);
        let arr_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("utf8", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
            ),
            (nested_list_field, nested_list_array as ArrayRef),
            (nested_dict_field, nested_dict_array as ArrayRef),
            (nested_map_field, nested_map_array as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_null),
                Arc::new(arr_struct),
                root_list_array,
            ],
        )?;

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch])?;
            writer.finish()?;
        }

        // Every null slot, at every nesting level, must be spelled out as
        // `null` rather than omitted.
        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
        let expected = serde_json::from_value::<Vec<Value>>(json!([
          {
            "date": "1970-01-01",
            "list": [
              {
                "int32": 1,
                "utf8": "a"
              },
              {
                "int32": null,
                "utf8": "b"
              }
            ],
            "null": null,
            "struct": {
              "dict": "cupcakes",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "foo": 10
              },
              "utf8": "a"
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": null,
              "list": [
                1,
                2,
                3
              ],
              "map": null,
              "utf8": null
            }
          },
          {
            "date": "1970-01-02",
            "list": [
              {
                "int32": 5,
                "utf8": null
              }
            ],
            "null": null,
            "struct": {
              "dict": "bear",
              "list": null,
              "map": {},
              "utf8": null
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": "kuma",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "bar": 20,
                "baz": 30,
                "qux": 40
              },
              "utf8": "b"
            }
          }
        ]))
        .unwrap();

        assert_eq!(actual, expected);

        Ok(())
    }
1662
    /// Shared driver for binary-column encoding tests, generic over the offset
    /// width (`i32` for Binary, `i64` for LargeBinary). Binary values are
    /// written as lowercase hex strings; the null row is checked both with
    /// explicit nulls (`"bytes": null`) and implicit nulls (`{}`).
    fn binary_encoding_test<O: OffsetSizeTrait>() {
        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "bytes",
            GenericBinaryType::<O>::DATA_TYPE,
            true,
        )]));

        let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
        let values = [Some(b"Ned Flanders"), None, Some(b"Troy McClure")];
        for value in values {
            match value {
                Some(v) => builder.append_value(v),
                None => builder.append_null(),
            }
        }
        let array = Arc::new(builder.finish()) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        // Explicit nulls: the null row appears as `"bytes": null`.
        {
            let mut buf = Vec::new();
            let json_value: Value = {
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, JsonArray>(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };

            // "4e65..." is hex for "Ned Flanders"; "5472..." for "Troy McClure".
            assert_eq!(
                json!([
                    {
                        "bytes": "4e656420466c616e64657273"
                    },
                    {
                        "bytes": null },
                    {
                        "bytes": "54726f79204d63436c757265"
                    }
                ]),
                json_value,
            );
        }

        // Implicit nulls (default): the null row collapses to an empty object.
        {
            let mut buf = Vec::new();
            let json_value: Value = {
                let mut writer = ArrayWriter::new(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };

            assert_eq!(
                json!([
                    {
                        "bytes": "4e656420466c616e64657273"
                    },
                    {}, {
                        "bytes": "54726f79204d63436c757265"
                    }
                ]),
                json_value
            );
        }
    }
1737
    /// Runs the binary-encoding checks for both Binary (i32 offsets) and
    /// LargeBinary (i64 offsets).
    #[test]
    fn test_writer_binary() {
        // Binary
        binary_encoding_test::<i32>();
        // LargeBinary
        binary_encoding_test::<i64>();
    }
1745
    /// FixedSizeBinary columns encode as lowercase hex strings, with the null
    /// row checked under both explicit and implicit null modes.
    #[test]
    fn test_writer_fixed_size_binary() {
        let size = 11;
        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "bytes",
            DataType::FixedSizeBinary(size),
            true,
        )]));

        let mut builder = FixedSizeBinaryBuilder::new(size);
        let values = [Some(b"hello world"), None, Some(b"summer rain")];
        for value in values {
            match value {
                Some(v) => builder.append_value(v).unwrap(),
                None => builder.append_null(),
            }
        }
        let array = Arc::new(builder.finish()) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        // Explicit nulls: the null row appears as `"bytes": null`.
        {
            let mut buf = Vec::new();
            let json_value: Value = {
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, JsonArray>(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };

            // "6865..." is hex for "hello world"; "7375..." for "summer rain".
            assert_eq!(
                json!([
                    {
                        "bytes": "68656c6c6f20776f726c64"
                    },
                    {
                        "bytes": null },
                    {
                        "bytes": "73756d6d6572207261696e"
                    }
                ]),
                json_value,
            );
        }
        // Implicit nulls (default): the null row collapses to an empty object.
        {
            let mut buf = Vec::new();
            let json_value: Value = {
                let mut writer = ArrayWriter::new(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };

            assert_eq!(
                json!([
                    {
                        "bytes": "68656c6c6f20776f726c64"
                    },
                    {}, {
                        "bytes": "73756d6d6572207261696e"
                    }
                ]),
                json_value,
            );
        }
    }
1821
    /// FixedSizeList columns encode as JSON arrays; inner nulls are always
    /// written as `null`, while a null list row is `null` with explicit nulls
    /// and an empty object with implicit nulls.
    #[test]
    fn test_writer_fixed_size_list() {
        let size = 3;
        let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "list",
            DataType::FixedSizeList(field, size),
            true,
        )]));

        let values_builder = Int32Builder::new();
        let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
        let lists = [
            Some([Some(1), Some(2), None]),
            Some([Some(3), None, Some(4)]),
            Some([None, Some(5), Some(6)]),
            None,
        ];
        for list in lists {
            match list {
                Some(l) => {
                    for value in l {
                        match value {
                            Some(v) => list_builder.values().append_value(v),
                            None => list_builder.values().append_null(),
                        }
                    }
                    list_builder.append(true);
                }
                None => {
                    // A null list row still needs `size` child slots appended.
                    for _ in 0..size {
                        list_builder.values().append_null();
                    }
                    list_builder.append(false);
                }
            }
        }
        let array = Arc::new(list_builder.finish()) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        // With explicit nulls the null list row is `"list": null`.
        {
            let json_value: Value = {
                let mut buf = Vec::new();
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, JsonArray>(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };
            assert_eq!(
                json!([
                    {"list": [1, 2, null]},
                    {"list": [3, null, 4]},
                    {"list": [null, 5, 6]},
                    {"list": null},
                ]),
                json_value
            );
        }
        // With implicit nulls (default) the null list row collapses to `{}`.
        {
            let json_value: Value = {
                let mut buf = Vec::new();
                let mut writer = ArrayWriter::new(&mut buf);
                writer.write(&batch).unwrap();
                writer.close().unwrap();
                serde_json::from_slice(&buf).unwrap()
            };
            assert_eq!(
                json!([
                    {"list": [1, 2, null]},
                    {"list": [3, null, 4]},
                    {"list": [null, 5, 6]},
                    {}, ]),
                json_value
            );
        }
    }
1903
    /// Dictionary column with a null key and a key pointing at a null
    /// dictionary value, written with explicit nulls.
    #[test]
    fn test_writer_null_dict() {
        // Keys: [0, null, 1]; values: ["a", null].
        let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
        let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
        let dict = DictionaryArray::new(keys, values);

        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "my_dict",
            DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
            true,
        )]));

        let array = Arc::new(dict) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let mut json = Vec::new();
        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
        let mut writer = write_builder.build::<_, JsonArray>(&mut json);
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // NOTE(review): the null *key* (row 2) is emitted as `null`, but the
        // key referencing a null dictionary *value* (row 3) comes out as ""
        // — confirm this asymmetry is intended.
        let json_str = str::from_utf8(&json).unwrap();
        assert_eq!(
            json_str,
            r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
        )
    }
1931
1932 #[test]
1933 fn test_decimal32_encoder() {
1934 let array = Decimal32Array::from_iter_values([1234, 5678, 9012])
1935 .with_precision_and_scale(8, 2)
1936 .unwrap();
1937 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1938 let schema = Schema::new(vec![field]);
1939 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1940
1941 let mut buf = Vec::new();
1942 {
1943 let mut writer = LineDelimitedWriter::new(&mut buf);
1944 writer.write_batches(&[&batch]).unwrap();
1945 }
1946
1947 assert_json_eq(
1948 &buf,
1949 r#"{"decimal":12.34}
1950{"decimal":56.78}
1951{"decimal":90.12}
1952"#,
1953 );
1954 }
1955
1956 #[test]
1957 fn test_decimal64_encoder() {
1958 let array = Decimal64Array::from_iter_values([1234, 5678, 9012])
1959 .with_precision_and_scale(10, 2)
1960 .unwrap();
1961 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1962 let schema = Schema::new(vec![field]);
1963 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1964
1965 let mut buf = Vec::new();
1966 {
1967 let mut writer = LineDelimitedWriter::new(&mut buf);
1968 writer.write_batches(&[&batch]).unwrap();
1969 }
1970
1971 assert_json_eq(
1972 &buf,
1973 r#"{"decimal":12.34}
1974{"decimal":56.78}
1975{"decimal":90.12}
1976"#,
1977 );
1978 }
1979
1980 #[test]
1981 fn test_decimal128_encoder() {
1982 let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
1983 .with_precision_and_scale(10, 2)
1984 .unwrap();
1985 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1986 let schema = Schema::new(vec![field]);
1987 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1988
1989 let mut buf = Vec::new();
1990 {
1991 let mut writer = LineDelimitedWriter::new(&mut buf);
1992 writer.write_batches(&[&batch]).unwrap();
1993 }
1994
1995 assert_json_eq(
1996 &buf,
1997 r#"{"decimal":12.34}
1998{"decimal":56.78}
1999{"decimal":90.12}
2000"#,
2001 );
2002 }
2003
2004 #[test]
2005 fn test_decimal256_encoder() {
2006 let array = Decimal256Array::from_iter_values([
2007 i256::from(123400),
2008 i256::from(567800),
2009 i256::from(901200),
2010 ])
2011 .with_precision_and_scale(10, 4)
2012 .unwrap();
2013 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2014 let schema = Schema::new(vec![field]);
2015 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2016
2017 let mut buf = Vec::new();
2018 {
2019 let mut writer = LineDelimitedWriter::new(&mut buf);
2020 writer.write_batches(&[&batch]).unwrap();
2021 }
2022
2023 assert_json_eq(
2024 &buf,
2025 r#"{"decimal":12.3400}
2026{"decimal":56.7800}
2027{"decimal":90.1200}
2028"#,
2029 );
2030 }
2031
2032 #[test]
2033 fn test_decimal_encoder_with_nulls() {
2034 let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
2035 .with_precision_and_scale(10, 2)
2036 .unwrap();
2037 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2038 let schema = Schema::new(vec![field]);
2039 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2040
2041 let mut buf = Vec::new();
2042 {
2043 let mut writer = LineDelimitedWriter::new(&mut buf);
2044 writer.write_batches(&[&batch]).unwrap();
2045 }
2046
2047 assert_json_eq(
2048 &buf,
2049 r#"{"decimal":12.34}
2050{}
2051{"decimal":56.78}
2052"#,
2053 );
2054 }
2055
    /// With `StructMode::ListOnly`, struct rows are written as positional JSON
    /// arrays instead of objects; the output is identical regardless of the
    /// explicit-nulls setting since list mode always emits every slot.
    #[test]
    fn write_structs_as_list() {
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        // Each row is [[c11, [c121]], c2]; the null c11 is written positionally.
        let expected = r#"[[1,["e"]],"a"]
[[null,["f"]],"b"]
[[5,["g"]],"c"]
"#;

        // Explicit nulls enabled.
        let mut buf = Vec::new();
        {
            let builder = WriterBuilder::new()
                .with_explicit_nulls(true)
                .with_struct_mode(StructMode::ListOnly);
            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
        assert_json_eq(&buf, expected);

        // Explicit nulls disabled — same output in ListOnly mode.
        let mut buf = Vec::new();
        {
            let builder = WriterBuilder::new()
                .with_explicit_nulls(false)
                .with_struct_mode(StructMode::ListOnly);
            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
        assert_json_eq(&buf, expected);
    }
2121
    /// Builds a RecordBatch with a sparse union column (Int32 / Utf8 / Null
    /// variants) plus a Float64 column, and an `EncoderFactory` that knows how
    /// to encode that union. Shared fixture for the fallback-encoder tests.
    fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
        // Materialized union value, decoded ahead of time by the factory.
        #[derive(Debug)]
        enum UnionValue {
            Int32(i32),
            String(String),
        }

        // Encodes pre-decoded union values; `None` maps to JSON `null`.
        #[derive(Debug)]
        struct UnionEncoder {
            array: Vec<Option<UnionValue>>,
        }

        impl Encoder for UnionEncoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                match &self.array[idx] {
                    None => out.extend_from_slice(b"null"),
                    Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
                    Some(UnionValue::String(v)) => {
                        out.extend_from_slice(format!("\"{v}\"").as_bytes())
                    }
                }
            }
        }

        #[derive(Debug)]
        struct UnionEncoderFactory;

        impl EncoderFactory for UnionEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                // Only handle sparse unions; anything else falls back to the
                // built-in encoders by returning Ok(None).
                let data_type = array.data_type();
                let fields = match data_type {
                    DataType::Union(fields, UnionMode::Sparse) => fields,
                    _ => return Ok(None),
                };
                let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
                // Bail out if any variant type is unsupported.
                for f in fields.iter() {
                    match f.data_type() {
                        DataType::Null => {}
                        DataType::Int32 => {}
                        DataType::Utf8 => {}
                        _ => return Ok(None),
                    }
                }
                // Sparse union: every child buffer has a slot at every index,
                // selected by the per-row type id.
                let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
                let mut values = Vec::with_capacity(type_ids.len());
                for idx in 0..type_ids.len() {
                    let type_id = type_ids[idx];
                    let field = &fields[type_id as usize];
                    let value = match field.data_type() {
                        DataType::Null => None,
                        DataType::Int32 => Some(UnionValue::Int32(
                            buffers[type_id as usize]
                                .as_primitive::<Int32Type>()
                                .value(idx),
                        )),
                        DataType::Utf8 => Some(UnionValue::String(
                            buffers[type_id as usize]
                                .as_string::<i32>()
                                .value(idx)
                                .to_string(),
                        )),
                        _ => unreachable!(),
                    };
                    values.push(value);
                }
                let array_encoder =
                    Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
                let nulls = array.nulls().cloned();
                Ok(Some(NullableEncoder::new(array_encoder, nulls)))
            }
        }

        // Rows select variants A(0), B(1), C(2): 1, "a", null.
        let int_array = Int32Array::from(vec![Some(1), None, None]);
        let string_array = StringArray::from(vec![None, Some("a"), None]);
        let null_array = NullArray::new(3);
        let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();

        let union_fields = [
            (0, Arc::new(Field::new("A", DataType::Int32, false))),
            (1, Arc::new(Field::new("B", DataType::Utf8, false))),
            (2, Arc::new(Field::new("C", DataType::Null, false))),
        ]
        .into_iter()
        .collect::<UnionFields>();

        let children = vec![
            Arc::new(int_array) as Arc<dyn Array>,
            Arc::new(string_array),
            Arc::new(null_array),
        ];

        let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();

        // Second column exercises the default (non-factory) encoding path.
        let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);

        let fields = vec![
            Field::new(
                "union",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new("float", DataType::Float64, true),
        ];

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        (batch, Arc::new(UnionEncoderFactory))
    }
2245
2246 #[test]
2247 fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
2248 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2249
2250 let mut buf = Vec::new();
2251 {
2252 let mut writer = WriterBuilder::new()
2253 .with_encoder_factory(encoder_factory)
2254 .with_explicit_nulls(false)
2255 .build::<_, LineDelimited>(&mut buf);
2256 writer.write_batches(&[&batch]).unwrap();
2257 writer.finish().unwrap();
2258 }
2259
2260 println!("{}", str::from_utf8(&buf).unwrap());
2261
2262 assert_json_eq(
2263 &buf,
2264 r#"{"union":1,"float":1.0}
2265{"union":"a"}
2266{"union":null,"float":3.4}
2267"#,
2268 );
2269 }
2270
    /// Same fixture as the implicit-nulls test, but with explicit nulls: the
    /// null float on row 2 is now written as `"float":null`.
    #[test]
    fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
        let (batch, encoder_factory) = make_fallback_encoder_test_data();

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(encoder_factory)
                .with_explicit_nulls(true)
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
            writer.finish().unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"union":1,"float":1.0}
{"union":"a","float":null}
{"union":null,"float":3.4}
"#,
        );
    }
2293
    /// Custom union encoder with the JSON-array format and default (implicit)
    /// nulls: the null float on row 2 is omitted.
    #[test]
    fn test_fallback_encoder_factory_array_implicit_nulls() {
        let (batch, encoder_factory) = make_fallback_encoder_test_data();

        let json_value: Value = {
            let mut buf = Vec::new();
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(encoder_factory)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
            writer.finish().unwrap();
            serde_json::from_slice(&buf).unwrap()
        };

        let expected = json!([
            {"union":1,"float":1.0},
            {"union":"a"},
            {"float":3.4,"union":null},
        ]);

        assert_eq!(json_value, expected);
    }
2316
    /// Custom union encoder with the JSON-array format and explicit nulls:
    /// every null slot is spelled out.
    #[test]
    fn test_fallback_encoder_factory_array_explicit_nulls() {
        let (batch, encoder_factory) = make_fallback_encoder_test_data();

        let json_value: Value = {
            let mut buf = Vec::new();
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(encoder_factory)
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
            writer.finish().unwrap();
            serde_json::from_slice(&buf).unwrap()
        };

        let expected = json!([
            {"union":1,"float":1.0},
            {"union":"a", "float": null},
            {"union":null,"float":3.4},
        ]);

        assert_eq!(json_value, expected);
    }
2340
2341 #[test]
2342 fn test_default_encoder_byte_array() {
2343 struct IntArrayBinaryEncoder<B> {
2344 array: B,
2345 }
2346
2347 impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
2348 where
2349 B: ArrayAccessor<Item = &'a [u8]>,
2350 {
2351 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2352 out.push(b'[');
2353 let child = self.array.value(idx);
2354 for (idx, byte) in child.iter().enumerate() {
2355 write!(out, "{byte}").unwrap();
2356 if idx < child.len() - 1 {
2357 out.push(b',');
2358 }
2359 }
2360 out.push(b']');
2361 }
2362 }
2363
2364 #[derive(Debug)]
2365 struct IntArayBinaryEncoderFactory;
2366
2367 impl EncoderFactory for IntArayBinaryEncoderFactory {
2368 fn make_default_encoder<'a>(
2369 &self,
2370 _field: &'a FieldRef,
2371 array: &'a dyn Array,
2372 _options: &'a EncoderOptions,
2373 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2374 match array.data_type() {
2375 DataType::Binary => {
2376 let array = array.as_binary::<i32>();
2377 let encoder = IntArrayBinaryEncoder { array };
2378 let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2379 let nulls = array.nulls().cloned();
2380 Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2381 }
2382 _ => Ok(None),
2383 }
2384 }
2385 }
2386
2387 let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
2388 let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
2389 let fields = vec![
2390 Field::new("bytes", DataType::Binary, true),
2391 Field::new("float", DataType::Float64, true),
2392 ];
2393 let batch = RecordBatch::try_new(
2394 Arc::new(Schema::new(fields)),
2395 vec![
2396 Arc::new(binary_array) as Arc<dyn Array>,
2397 Arc::new(float_array) as Arc<dyn Array>,
2398 ],
2399 )
2400 .unwrap();
2401
2402 let json_value: Value = {
2403 let mut buf = Vec::new();
2404 let mut writer = WriterBuilder::new()
2405 .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory))
2406 .build::<_, JsonArray>(&mut buf);
2407 writer.write_batches(&[&batch]).unwrap();
2408 writer.finish().unwrap();
2409 serde_json::from_slice(&buf).unwrap()
2410 };
2411
2412 let expected = json!([
2413 {"bytes": [97], "float": 1.0},
2414 {"float": 2.3},
2415 {"bytes": [98]},
2416 ]);
2417
2418 assert_eq!(json_value, expected);
2419 }
2420
    /// A custom encoder selected via field metadata (`padded=true`) applies
    /// both to a plain Int32 column and to the values of an Int32 dictionary
    /// column — the writer hydrates dictionaries through the same factory.
    #[test]
    fn test_encoder_factory_customize_dictionary() {
        // Writes Int32 values as zero-padded 8-character JSON strings.
        struct PaddedInt32Encoder {
            array: Int32Array,
        }

        impl Encoder for PaddedInt32Encoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                let value = self.array.value(idx);
                write!(out, "\"{value:0>8}\"").unwrap();
            }
        }

        #[derive(Debug)]
        struct CustomEncoderFactory;

        impl EncoderFactory for CustomEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                // Opt in via field metadata rather than type alone, so only
                // flagged columns get the padded encoding.
                let padded = field
                    .metadata()
                    .get("padded")
                    .map(|v| v == "true")
                    .unwrap_or_default();
                match (array.data_type(), padded) {
                    (DataType::Int32, true) => {
                        let array = array.as_primitive::<Int32Type>();
                        let nulls = array.nulls().cloned();
                        let encoder = PaddedInt32Encoder {
                            array: array.clone(),
                        };
                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
                    }
                    _ => Ok(None),
                }
            }
        }

        // Helper: serialize a batch to a JSON array using the custom factory.
        let to_json = |batch| {
            let mut buf = Vec::new();
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(Arc::new(CustomEncoderFactory))
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[batch]).unwrap();
            writer.finish().unwrap();
            serde_json::from_slice::<Value>(&buf).unwrap()
        };

        // Case 1: plain Int32 column with the `padded` metadata flag.
        let array = Int32Array::from(vec![Some(1), None, Some(2)]);
        let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
            HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
        ));
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field.clone()])),
            vec![Arc::new(array)],
        )
        .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000002"},
        ]);

        assert_eq!(json_value, expected);

        // Case 2: the same flag on a Dictionary(UInt16, Int32) column — the
        // padded encoding must apply to the dictionary values as well.
        let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
        array_builder.append_value(1);
        array_builder.append_null();
        array_builder.append_value(1);
        let array = array_builder.finish();
        let field = Field::new(
            "int",
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
            true,
        )
        .with_metadata(HashMap::from_iter(vec![(
            "padded".to_string(),
            "true".to_string(),
        )]));
        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
            .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000001"},
        ]);

        assert_eq!(json_value, expected);
    }
2531}