1mod encoder;
108
109use std::{fmt::Debug, io::Write, sync::Arc};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115pub use encoder::{Encoder, EncoderFactory, EncoderOptions, NullableEncoder, make_encoder};
116
/// Controls the framing written around encoded JSON rows.
///
/// Implementations decide what bytes (if any) are emitted before the first
/// row, before/after each row, and when the stream is finished. All provided
/// defaults write nothing, so an implementation overrides only the hooks it
/// needs (see [`LineDelimited`] and [`JsonArray`]).
pub trait JsonFormat: Debug + Default {
    /// Write any bytes needed at the start of the output (default: none)
    #[inline]
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed before a row; `_is_first_row` is true only for
    /// the very first row written to this stream (default: none)
    #[inline]
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed after a row (default: none)
    #[inline]
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed to terminate the output (default: none)
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }
}
143
/// Format marker producing newline-delimited JSON (one object per line), e.g.:
///
/// ```json
/// {"foo":1}
/// {"bar":1}
/// ```
#[derive(Debug, Default)]
pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157 fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158 writer.write_all(b"\n")?;
159 Ok(())
160 }
161}
162
/// Format marker producing a single top-level JSON array, e.g.:
///
/// ```json
/// [{"foo":1},{"bar":1}]
/// ```
#[derive(Debug, Default)]
pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174 fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175 writer.write_all(b"[")?;
176 Ok(())
177 }
178
179 fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180 if !is_first_row {
181 writer.write_all(b",")?;
182 }
183 Ok(())
184 }
185
186 fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187 writer.write_all(b"]")?;
188 Ok(())
189 }
190}
191
/// A JSON writer producing newline-delimited JSON (one object per line)
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A JSON writer producing a single top-level JSON array of objects
pub type ArrayWriter<W> = Writer<W, JsonArray>;
197
/// Builder used to configure the [`EncoderOptions`] of, and then construct, a
/// JSON [`Writer`]
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);
201
impl WriterBuilder {
    /// Create a new builder with default options
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if null values will be written as explicit
    /// `"field": null` entries rather than omitted
    pub fn explicit_nulls(&self) -> bool {
        self.0.explicit_nulls()
    }

    /// Set whether nulls are written explicitly as `"field": null`
    /// (when `false`, null fields are simply omitted from the object)
    pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
        self.0 = self.0.with_explicit_nulls(explicit_nulls);
        self
    }

    /// Returns the configured [`StructMode`] controlling how struct values
    /// are encoded
    pub fn struct_mode(&self) -> StructMode {
        self.0.struct_mode()
    }

    /// Set the [`StructMode`] used when encoding struct values
    pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
        self.0 = self.0.with_struct_mode(struct_mode);
        self
    }

    /// Set a custom [`EncoderFactory`], allowing callers to override how
    /// individual arrays are encoded
    pub fn with_encoder_factory(mut self, factory: Arc<dyn EncoderFactory>) -> Self {
        self.0 = self.0.with_encoder_factory(factory);
        self
    }

    /// Set the format string used when encoding date values
    /// (a chrono-style format string — TODO confirm against `EncoderOptions`)
    pub fn with_date_format(mut self, format: String) -> Self {
        self.0 = self.0.with_date_format(format);
        self
    }

    /// Set the format string used when encoding datetime values
    pub fn with_datetime_format(mut self, format: String) -> Self {
        self.0 = self.0.with_datetime_format(format);
        self
    }

    /// Set the format string used when encoding time values
    pub fn with_time_format(mut self, format: String) -> Self {
        self.0 = self.0.with_time_format(format);
        self
    }

    /// Set the format string used when encoding timestamps without a timezone
    pub fn with_timestamp_format(mut self, format: String) -> Self {
        self.0 = self.0.with_timestamp_format(format);
        self
    }

    /// Set the format string used when encoding timestamps with a timezone
    pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
        self.0 = self.0.with_timestamp_tz_format(tz_format);
        self
    }

    /// Consume this builder, returning a [`Writer`] over `writer` with the
    /// configured options. The output framing (line-delimited vs JSON array)
    /// is chosen via the `F: JsonFormat` type parameter.
    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
    where
        W: Write,
        F: JsonFormat,
    {
        Writer {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options: self.0,
        }
    }
}
327
/// A JSON writer serializing [`RecordBatch`]es to a [`Write`] sink, with the
/// output framing (newline-delimited vs a single JSON array) determined by
/// the `F: JsonFormat` type parameter
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying writer to use to write bytes
    writer: W,

    /// Has the writer output any records yet?
    started: bool,

    /// Has `finish` already terminated the stream?
    finished: bool,

    /// Determines how the byte stream is framed
    format: F,

    /// Controls how values are encoded
    options: EncoderOptions,
}
359
impl<W, F> Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Construct a new writer over `writer` with default [`EncoderOptions`]
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options: EncoderOptions::default(),
        }
    }

    /// Serialize `batch` to the underlying writer.
    ///
    /// Empty batches are skipped entirely: nothing is written and the stream
    /// is not marked as started.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Rows are encoded into this in-memory buffer and flushed to the
        // underlying writer in chunks, bounding memory use and the number of
        // (potentially expensive) writes to `self.writer`.
        let mut buffer = Vec::with_capacity(16 * 1024);

        // The first row of the very first non-empty batch must not be
        // preceded by a separator, and triggers the stream prefix.
        let mut is_first_row = !self.started;
        if !self.started {
            self.format.start_stream(&mut buffer)?;
            self.started = true;
        }

        // Encode the batch as a single non-nullable struct whose children are
        // the batch's columns (RecordBatch::clone is cheap: it clones Arcs,
        // not the underlying data).
        let array = StructArray::from(batch.clone());
        let field = Arc::new(Field::new_struct(
            "",
            batch.schema().fields().clone(),
            false,
        ));

        let mut encoder = make_encoder(&field, &array, &self.options)?;

        // The root field was declared non-nullable above, so the encoder
        // must not report nulls at the top level.
        assert!(!encoder.has_nulls(), "root cannot be nullable");
        for idx in 0..batch.num_rows() {
            self.format.start_row(&mut buffer, is_first_row)?;
            is_first_row = false;

            encoder.encode(idx, &mut buffer);
            // Flush once the buffer exceeds 8KiB; the row terminator for this
            // row is then appended to the freshly cleared buffer.
            if buffer.len() > 8 * 1024 {
                self.writer.write_all(&buffer)?;
                buffer.clear();
            }
            self.format.end_row(&mut buffer)?;
        }

        // Flush whatever remains after the final row
        if !buffer.is_empty() {
            self.writer.write_all(&buffer)?;
        }

        Ok(())
    }

    /// Serialize each batch in `batches`, in order, to the underlying writer
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Finalize the output, writing any terminator required by the format
    /// (e.g. `]` for [`JsonArray`]).
    ///
    /// If nothing was written yet this first emits the stream prefix, so an
    /// empty [`ArrayWriter`] still produces `[]`. Calling `finish` more than
    /// once is a no-op after the first call.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if !self.started {
            self.format.start_stream(&mut self.writer)?;
            self.started = true;
        }
        if !self.finished {
            self.format.end_stream(&mut self.writer)?;
            self.finished = true;
        }

        Ok(())
    }

    /// Returns a reference to the underlying writer
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// Writing through the returned reference directly may corrupt the
    /// JSON output being produced.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Consume `self`, returning the underlying writer
    pub fn into_inner(self) -> W {
        self.writer
    }
}
464
465impl<W, F> RecordBatchWriter for Writer<W, F>
466where
467 W: Write,
468 F: JsonFormat,
469{
470 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
471 self.write(batch)
472 }
473
474 fn close(mut self) -> Result<(), ArrowError> {
475 self.finish()
476 }
477}
478
479#[cfg(test)]
480mod tests {
481 use core::str;
482 use std::collections::HashMap;
483 use std::fs::{File, read_to_string};
484 use std::io::{BufReader, Seek};
485 use std::sync::Arc;
486
487 use arrow_array::cast::AsArray;
488 use serde_json::{Value, json};
489
490 use super::LineDelimited;
491 use super::{Encoder, WriterBuilder};
492 use arrow_array::builder::*;
493 use arrow_array::types::*;
494 use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, ToByteSlice, i256};
495 use arrow_data::ArrayData;
496
497 use crate::reader::*;
498
499 use super::*;
500
    /// Asserts that `input` and `expected` contain the same sequence of
    /// newline-separated JSON values, comparing parsed `serde_json::Value`s
    /// so that object key order does not affect the result.
    fn assert_json_eq(input: &[u8], expected: &str) {
        let expected: Vec<Option<Value>> = expected
            .split('\n')
            .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
            .collect();

        let actual: Vec<Option<Value>> = input
            .split(|b| *b == b'\n')
            .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
            .collect();

        assert_eq!(actual, expected);
    }
515
    /// Int32 and Utf8 columns with nulls round-trip through the
    /// line-delimited writer; null fields are omitted by default.
    #[test]
    fn write_simple_rows() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ]);

        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":1,"c2":"a"}
{"c1":2,"c2":"b"}
{"c1":3,"c2":"c"}
{"c2":"d"}
{"c1":5}
"#,
        );
    }

    /// Utf8, LargeUtf8 and Utf8View columns should all serialize identically.
    #[test]
    fn write_large_utf8_and_utf8_view() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }

    /// Dictionary-encoded columns are written as their decoded values;
    /// null keys become omitted fields.
    #[test]
    fn write_dictionary() {
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }
619
    /// A LargeList of dictionary-encoded Utf8 values: null list entries are
    /// omitted, null dictionary keys become JSON nulls inside the list.
    #[test]
    fn write_list_of_dictionary() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }

    /// Same as `write_list_of_dictionary` but with LargeUtf8 dictionary
    /// values and an explicitly constructed key array.
    #[test]
    fn write_list_of_dictionary_large_values() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
706
    /// Timestamps at all four resolutions serialize with ISO-8601 defaults,
    /// and honor a custom format set via `with_timestamp_format`.
    #[test]
    fn write_timestamps() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        // Default formatting
        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );

        // Custom timestamp format
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_timestamp_format("%m-%d-%Y".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"nanos":"11-13-2018","micros":"11-13-2018","millis":"11-13-2018","secs":"11-13-2018","name":"a"}
{"name":"b"}
"#,
        );
    }

    /// Timezone-aware timestamps default to an ISO-8601 `Z` suffix for UTC,
    /// and honor a custom format set via `with_timestamp_tz_format`.
    #[test]
    fn write_timestamps_with_tz() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        // Default formatting
        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );

        // Custom timezone-aware format
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_timestamp_tz_format("%m-%d-%Y %Z".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"nanos":"11-13-2018 +00:00","micros":"11-13-2018 +00:00","millis":"11-13-2018 +00:00","secs":"11-13-2018 +00:00","name":"a"}
{"name":"b"}
"#,
        );
    }
851
    /// Date32 and Date64 serialize with ISO defaults, and honor custom
    /// date/datetime formats set on the builder.
    #[test]
    fn write_dates() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        let arr_date32 = Date32Array::from(vec![
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        // Default formatting
        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );

        // Custom date and datetime formats
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_date_format("%m-%d-%Y".to_string())
                .with_datetime_format("%m-%d-%Y %Mmin %Ssec %Hhour".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"11-13-2018","date64":"11-13-2018 11min 10sec 17hour","name":"a"}
{"name":"b"}
"#,
        );
    }

    /// Time32/Time64 at all resolutions serialize with default formatting,
    /// and honor a custom format set via `with_time_format`.
    #[test]
    fn write_times() {
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        // Default formatting
        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );

        // Custom time format
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_time_format("%H-%M-%S %f".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00-02-00 000000000","time32msec":"00-00-00 120000000","time64usec":"00-00-00 000120000","time64nsec":"00-00-00 000000120","name":"a"}
{"name":"b"}
"#,
        );
    }
972
    /// Durations at all four resolutions serialize as ISO-8601 duration
    /// strings (e.g. `PT120S`).
    #[test]
    fn write_durations() {
        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_durationsec),
                Arc::new(arr_durationmsec),
                Arc::new(arr_durationusec),
                Arc::new(arr_durationnsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
        );
    }

    /// Structs nested two levels deep serialize as nested JSON objects; a
    /// null inner field is omitted while its siblings are kept.
    #[test]
    fn write_nested_structs() {
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }
1070
    /// A list-of-Utf8 column built from raw offsets/validity serializes as
    /// JSON arrays alongside a scalar column.
    #[test]
    fn write_struct_with_list_field() {
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
        // Offsets [0,2,3,4,5,6] split the six values into rows of
        // lengths 2,1,1,1,1; all five rows are valid (0b00011111).
        let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
        let a_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(5)
            .add_buffer(a_value_offsets)
            .add_child_data(a_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00011111])))
            .build()
            .unwrap();
        let a = ListArray::from(a_list_data);

        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":["a","a1"],"c2":1}
{"c1":["b"],"c2":2}
{"c1":["c"],"c2":3}
{"c1":["d"],"c2":4}
{"c1":["e"],"c2":5}
"#,
        );
    }

    /// Lists of lists serialize as nested JSON arrays; an empty inner list
    /// becomes `[]`.
    #[test]
    fn write_nested_list() {
        let list_inner_type = Field::new(
            "a",
            DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
            false,
        );
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(list_inner_type.clone())),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        // Inner lists: [1,2], [3], [4,5,6]; all valid
        let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
        let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
            .len(3)
            .add_buffer(a_value_offsets)
            .null_bit_buffer(Some(Buffer::from([0b00000111])))
            .add_child_data(a_values.into_data())
            .build()
            .unwrap();

        // Outer lists: [[1,2],[3]], [], [[4,5,6]]
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(a_list_data)
            .build()
            .unwrap();

        let c1 = ListArray::from(c1_list_data);
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }
1170
    /// Lists whose items are (nested) structs serialize as arrays of JSON
    /// objects; a null list row is omitted entirely.
    #[test]
    fn write_list_of_struct() {
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // Rows: [struct0, struct1], null (middle validity bit unset), [struct2]
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(struct_values.into_data())
            .null_bit_buffer(Some(Buffer::from([0b00000101])))
            .build()
            .unwrap();
        let c1 = ListArray::from(c1_list_data);

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }

    /// Shared body for ListView/LargeListView serialization, generic over the
    /// offset size; note the overlapping/out-of-order offsets that list views
    /// permit.
    fn assert_write_list_view<O: OffsetSizeTrait>() {
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
        let schema = Schema::new(vec![Field::new("lv", data_type, true)]);

        let values = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), None, Some(6)]);
        let offsets = [0, 3, 0, 5]
            .iter()
            .map(|&v| O::from_usize(v).unwrap())
            .collect::<Vec<_>>();
        let sizes = [3, 2, 0, 1]
            .iter()
            .map(|&v| O::from_usize(v).unwrap())
            .collect::<Vec<_>>();
        let list_view = GenericListViewArray::<O>::try_new(
            field,
            ScalarBuffer::from(offsets),
            ScalarBuffer::from(sizes),
            Arc::new(values),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"lv":[1,2,3]}
{"lv":[4,null]}
{}
{"lv":[6]}
"#,
        );
    }

    /// Run the list-view check for both 32-bit and 64-bit offsets.
    #[test]
    fn write_list_view() {
        assert_write_list_view::<i32>();
        assert_write_list_view::<i64>();
    }
1291
    /// Round-trip helper: read `test_file` with the JSON reader (inferring
    /// the schema), write it back out, and compare line-by-line against the
    /// original file. When `remove_nulls` is true the writer omits nulls, so
    /// null entries are stripped from the expected JSON before comparing.
    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
        let file = File::open(test_file).unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
        let mut reader = builder.build(reader).unwrap();
        let batch = reader.next().unwrap().unwrap();

        let mut buf = Vec::new();
        {
            if remove_nulls {
                let mut writer = LineDelimitedWriter::new(&mut buf);
                writer.write_batches(&[&batch]).unwrap();
            } else {
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, LineDelimited>(&mut buf);
                writer.write_batches(&[&batch]).unwrap();
            }
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            if remove_nulls {
                // Drop null-valued entries from the expected object, since
                // the writer omits them in this mode
                if let Value::Object(obj) = expected_json {
                    expected_json =
                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
                }
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }

    /// Round-trip test/data/basic.json (nulls omitted)
    #[test]
    fn write_basic_rows() {
        test_write_for_file("test/data/basic.json", true);
    }

    /// Round-trip test/data/arrays.json (nulls omitted)
    #[test]
    fn write_arrays() {
        test_write_for_file("test/data/arrays.json", true);
    }

    /// Round-trip test/data/basic_nulls.json (nulls omitted)
    #[test]
    fn write_basic_nulls() {
        test_write_for_file("test/data/basic_nulls.json", true);
    }

    /// Round-trip test/data/nested_with_nulls.json with explicit nulls
    #[test]
    fn write_nested_with_nulls() {
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1349
    /// Finishing a line-delimited writer with no rows produces no output.
    #[test]
    fn json_line_writer_empty() {
        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
    }

    /// Finishing an array writer with no rows still produces `[]`.
    #[test]
    fn json_array_writer_empty() {
        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
    }

    /// Writing a zero-row batch to a line-delimited writer emits nothing.
    #[test]
    fn json_line_writer_empty_batch() {
        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);

        let array = Int32Array::from(Vec::<i32>::new());
        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();

        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
    }

    /// Writing a zero-row batch to an array writer still produces `[]`.
    #[test]
    fn json_array_writer_empty_batch() {
        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);

        let array = Int32Array::from(Vec::<i32>::new());
        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();

        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
    }
1389
    /// Struct-level nulls must suppress the struct's children in the output:
    /// column "a" carries a struct validity bitmap while "b" shares the same
    /// child list but has no struct-level nulls, so their rows diverge exactly
    /// where "a" is a null struct.
    #[test]
    fn json_struct_array_nulls() {
        // Seven list rows; the final two are null lists (child-level nulls).
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]),
            Some(vec![Some(4), Some(5)]),
            None,
            None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        // 0b01010111 (LSB = row 0): rows 3 and 5 are null *structs* in "a",
        // independent of the shared child data.
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            Buffer::from([0b01010111]),
        ));
        // "b" wraps the identical child but has no struct validity bitmap.
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Row 3: "a" is a null struct so the "a" key is omitted; row 5 likewise.
        // Rows 5/6: the null child list renders as an empty struct `{}`.
        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }
1439
    /// Shared driver for the map-writer tests: builds a 6-row `MapArray` from
    /// `keys_array` (any string flavour) with hand-rolled offsets and a
    /// validity bitmap, writes it line-delimited, and checks the output.
    fn run_json_writer_map_with_keys(keys_array: ArrayRef) {
        let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);

        // Map entries are a struct of non-nullable (keys, values).
        let keys_field = Arc::new(Field::new("keys", keys_array.data_type().clone(), false));
        let values_field = Arc::new(Field::new("values", DataType::Int64, false));
        let entry_struct = StructArray::from(vec![
            (keys_field, keys_array.clone()),
            (values_field, Arc::new(values_array) as ArrayRef),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // Offsets [0,1,1,1,4,5,5] carve the 5 entries into 6 maps of sizes
        // 1, 0, 0, 3, 1, 0; validity 0b00111101 (LSB-first) nulls row 1.
        let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
        let valid_buffer = Buffer::from([0b00111101]);

        let map_data = ArrayData::builder(map_data_type.clone())
            .len(6)
            .null_bit_buffer(Some(valid_buffer))
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();

        let map = MapArray::from(map_data);

        let map_field = Field::new("map", map_data_type, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Null map row renders as `{}` (implicit nulls); empty maps as `{"map":{}}`.
        assert_json_eq(
            &buf,
            r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
        );
    }
1495
1496 #[test]
1497 fn json_writer_map() {
1498 let keys_utf8 = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1500 run_json_writer_map_with_keys(Arc::new(keys_utf8) as ArrayRef);
1501
1502 let keys_large = super::LargeStringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1504 run_json_writer_map_with_keys(Arc::new(keys_large) as ArrayRef);
1505
1506 let keys_view = super::StringViewArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1508 run_json_writer_map_with_keys(Arc::new(keys_view) as ArrayRef);
1509 }
1510
1511 #[test]
1512 fn test_write_single_batch() {
1513 let test_file = "test/data/basic.json";
1514 let file = File::open(test_file).unwrap();
1515 let mut reader = BufReader::new(file);
1516 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1517 reader.rewind().unwrap();
1518
1519 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1520 let mut reader = builder.build(reader).unwrap();
1521 let batch = reader.next().unwrap().unwrap();
1522
1523 let mut buf = Vec::new();
1524 {
1525 let mut writer = LineDelimitedWriter::new(&mut buf);
1526 writer.write(&batch).unwrap();
1527 }
1528
1529 let result = str::from_utf8(&buf).unwrap();
1530 let expected = read_to_string(test_file).unwrap();
1531 for (r, e) in result.lines().zip(expected.lines()) {
1532 let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1533 if let Value::Object(obj) = expected_json {
1535 expected_json =
1536 Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1537 }
1538 assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1539 }
1540 }
1541
1542 #[test]
1543 fn test_write_multi_batches() {
1544 let test_file = "test/data/basic.json";
1545
1546 let schema = SchemaRef::new(Schema::new(vec![
1547 Field::new("a", DataType::Int64, true),
1548 Field::new("b", DataType::Float64, true),
1549 Field::new("c", DataType::Boolean, true),
1550 Field::new("d", DataType::Utf8, true),
1551 Field::new("e", DataType::Utf8, true),
1552 Field::new("f", DataType::Utf8, true),
1553 Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1554 Field::new("h", DataType::Float16, true),
1555 ]));
1556
1557 let mut reader = ReaderBuilder::new(schema.clone())
1558 .build(BufReader::new(File::open(test_file).unwrap()))
1559 .unwrap();
1560 let batch = reader.next().unwrap().unwrap();
1561
1562 let batches = [&RecordBatch::new_empty(schema), &batch, &batch];
1564
1565 let mut buf = Vec::new();
1566 {
1567 let mut writer = LineDelimitedWriter::new(&mut buf);
1568 writer.write_batches(&batches).unwrap();
1569 }
1570
1571 let result = str::from_utf8(&buf).unwrap();
1572 let expected = read_to_string(test_file).unwrap();
1573 let expected = format!("{expected}\n{expected}");
1575 for (r, e) in result.lines().zip(expected.lines()) {
1576 let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1577 if let Value::Object(obj) = expected_json {
1579 expected_json =
1580 Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1581 }
1582 assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1583 }
1584 }
1585
1586 #[test]
1587 fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
1588 fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
1589 let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1590 Some(vec![None, None, None]),
1591 Some(vec![Some(1), Some(2), Some(3)]),
1592 None,
1593 Some(vec![None, None, None]),
1594 ]));
1595 let field = Arc::new(Field::new("list", array.data_type().clone(), true));
1596 (array, field)
1598 }
1599
1600 fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
1601 let array = Arc::new(DictionaryArray::from_iter(vec![
1602 Some("cupcakes"),
1603 None,
1604 Some("bear"),
1605 Some("kuma"),
1606 ]));
1607 let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
1608 (array, field)
1610 }
1611
1612 fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
1613 let string_builder = StringBuilder::new();
1614 let int_builder = Int64Builder::new();
1615 let mut builder = MapBuilder::new(None, string_builder, int_builder);
1616
1617 builder.keys().append_value("foo");
1619 builder.values().append_value(10);
1620 builder.append(true).unwrap();
1621
1622 builder.append(false).unwrap();
1623
1624 builder.append(true).unwrap();
1625
1626 builder.keys().append_value("bar");
1627 builder.values().append_value(20);
1628 builder.keys().append_value("baz");
1629 builder.values().append_value(30);
1630 builder.keys().append_value("qux");
1631 builder.values().append_value(40);
1632 builder.append(true).unwrap();
1633
1634 let array = Arc::new(builder.finish());
1635 let field = Arc::new(Field::new("map", array.data_type().clone(), true));
1636 (array, field)
1637 }
1638
1639 fn root_list() -> (Arc<ListArray>, Field) {
1640 let struct_array = StructArray::from(vec![
1641 (
1642 Arc::new(Field::new("utf8", DataType::Utf8, true)),
1643 Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
1644 ),
1645 (
1646 Arc::new(Field::new("int32", DataType::Int32, true)),
1647 Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
1648 ),
1649 ]);
1650
1651 let field = Field::new_list(
1652 "list",
1653 Field::new("struct", struct_array.data_type().clone(), true),
1654 true,
1655 );
1656
1657 let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
1659 let data = ArrayData::builder(field.data_type().clone())
1660 .len(4)
1661 .add_buffer(entry_offsets)
1662 .add_child_data(struct_array.into_data())
1663 .null_bit_buffer(Some([0b00000101].into()))
1664 .build()
1665 .unwrap();
1666 let array = Arc::new(ListArray::from(data));
1667 (array, field)
1668 }
1669
1670 let (nested_list_array, nested_list_field) = nested_list();
1671 let (nested_dict_array, nested_dict_field) = nested_dict();
1672 let (nested_map_array, nested_map_field) = nested_map();
1673 let (root_list_array, root_list_field) = root_list();
1674
1675 let schema = Schema::new(vec![
1676 Field::new("date", DataType::Date32, true),
1677 Field::new("null", DataType::Null, true),
1678 Field::new_struct(
1679 "struct",
1680 vec![
1681 Arc::new(Field::new("utf8", DataType::Utf8, true)),
1682 nested_list_field.clone(),
1683 nested_dict_field.clone(),
1684 nested_map_field.clone(),
1685 ],
1686 true,
1687 ),
1688 root_list_field,
1689 ]);
1690
1691 let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
1692 let arr_null = NullArray::new(4);
1693 let arr_struct = StructArray::from(vec![
1694 (
1696 Arc::new(Field::new("utf8", DataType::Utf8, true)),
1697 Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
1698 ),
1699 (nested_list_field, nested_list_array as ArrayRef),
1701 (nested_dict_field, nested_dict_array as ArrayRef),
1703 (nested_map_field, nested_map_array as ArrayRef),
1705 ]);
1706
1707 let batch = RecordBatch::try_new(
1708 Arc::new(schema),
1709 vec![
1710 Arc::new(arr_date32),
1712 Arc::new(arr_null),
1714 Arc::new(arr_struct),
1715 root_list_array,
1717 ],
1718 )?;
1719
1720 let mut buf = Vec::new();
1721 {
1722 let mut writer = WriterBuilder::new()
1723 .with_explicit_nulls(true)
1724 .build::<_, JsonArray>(&mut buf);
1725 writer.write_batches(&[&batch])?;
1726 writer.finish()?;
1727 }
1728
1729 let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
1730 let expected = serde_json::from_value::<Vec<Value>>(json!([
1731 {
1732 "date": "1970-01-01",
1733 "list": [
1734 {
1735 "int32": 1,
1736 "utf8": "a"
1737 },
1738 {
1739 "int32": null,
1740 "utf8": "b"
1741 }
1742 ],
1743 "null": null,
1744 "struct": {
1745 "dict": "cupcakes",
1746 "list": [
1747 null,
1748 null,
1749 null
1750 ],
1751 "map": {
1752 "foo": 10
1753 },
1754 "utf8": "a"
1755 }
1756 },
1757 {
1758 "date": null,
1759 "list": null,
1760 "null": null,
1761 "struct": {
1762 "dict": null,
1763 "list": [
1764 1,
1765 2,
1766 3
1767 ],
1768 "map": null,
1769 "utf8": null
1770 }
1771 },
1772 {
1773 "date": "1970-01-02",
1774 "list": [
1775 {
1776 "int32": 5,
1777 "utf8": null
1778 }
1779 ],
1780 "null": null,
1781 "struct": {
1782 "dict": "bear",
1783 "list": null,
1784 "map": {},
1785 "utf8": null
1786 }
1787 },
1788 {
1789 "date": null,
1790 "list": null,
1791 "null": null,
1792 "struct": {
1793 "dict": "kuma",
1794 "list": [
1795 null,
1796 null,
1797 null
1798 ],
1799 "map": {
1800 "bar": 20,
1801 "baz": 30,
1802 "qux": 40
1803 },
1804 "utf8": "b"
1805 }
1806 }
1807 ]))
1808 .unwrap();
1809
1810 assert_eq!(actual, expected);
1811
1812 Ok(())
1813 }
1814
1815 fn build_array_binary<O: OffsetSizeTrait>(values: &[Option<&[u8]>]) -> RecordBatch {
1816 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1817 "bytes",
1818 GenericBinaryType::<O>::DATA_TYPE,
1819 true,
1820 )]));
1821 let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
1822 for value in values {
1823 match value {
1824 Some(v) => builder.append_value(v),
1825 None => builder.append_null(),
1826 }
1827 }
1828 let array = Arc::new(builder.finish()) as ArrayRef;
1829 RecordBatch::try_new(schema, vec![array]).unwrap()
1830 }
1831
1832 fn build_array_binary_view(values: &[Option<&[u8]>]) -> RecordBatch {
1833 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1834 "bytes",
1835 DataType::BinaryView,
1836 true,
1837 )]));
1838 let mut builder = BinaryViewBuilder::new();
1839 for value in values {
1840 match value {
1841 Some(v) => builder.append_value(v),
1842 None => builder.append_null(),
1843 }
1844 }
1845 let array = Arc::new(builder.finish()) as ArrayRef;
1846 RecordBatch::try_new(schema, vec![array]).unwrap()
1847 }
1848
1849 fn assert_binary_json(batch: &RecordBatch) {
1850 {
1852 let mut buf = Vec::new();
1853 let json_value: Value = {
1854 let mut writer = WriterBuilder::new()
1855 .with_explicit_nulls(true)
1856 .build::<_, JsonArray>(&mut buf);
1857 writer.write(batch).unwrap();
1858 writer.close().unwrap();
1859 serde_json::from_slice(&buf).unwrap()
1860 };
1861
1862 assert_eq!(
1863 json!([
1864 {
1865 "bytes": "4e656420466c616e64657273"
1866 },
1867 {
1868 "bytes": null },
1870 {
1871 "bytes": "54726f79204d63436c757265"
1872 }
1873 ]),
1874 json_value,
1875 );
1876 }
1877
1878 {
1880 let mut buf = Vec::new();
1881 let json_value: Value = {
1882 let mut writer = ArrayWriter::new(&mut buf);
1885 writer.write(batch).unwrap();
1886 writer.close().unwrap();
1887 serde_json::from_slice(&buf).unwrap()
1888 };
1889
1890 assert_eq!(
1891 json!([
1892 { "bytes": "4e656420466c616e64657273" },
1893 {},
1894 { "bytes": "54726f79204d63436c757265" }
1895 ]),
1896 json_value
1897 );
1898 }
1899 }
1900
1901 #[test]
1902 fn test_writer_binary() {
1903 let values: [Option<&[u8]>; 3] = [
1904 Some(b"Ned Flanders" as &[u8]),
1905 None,
1906 Some(b"Troy McClure" as &[u8]),
1907 ];
1908 {
1910 let batch = build_array_binary::<i32>(&values);
1911 assert_binary_json(&batch);
1912 }
1913 {
1915 let batch = build_array_binary::<i64>(&values);
1916 assert_binary_json(&batch);
1917 }
1918 {
1919 let batch = build_array_binary_view(&values);
1920 assert_binary_json(&batch);
1921 }
1922 }
1923
1924 #[test]
1925 fn test_writer_fixed_size_binary() {
1926 let size = 11;
1928 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1929 "bytes",
1930 DataType::FixedSizeBinary(size),
1931 true,
1932 )]));
1933
1934 let mut builder = FixedSizeBinaryBuilder::new(size);
1936 let values = [Some(b"hello world"), None, Some(b"summer rain")];
1937 for value in values {
1938 match value {
1939 Some(v) => builder.append_value(v).unwrap(),
1940 None => builder.append_null(),
1941 }
1942 }
1943 let array = Arc::new(builder.finish()) as ArrayRef;
1944 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1945
1946 {
1948 let mut buf = Vec::new();
1949 let json_value: Value = {
1950 let mut writer = WriterBuilder::new()
1951 .with_explicit_nulls(true)
1952 .build::<_, JsonArray>(&mut buf);
1953 writer.write(&batch).unwrap();
1954 writer.close().unwrap();
1955 serde_json::from_slice(&buf).unwrap()
1956 };
1957
1958 assert_eq!(
1959 json!([
1960 {
1961 "bytes": "68656c6c6f20776f726c64"
1962 },
1963 {
1964 "bytes": null },
1966 {
1967 "bytes": "73756d6d6572207261696e"
1968 }
1969 ]),
1970 json_value,
1971 );
1972 }
1973 {
1975 let mut buf = Vec::new();
1976 let json_value: Value = {
1977 let mut writer = ArrayWriter::new(&mut buf);
1980 writer.write(&batch).unwrap();
1981 writer.close().unwrap();
1982 serde_json::from_slice(&buf).unwrap()
1983 };
1984
1985 assert_eq!(
1986 json!([
1987 {
1988 "bytes": "68656c6c6f20776f726c64"
1989 },
1990 {}, {
1992 "bytes": "73756d6d6572207261696e"
1993 }
1994 ]),
1995 json_value,
1996 );
1997 }
1998 }
1999
2000 #[test]
2001 fn test_writer_fixed_size_list() {
2002 let size = 3;
2003 let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
2004 let schema = SchemaRef::new(Schema::new(vec![Field::new(
2005 "list",
2006 DataType::FixedSizeList(field, size),
2007 true,
2008 )]));
2009
2010 let values_builder = Int32Builder::new();
2011 let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
2012 let lists = [
2013 Some([Some(1), Some(2), None]),
2014 Some([Some(3), None, Some(4)]),
2015 Some([None, Some(5), Some(6)]),
2016 None,
2017 ];
2018 for list in lists {
2019 match list {
2020 Some(l) => {
2021 for value in l {
2022 match value {
2023 Some(v) => list_builder.values().append_value(v),
2024 None => list_builder.values().append_null(),
2025 }
2026 }
2027 list_builder.append(true);
2028 }
2029 None => {
2030 for _ in 0..size {
2031 list_builder.values().append_null();
2032 }
2033 list_builder.append(false);
2034 }
2035 }
2036 }
2037 let array = Arc::new(list_builder.finish()) as ArrayRef;
2038 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2039
2040 {
2042 let json_value: Value = {
2043 let mut buf = Vec::new();
2044 let mut writer = WriterBuilder::new()
2045 .with_explicit_nulls(true)
2046 .build::<_, JsonArray>(&mut buf);
2047 writer.write(&batch).unwrap();
2048 writer.close().unwrap();
2049 serde_json::from_slice(&buf).unwrap()
2050 };
2051 assert_eq!(
2052 json!([
2053 {"list": [1, 2, null]},
2054 {"list": [3, null, 4]},
2055 {"list": [null, 5, 6]},
2056 {"list": null},
2057 ]),
2058 json_value
2059 );
2060 }
2061 {
2063 let json_value: Value = {
2064 let mut buf = Vec::new();
2065 let mut writer = ArrayWriter::new(&mut buf);
2066 writer.write(&batch).unwrap();
2067 writer.close().unwrap();
2068 serde_json::from_slice(&buf).unwrap()
2069 };
2070 assert_eq!(
2071 json!([
2072 {"list": [1, 2, null]},
2073 {"list": [3, null, 4]},
2074 {"list": [null, 5, 6]},
2075 {}, ]),
2077 json_value
2078 );
2079 }
2080 }
2081
2082 #[test]
2083 fn test_writer_null_dict() {
2084 let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
2085 let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
2086 let dict = DictionaryArray::new(keys, values);
2087
2088 let schema = SchemaRef::new(Schema::new(vec![Field::new(
2089 "my_dict",
2090 DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
2091 true,
2092 )]));
2093
2094 let array = Arc::new(dict) as ArrayRef;
2095 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2096
2097 let mut json = Vec::new();
2098 let write_builder = WriterBuilder::new().with_explicit_nulls(true);
2099 let mut writer = write_builder.build::<_, JsonArray>(&mut json);
2100 writer.write(&batch).unwrap();
2101 writer.close().unwrap();
2102
2103 let json_str = str::from_utf8(&json).unwrap();
2104 assert_eq!(
2105 json_str,
2106 r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
2107 )
2108 }
2109
2110 #[test]
2111 fn test_decimal32_encoder() {
2112 let array = Decimal32Array::from_iter_values([1234, 5678, 9012])
2113 .with_precision_and_scale(8, 2)
2114 .unwrap();
2115 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2116 let schema = Schema::new(vec![field]);
2117 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2118
2119 let mut buf = Vec::new();
2120 {
2121 let mut writer = LineDelimitedWriter::new(&mut buf);
2122 writer.write_batches(&[&batch]).unwrap();
2123 }
2124
2125 assert_json_eq(
2126 &buf,
2127 r#"{"decimal":12.34}
2128{"decimal":56.78}
2129{"decimal":90.12}
2130"#,
2131 );
2132 }
2133
2134 #[test]
2135 fn test_decimal64_encoder() {
2136 let array = Decimal64Array::from_iter_values([1234, 5678, 9012])
2137 .with_precision_and_scale(10, 2)
2138 .unwrap();
2139 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2140 let schema = Schema::new(vec![field]);
2141 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2142
2143 let mut buf = Vec::new();
2144 {
2145 let mut writer = LineDelimitedWriter::new(&mut buf);
2146 writer.write_batches(&[&batch]).unwrap();
2147 }
2148
2149 assert_json_eq(
2150 &buf,
2151 r#"{"decimal":12.34}
2152{"decimal":56.78}
2153{"decimal":90.12}
2154"#,
2155 );
2156 }
2157
2158 #[test]
2159 fn test_decimal128_encoder() {
2160 let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
2161 .with_precision_and_scale(10, 2)
2162 .unwrap();
2163 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2164 let schema = Schema::new(vec![field]);
2165 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2166
2167 let mut buf = Vec::new();
2168 {
2169 let mut writer = LineDelimitedWriter::new(&mut buf);
2170 writer.write_batches(&[&batch]).unwrap();
2171 }
2172
2173 assert_json_eq(
2174 &buf,
2175 r#"{"decimal":12.34}
2176{"decimal":56.78}
2177{"decimal":90.12}
2178"#,
2179 );
2180 }
2181
2182 #[test]
2183 fn test_decimal256_encoder() {
2184 let array = Decimal256Array::from_iter_values([
2185 i256::from(123400),
2186 i256::from(567800),
2187 i256::from(901200),
2188 ])
2189 .with_precision_and_scale(10, 4)
2190 .unwrap();
2191 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2192 let schema = Schema::new(vec![field]);
2193 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2194
2195 let mut buf = Vec::new();
2196 {
2197 let mut writer = LineDelimitedWriter::new(&mut buf);
2198 writer.write_batches(&[&batch]).unwrap();
2199 }
2200
2201 assert_json_eq(
2202 &buf,
2203 r#"{"decimal":12.3400}
2204{"decimal":56.7800}
2205{"decimal":90.1200}
2206"#,
2207 );
2208 }
2209
2210 #[test]
2211 fn test_decimal_encoder_with_nulls() {
2212 let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
2213 .with_precision_and_scale(10, 2)
2214 .unwrap();
2215 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2216 let schema = Schema::new(vec![field]);
2217 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2218
2219 let mut buf = Vec::new();
2220 {
2221 let mut writer = LineDelimitedWriter::new(&mut buf);
2222 writer.write_batches(&[&batch]).unwrap();
2223 }
2224
2225 assert_json_eq(
2226 &buf,
2227 r#"{"decimal":12.34}
2228{}
2229{"decimal":56.78}
2230"#,
2231 );
2232 }
2233
2234 #[test]
2235 fn write_structs_as_list() {
2236 let schema = Schema::new(vec![
2237 Field::new(
2238 "c1",
2239 DataType::Struct(Fields::from(vec![
2240 Field::new("c11", DataType::Int32, true),
2241 Field::new(
2242 "c12",
2243 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2244 false,
2245 ),
2246 ])),
2247 false,
2248 ),
2249 Field::new("c2", DataType::Utf8, false),
2250 ]);
2251
2252 let c1 = StructArray::from(vec![
2253 (
2254 Arc::new(Field::new("c11", DataType::Int32, true)),
2255 Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
2256 ),
2257 (
2258 Arc::new(Field::new(
2259 "c12",
2260 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2261 false,
2262 )),
2263 Arc::new(StructArray::from(vec![(
2264 Arc::new(Field::new("c121", DataType::Utf8, false)),
2265 Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
2266 )])) as ArrayRef,
2267 ),
2268 ]);
2269 let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
2270
2271 let batch =
2272 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
2273
2274 let expected = r#"[[1,["e"]],"a"]
2275[[null,["f"]],"b"]
2276[[5,["g"]],"c"]
2277"#;
2278
2279 let mut buf = Vec::new();
2280 {
2281 let builder = WriterBuilder::new()
2282 .with_explicit_nulls(true)
2283 .with_struct_mode(StructMode::ListOnly);
2284 let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2285 writer.write_batches(&[&batch]).unwrap();
2286 }
2287 assert_json_eq(&buf, expected);
2288
2289 let mut buf = Vec::new();
2290 {
2291 let builder = WriterBuilder::new()
2292 .with_explicit_nulls(false)
2293 .with_struct_mode(StructMode::ListOnly);
2294 let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2295 writer.write_batches(&[&batch]).unwrap();
2296 }
2297 assert_json_eq(&buf, expected);
2298 }
2299
    /// Shared fixture for the fallback-encoder tests: builds a RecordBatch
    /// with a sparse-union column (Int32 / Utf8 / Null variants, one row per
    /// variant) plus a nullable float column, and a custom `EncoderFactory`
    /// that knows how to encode the union (which has no built-in encoder).
    fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
        // Pre-decoded union slot held by the custom encoder.
        #[derive(Debug)]
        enum UnionValue {
            Int32(i32),
            String(String),
        }

        #[derive(Debug)]
        struct UnionEncoder {
            array: Vec<Option<UnionValue>>,
        }

        impl Encoder for UnionEncoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                match &self.array[idx] {
                    None => out.extend_from_slice(b"null"),
                    Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
                    Some(UnionValue::String(v)) => {
                        out.extend_from_slice(format!("\"{v}\"").as_bytes())
                    }
                }
            }
        }

        #[derive(Debug)]
        struct UnionEncoderFactory;

        impl EncoderFactory for UnionEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                let data_type = array.data_type();
                // Only handle sparse unions; returning Ok(None) defers to the
                // writer's built-in encoders for everything else.
                let fields = match data_type {
                    DataType::Union(fields, UnionMode::Sparse) => fields,
                    _ => return Ok(None),
                };
                let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
                // Bail out unless every variant is a type decoded below.
                for f in fields.iter() {
                    match f.data_type() {
                        DataType::Null => {}
                        DataType::Int32 => {}
                        DataType::Utf8 => {}
                        _ => return Ok(None),
                    }
                }
                // Eagerly decode each slot from the sparse child whose index
                // matches the slot's type id.
                let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
                let mut values = Vec::with_capacity(type_ids.len());
                for idx in 0..type_ids.len() {
                    let type_id = type_ids[idx];
                    let field = &fields[type_id as usize];
                    let value = match field.data_type() {
                        DataType::Null => None,
                        DataType::Int32 => Some(UnionValue::Int32(
                            buffers[type_id as usize]
                                .as_primitive::<Int32Type>()
                                .value(idx),
                        )),
                        DataType::Utf8 => Some(UnionValue::String(
                            buffers[type_id as usize]
                                .as_string::<i32>()
                                .value(idx)
                                .to_string(),
                        )),
                        // Guarded by the variant-type check above.
                        _ => unreachable!(),
                    };
                    values.push(value);
                }
                let array_encoder =
                    Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
                let nulls = array.nulls().cloned();
                Ok(Some(NullableEncoder::new(array_encoder, nulls)))
            }
        }

        // Rows select variants A(Int32=1), B(Utf8="a"), C(Null) in turn.
        let int_array = Int32Array::from(vec![Some(1), None, None]);
        let string_array = StringArray::from(vec![None, Some("a"), None]);
        let null_array = NullArray::new(3);
        let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();

        let union_fields = [
            (0, Arc::new(Field::new("A", DataType::Int32, false))),
            (1, Arc::new(Field::new("B", DataType::Utf8, false))),
            (2, Arc::new(Field::new("C", DataType::Null, false))),
        ]
        .into_iter()
        .collect::<UnionFields>();

        let children = vec![
            Arc::new(int_array) as Arc<dyn Array>,
            Arc::new(string_array),
            Arc::new(null_array),
        ];

        let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();

        // Second column exercises the default float encoder alongside the
        // custom union encoder (null in row 1).
        let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);

        let fields = vec![
            Field::new(
                "union",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new("float", DataType::Float64, true),
        ];

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        (batch, Arc::new(UnionEncoderFactory))
    }
2423
2424 #[test]
2425 fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
2426 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2427
2428 let mut buf = Vec::new();
2429 {
2430 let mut writer = WriterBuilder::new()
2431 .with_encoder_factory(encoder_factory)
2432 .with_explicit_nulls(false)
2433 .build::<_, LineDelimited>(&mut buf);
2434 writer.write_batches(&[&batch]).unwrap();
2435 writer.finish().unwrap();
2436 }
2437
2438 println!("{}", str::from_utf8(&buf).unwrap());
2439
2440 assert_json_eq(
2441 &buf,
2442 r#"{"union":1,"float":1.0}
2443{"union":"a"}
2444{"union":null,"float":3.4}
2445"#,
2446 );
2447 }
2448
2449 #[test]
2450 fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
2451 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2452
2453 let mut buf = Vec::new();
2454 {
2455 let mut writer = WriterBuilder::new()
2456 .with_encoder_factory(encoder_factory)
2457 .with_explicit_nulls(true)
2458 .build::<_, LineDelimited>(&mut buf);
2459 writer.write_batches(&[&batch]).unwrap();
2460 writer.finish().unwrap();
2461 }
2462
2463 assert_json_eq(
2464 &buf,
2465 r#"{"union":1,"float":1.0}
2466{"union":"a","float":null}
2467{"union":null,"float":3.4}
2468"#,
2469 );
2470 }
2471
2472 #[test]
2473 fn test_fallback_encoder_factory_array_implicit_nulls() {
2474 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2475
2476 let json_value: Value = {
2477 let mut buf = Vec::new();
2478 let mut writer = WriterBuilder::new()
2479 .with_encoder_factory(encoder_factory)
2480 .build::<_, JsonArray>(&mut buf);
2481 writer.write_batches(&[&batch]).unwrap();
2482 writer.finish().unwrap();
2483 serde_json::from_slice(&buf).unwrap()
2484 };
2485
2486 let expected = json!([
2487 {"union":1,"float":1.0},
2488 {"union":"a"},
2489 {"float":3.4,"union":null},
2490 ]);
2491
2492 assert_eq!(json_value, expected);
2493 }
2494
2495 #[test]
2496 fn test_fallback_encoder_factory_array_explicit_nulls() {
2497 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2498
2499 let json_value: Value = {
2500 let mut buf = Vec::new();
2501 let mut writer = WriterBuilder::new()
2502 .with_encoder_factory(encoder_factory)
2503 .with_explicit_nulls(true)
2504 .build::<_, JsonArray>(&mut buf);
2505 writer.write_batches(&[&batch]).unwrap();
2506 writer.finish().unwrap();
2507 serde_json::from_slice(&buf).unwrap()
2508 };
2509
2510 let expected = json!([
2511 {"union":1,"float":1.0},
2512 {"union":"a", "float": null},
2513 {"union":null,"float":3.4},
2514 ]);
2515
2516 assert_eq!(json_value, expected);
2517 }
2518
2519 #[test]
2520 fn test_default_encoder_byte_array() {
2521 struct IntArrayBinaryEncoder<B> {
2522 array: B,
2523 }
2524
2525 impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
2526 where
2527 B: ArrayAccessor<Item = &'a [u8]>,
2528 {
2529 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2530 out.push(b'[');
2531 let child = self.array.value(idx);
2532 for (idx, byte) in child.iter().enumerate() {
2533 write!(out, "{byte}").unwrap();
2534 if idx < child.len() - 1 {
2535 out.push(b',');
2536 }
2537 }
2538 out.push(b']');
2539 }
2540 }
2541
2542 #[derive(Debug)]
2543 struct IntArayBinaryEncoderFactory;
2544
2545 impl EncoderFactory for IntArayBinaryEncoderFactory {
2546 fn make_default_encoder<'a>(
2547 &self,
2548 _field: &'a FieldRef,
2549 array: &'a dyn Array,
2550 _options: &'a EncoderOptions,
2551 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2552 match array.data_type() {
2553 DataType::Binary => {
2554 let array = array.as_binary::<i32>();
2555 let encoder = IntArrayBinaryEncoder { array };
2556 let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2557 let nulls = array.nulls().cloned();
2558 Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2559 }
2560 _ => Ok(None),
2561 }
2562 }
2563 }
2564
2565 let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
2566 let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
2567 let fields = vec![
2568 Field::new("bytes", DataType::Binary, true),
2569 Field::new("float", DataType::Float64, true),
2570 ];
2571 let batch = RecordBatch::try_new(
2572 Arc::new(Schema::new(fields)),
2573 vec![
2574 Arc::new(binary_array) as Arc<dyn Array>,
2575 Arc::new(float_array) as Arc<dyn Array>,
2576 ],
2577 )
2578 .unwrap();
2579
2580 let json_value: Value = {
2581 let mut buf = Vec::new();
2582 let mut writer = WriterBuilder::new()
2583 .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory))
2584 .build::<_, JsonArray>(&mut buf);
2585 writer.write_batches(&[&batch]).unwrap();
2586 writer.finish().unwrap();
2587 serde_json::from_slice(&buf).unwrap()
2588 };
2589
2590 let expected = json!([
2591 {"bytes": [97], "float": 1.0},
2592 {"float": 2.3},
2593 {"bytes": [98]},
2594 ]);
2595
2596 assert_eq!(json_value, expected);
2597 }
2598
2599 #[test]
2600 fn test_encoder_factory_customize_dictionary() {
2601 struct PaddedInt32Encoder {
2606 array: Int32Array,
2607 }
2608
2609 impl Encoder for PaddedInt32Encoder {
2610 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2611 let value = self.array.value(idx);
2612 write!(out, "\"{value:0>8}\"").unwrap();
2613 }
2614 }
2615
2616 #[derive(Debug)]
2617 struct CustomEncoderFactory;
2618
2619 impl EncoderFactory for CustomEncoderFactory {
2620 fn make_default_encoder<'a>(
2621 &self,
2622 field: &'a FieldRef,
2623 array: &'a dyn Array,
2624 _options: &'a EncoderOptions,
2625 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2626 let padded = field
2631 .metadata()
2632 .get("padded")
2633 .map(|v| v == "true")
2634 .unwrap_or_default();
2635 match (array.data_type(), padded) {
2636 (DataType::Int32, true) => {
2637 let array = array.as_primitive::<Int32Type>();
2638 let nulls = array.nulls().cloned();
2639 let encoder = PaddedInt32Encoder {
2640 array: array.clone(),
2641 };
2642 let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2643 Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2644 }
2645 _ => Ok(None),
2646 }
2647 }
2648 }
2649
2650 let to_json = |batch| {
2651 let mut buf = Vec::new();
2652 let mut writer = WriterBuilder::new()
2653 .with_encoder_factory(Arc::new(CustomEncoderFactory))
2654 .build::<_, JsonArray>(&mut buf);
2655 writer.write_batches(&[batch]).unwrap();
2656 writer.finish().unwrap();
2657 serde_json::from_slice::<Value>(&buf).unwrap()
2658 };
2659
2660 let array = Int32Array::from(vec![Some(1), None, Some(2)]);
2662 let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
2663 HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
2664 ));
2665 let batch = RecordBatch::try_new(
2666 Arc::new(Schema::new(vec![field.clone()])),
2667 vec![Arc::new(array)],
2668 )
2669 .unwrap();
2670
2671 let json_value = to_json(&batch);
2672
2673 let expected = json!([
2674 {"int": "00000001"},
2675 {},
2676 {"int": "00000002"},
2677 ]);
2678
2679 assert_eq!(json_value, expected);
2680
2681 let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
2683 array_builder.append_value(1);
2684 array_builder.append_null();
2685 array_builder.append_value(1);
2686 let array = array_builder.finish();
2687 let field = Field::new(
2688 "int",
2689 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
2690 true,
2691 )
2692 .with_metadata(HashMap::from_iter(vec![(
2693 "padded".to_string(),
2694 "true".to_string(),
2695 )]));
2696 let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
2697 .unwrap();
2698
2699 let json_value = to_json(&batch);
2700
2701 let expected = json!([
2702 {"int": "00000001"},
2703 {},
2704 {"int": "00000001"},
2705 ]);
2706
2707 assert_eq!(json_value, expected);
2708 }
2709
2710 #[test]
2711 fn test_write_run_end_encoded() {
2712 let run_ends = Int32Array::from(vec![2, 5, 6]);
2713 let values = StringArray::from(vec![Some("a"), Some("b"), None]);
2714 let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2715
2716 let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2717 "c1",
2718 ree.data_type().clone(),
2719 true,
2720 )]));
2721
2722 let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
2723
2724 let mut buf = Vec::new();
2725 {
2726 let mut writer = LineDelimitedWriter::new(&mut buf);
2727 writer.write_batches(&[&batch]).unwrap();
2728 }
2729
2730 assert_json_eq(
2731 &buf,
2732 r#"{"c1":"a"}
2733{"c1":"a"}
2734{"c1":"b"}
2735{"c1":"b"}
2736{"c1":"b"}
2737{}
2738"#,
2739 );
2740 }
2741
2742 #[test]
2743 fn test_write_run_end_encoded_int_values() {
2744 let run_ends = Int32Array::from(vec![3, 5]);
2745 let values = Int32Array::from(vec![10, 20]);
2746 let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2747
2748 let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2749 "n",
2750 ree.data_type().clone(),
2751 true,
2752 )]));
2753
2754 let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
2755
2756 let json_value: Value = {
2757 let mut buf = Vec::new();
2758 let mut writer = WriterBuilder::new().build::<_, JsonArray>(&mut buf);
2759 writer.write_batches(&[&batch]).unwrap();
2760 writer.finish().unwrap();
2761 serde_json::from_slice(&buf).unwrap()
2762 };
2763
2764 let expected = json!([
2765 {"n": 10},
2766 {"n": 10},
2767 {"n": 10},
2768 {"n": 20},
2769 {"n": 20},
2770 ]);
2771
2772 assert_eq!(json_value, expected);
2773 }
2774
2775 #[test]
2776 fn test_run_end_encoded_roundtrip() {
2777 let run_ends = Int32Array::from(vec![3, 5, 7]);
2778 let values = StringArray::from(vec![Some("a"), None, Some("b")]);
2779 let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2780
2781 let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2782 "c",
2783 ree.data_type().clone(),
2784 true,
2785 )]));
2786 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ree)]).unwrap();
2787
2788 let mut buf = Vec::new();
2789 {
2790 let mut writer = super::LineDelimitedWriter::new(&mut buf);
2791 writer.write_batches(&[&batch]).unwrap();
2792 }
2793
2794 let batches: Vec<RecordBatch> = ReaderBuilder::new(schema)
2795 .with_batch_size(1024)
2796 .build(std::io::Cursor::new(&buf))
2797 .unwrap()
2798 .collect::<Result<Vec<_>, _>>()
2799 .unwrap();
2800 assert_eq!(batches.len(), 1);
2801
2802 let col = batches[0].column(0);
2803 let run_array = col.as_run::<Int32Type>();
2804
2805 assert_eq!(run_array.len(), 7);
2806 assert_eq!(run_array.run_ends().values(), &[3, 5, 7]);
2807
2808 let values = run_array.values().as_string::<i32>();
2809 assert_eq!(values.len(), 3);
2810 assert_eq!(values.value(0), "a");
2811 assert!(values.is_null(1));
2812 assert_eq!(values.value(2), "b");
2813 }
2814}