use crate::StructMode;
use std::io::BufRead;
use std::sync::Arc;

use chrono::Utc;
use serde_core::Serialize;

use arrow_array::timezone::Tz;
use arrow_array::types::*;
use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
pub use schema::*;

use crate::reader::boolean_array::BooleanArrayDecoder;
use crate::reader::decimal_array::DecimalArrayDecoder;
use crate::reader::list_array::ListArrayDecoder;
use crate::reader::map_array::MapArrayDecoder;
use crate::reader::null_array::NullArrayDecoder;
use crate::reader::primitive_array::PrimitiveArrayDecoder;
use crate::reader::string_array::StringArrayDecoder;
use crate::reader::string_view_array::StringViewArrayDecoder;
use crate::reader::struct_array::StructArrayDecoder;
use crate::reader::tape::{Tape, TapeDecoder};
use crate::reader::timestamp_array::TimestampArrayDecoder;

mod boolean_array;
mod decimal_array;
mod list_array;
mod map_array;
mod null_array;
mod primitive_array;
mod schema;
mod serializer;
mod string_array;
mod string_view_array;
mod struct_array;
mod tape;
mod timestamp_array;

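/// A builder for a JSON [`Reader`] or [`Decoder`]
///
/// A minimal sketch of reading newline-delimited JSON with a known schema
/// (the schema and data here are illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use arrow_json::ReaderBuilder;
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
/// let json = "{\"a\": 1}\n{\"a\": 2}";
///
/// // &[u8] implements BufRead, so the string's bytes can be read directly
/// let mut reader = ReaderBuilder::new(schema).build(json.as_bytes()).unwrap();
/// let batch = reader.next().unwrap().unwrap();
/// assert_eq!(batch.num_rows(), 2);
/// ```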
pub struct ReaderBuilder {
    batch_size: usize,
    coerce_primitive: bool,
    strict_mode: bool,
    is_field: bool,
    struct_mode: StructMode,

    schema: SchemaRef,
}

impl ReaderBuilder {
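    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
    ///
    /// This could be obtained using [`infer_json_schema`] if not known in advance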
    pub fn new(schema: SchemaRef) -> Self {
        Self {
            batch_size: 1024,
            coerce_primitive: false,
            strict_mode: false,
            is_field: false,
            struct_mode: Default::default(),
            schema,
        }
    }

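    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
    ///
    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
    /// to be an object, allowing any top-level JSON value to be decoded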
    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
        Self {
            batch_size: 1024,
            coerce_primitive: false,
            strict_mode: false,
            is_field: true,
            struct_mode: Default::default(),
            schema: Arc::new(Schema::new([field.into()])),
        }
    }

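    /// Sets the batch size in rows to read (defaults to 1024)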
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        Self { batch_size, ..self }
    }

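    /// Sets if the decoder should coerce primitive values (bool and number) into strings
    /// when the requested type is a string-like type such as [`DataType::Utf8`]
    /// (defaults to false)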
    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
        Self {
            coerce_primitive,
            ..self
        }
    }

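    /// Sets if the decoder should return an error if it encounters a column not
    /// present in `schema` (defaults to false, in which case such columns are ignored)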
    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
        Self {
            strict_mode,
            ..self
        }
    }

    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
        Self {
            struct_mode,
            ..self
        }
    }

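    /// Create a [`Reader`] that reads from the provided [`BufRead`]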
    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
        Ok(Reader {
            reader,
            decoder: self.build_decoder()?,
        })
    }

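    /// Create a [`Decoder`] for push-based decoding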
    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
        let (data_type, nullable) = match self.is_field {
            false => (DataType::Struct(self.schema.fields.clone()), false),
            true => {
                let field = &self.schema.fields[0];
                (field.data_type().clone(), field.is_nullable())
            }
        };

        let decoder = make_decoder(
            data_type,
            self.coerce_primitive,
            self.strict_mode,
            nullable,
            self.struct_mode,
        )?;

        let num_fields = self.schema.flattened_fields().len();

        Ok(Decoder {
            decoder,
            is_field: self.is_field,
            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
            batch_size: self.batch_size,
            schema: self.schema,
        })
    }
}

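/// Reads JSON data with a known schema directly into [`RecordBatch`]es
///
/// See [`ReaderBuilder`] for an example of its use, and [`Decoder`] for a
/// lower-level, push-based alternative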
pub struct Reader<R> {
    reader: R,
    decoder: Decoder,
}

impl<R> std::fmt::Debug for Reader<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Reader")
            .field("decoder", &self.decoder)
            .finish()
    }
}

impl<R: BufRead> Reader<R> {
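    /// Reads the next [`RecordBatch`], returning `Ok(None)` if the input is exhausted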
    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        loop {
            let buf = self.reader.fill_buf()?;
            if buf.is_empty() {
                break;
            }
            let read = buf.len();

            let decoded = self.decoder.decode(buf)?;
            self.reader.consume(decoded);
            if decoded != read {
                break;
            }
        }
        self.decoder.flush()
    }
}

impl<R: BufRead> Iterator for Reader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.read().transpose()
    }
}

impl<R: BufRead> RecordBatchReader for Reader<R> {
    fn schema(&self) -> SchemaRef {
        self.decoder.schema.clone()
    }
}

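/// A low-level, push-based interface for decoding JSON data
///
/// For a higher-level interface see [`Reader`]. A minimal sketch of feeding a
/// [`Decoder`] arbitrary byte chunks, e.g. as they arrive from the network
/// (the chunk boundaries below are deliberately arbitrary):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use arrow_json::ReaderBuilder;
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
/// let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
///
/// // Records may be split across chunks; decode returns the number of bytes
/// // consumed, which is the full chunk unless `batch_size` rows are buffered
/// for chunk in ["{\"a\": 1}", "\n{\"a\"", ": 2}"] {
///     assert_eq!(decoder.decode(chunk.as_bytes()).unwrap(), chunk.len());
/// }
///
/// // Flush the buffered rows to a RecordBatch
/// let batch = decoder.flush().unwrap().unwrap();
/// assert_eq!(batch.num_rows(), 2);
/// ```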
pub struct Decoder {
    tape_decoder: TapeDecoder,
    decoder: Box<dyn ArrayDecoder>,
    batch_size: usize,
    is_field: bool,
    schema: SchemaRef,
}

impl std::fmt::Debug for Decoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Decoder")
            .field("schema", &self.schema)
            .field("batch_size", &self.batch_size)
            .finish()
    }
}

impl Decoder {
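    /// Read JSON objects from `buf`, returning the number of bytes consumed
    ///
    /// This method returns once `batch_size` objects have been parsed since the last
    /// call to [`Self::flush`], or `buf` is exhausted. Any unconsumed bytes should be
    /// included in the next call to this method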
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.tape_decoder.decode(buf)
    }

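    /// Serialize `rows` to this [`Decoder`]
    ///
    /// This provides a simple way to convert serde-compatible data structures into arrow
    /// [`RecordBatch`]es. A minimal sketch using `serde_json::Value` (any type implementing
    /// [`Serialize`] should work):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use serde_json::json;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use arrow_json::ReaderBuilder;
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    /// let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
    ///
    /// decoder.serialize(&[json!({"a": 1}), json!({"a": 2})]).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 2);
    /// ```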
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

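    /// True if the decoder is part way through decoding a record
    ///
    /// If so, calling [`Self::flush`] will return an error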
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

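    /// The number of unflushed records, including any partial record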
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

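    /// True if there are no buffered records, i.e. [`Self::len`] is zero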
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

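    /// Flushes the currently buffered data to a [`RecordBatch`]
    ///
    /// Returns `Ok(None)` if no rows are buffered, and an error if the decoder
    /// is part way through decoding a record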
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // Walk the tape to find the start position of each row
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        let decoded = self.decoder.decode(&tape, &pos)?;
        self.tape_decoder.clear();

        let batch = match self.is_field {
            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
            false => {
                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}

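/// Decodes a [`Tape`] to [`ArrayData`] for a specific [`DataType`]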
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}

macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}

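/// Create the [`ArrayDecoder`] for a given [`DataType`]; decoders for nested
/// types construct their child decoders recursively via this function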
fn make_decoder(
    data_type: DataType,
    coerce_primitive: bool,
    strict_mode: bool,
    is_nullable: bool,
    struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    downcast_integer! {
        data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
            Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
        }
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
    }
}

#[cfg(test)]
mod tests {
    use serde_json::json;
    use std::fs::File;
    use std::io::{BufReader, Cursor, Seek};

    use arrow_array::cast::AsArray;
    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
    use arrow_buffer::{ArrowNativeType, Buffer};
    use arrow_cast::display::{ArrayFormatter, FormatOptions};
    use arrow_data::ArrayDataBuilder;
    use arrow_schema::{Field, Fields};

    use super::*;

    fn do_read(
        buf: &str,
        batch_size: usize,
        coerce_primitive: bool,
        strict_mode: bool,
        schema: SchemaRef,
    ) -> Vec<RecordBatch> {
        let mut unbuffered = vec![];

        // Test with different batch sizes to exercise boundary conditions
        for batch_size in [1, 3, 100, batch_size] {
            unbuffered = ReaderBuilder::new(schema.clone())
                .with_batch_size(batch_size)
                .with_coerce_primitive(coerce_primitive)
                .build(Cursor::new(buf.as_bytes()))
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

            for b in unbuffered.iter().take(unbuffered.len() - 1) {
                assert_eq!(b.num_rows(), batch_size)
            }

            // Test with different buffer sizes to exercise boundary conditions
            for b in [1, 3, 5] {
                let buffered = ReaderBuilder::new(schema.clone())
                    .with_batch_size(batch_size)
                    .with_coerce_primitive(coerce_primitive)
                    .with_strict_mode(strict_mode)
                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
                    .unwrap()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();
                assert_eq!(unbuffered, buffered);
            }
        }

        unbuffered
    }

    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }

    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }

    #[test]
    fn test_long_string_view_allocation() {
        // In a StringViewArray only values longer than 12 bytes are stored
        // out-of-line; the expected capacity reflects the long values below
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }

    #[test]
    fn test_numeric_view_allocation() {
        // When numbers are coerced to Utf8View, only the formatted values longer
        // than 12 bytes require out-of-line buffer space
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }

    #[test]
    fn test_string_with_utf8view() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string_view();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert_eq!(col1.data_type(), &DataType::Utf8View);

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }

    #[test]
    fn test_complex() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        {"list": null, "nested": {"a": null}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("a", DataType::Int32, true),
                    Field::new("b", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("c", DataType::Int32, false)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let list = batches[0].column(0).as_list::<i32>();
        assert_eq!(list.len(), 3);
        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
        assert_eq!(list.null_count(), 1);
        assert!(list.is_null(2));
        let list_values = list.values().as_primitive::<Int32Type>();
        assert_eq!(list_values.values(), &[5, 6]);

        let nested = batches[0].column(1).as_struct();
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 1);
        assert_eq!(a.values(), &[1, 7, 0]);
        assert!(a.is_null(2));

        let b = nested.column(1).as_primitive::<Int32Type>();
        assert_eq!(b.null_count(), 2);
        assert_eq!(b.len(), 3);
        assert_eq!(b.value(0), 2);
        assert!(b.is_null(1));
        assert!(b.is_null(2));

        let nested_list = batches[0].column(2).as_struct();
        assert_eq!(nested_list.len(), 3);
        assert_eq!(nested_list.null_count(), 1);
        assert!(nested_list.is_null(2));

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.len(), 3);
        assert_eq!(list2.null_count(), 1);
        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
        assert!(list2.is_null(2));

        let list2_values = list2.values().as_struct();

        let c = list2_values.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[3, 4]);
    }

    #[test]
    fn test_projection() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }

    #[test]
    fn test_map() {
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }

    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }

    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }

    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }

    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }

    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }

    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }

    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }

    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }

    fn test_duration<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "2"}
        {"a": 3, "b": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
        assert_eq!(col_a.null_count(), 0);
        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));

        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
    }

    #[test]
    fn test_durations() {
        test_duration::<DurationNanosecondType>();
        test_duration::<DurationMicrosecondType>();
        test_duration::<DurationMillisecondType>();
        test_duration::<DurationSecondType>();
    }

    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }

    #[test]
    fn struct_nullability() {
        let do_test = |child: DataType| {
            // Test that declared nullability is enforced
            let non_null = r#"{"foo": {}}"#;
            let schema = Arc::new(Schema::new(vec![Field::new_struct(
                "foo",
                vec![Field::new("bar", child, false)],
                true,
            )]));
            let mut reader = ReaderBuilder::new(schema.clone())
                .build(Cursor::new(non_null.as_bytes()))
                .unwrap();
            assert!(reader.next().unwrap().is_err()); // Should error as not nullable

            let null = r#"{"foo": {bar: null}}"#;
            let mut reader = ReaderBuilder::new(schema.clone())
                .build(Cursor::new(null.as_bytes()))
                .unwrap();
            assert!(reader.next().unwrap().is_err()); // Should error as not nullable

            // Nulls in a nullable parent can mask nulls in a non-nullable child
            let null = r#"{"foo": null}"#;
            let mut reader = ReaderBuilder::new(schema)
                .build(Cursor::new(null.as_bytes()))
                .unwrap();
            let batch = reader.next().unwrap().unwrap();
            assert_eq!(batch.num_columns(), 1);
            let foo = batch.column(0).as_struct();
            assert_eq!(foo.len(), 1);
            assert!(foo.is_null(0));
            assert_eq!(foo.num_columns(), 1);

            let bar = foo.column(0);
            assert_eq!(bar.len(), 1);
            assert!(bar.is_null(0));
        };

        do_test(DataType::Boolean);
        do_test(DataType::Int32);
        do_test(DataType::Utf8);
        do_test(DataType::Decimal128(2, 1));
        do_test(DataType::Timestamp(
            TimeUnit::Microsecond,
            Some("+00:00".into()),
        ));
    }

    #[test]
    fn test_truncation() {
        let buf = r#"
        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
        {"i64": -9223372036854775808, "u64": 0 }
        {"i64": "-9223372036854775808", "u64": 0 }
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("i64", DataType::Int64, true),
            Field::new("u64", DataType::UInt64, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);

        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
    }

    #[test]
    fn test_timestamp_truncation() {
        let buf = r#"
        {"time": 9223372036854775807 }
        {"time": -9223372036854775808 }
        {"time": 9e5 }
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        )]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let i64 = batches[0]
            .column(0)
            .as_primitive::<TimestampNanosecondType>();
        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
    }

    #[test]
    fn test_strict_mode_no_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);

        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct(
                "c",
                vec![
                    Field::new("a", DataType::Boolean, false),
                    Field::new("b", DataType::Int16, false),
                ],
                false,
            ),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);
    }

    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }

    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
        let file = File::open(path).unwrap();
        let mut reader = BufReader::new(file);
        let schema = schema.unwrap_or_else(|| {
            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
            reader.rewind().unwrap();
            schema
        });
        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
        builder.build(reader).unwrap()
    }

    #[test]
    fn test_json_basic() {
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }

    #[test]
    fn test_json_empty_projection() {
        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(0, batch.num_columns());
        assert_eq!(12, batch.num_rows());
    }

    #[test]
    fn test_json_basic_with_nulls() {
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }

    #[test]
    fn test_json_basic_schema() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }

    #[test]
    fn test_json_basic_schema_projection() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("c", DataType::Boolean, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(2, batch.num_columns());
        assert_eq!(2, batch.schema().fields().len());
        assert_eq!(12, batch.num_rows());

        assert_eq!(batch.schema().as_ref(), &schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(1, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
    }

    #[test]
    fn test_json_arrays() {
        let mut reader = read_file("test/data/arrays.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
            b.1.data_type()
        );
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
            c.1.data_type()
        );
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        assert_eq!(1627668684594000000, aa.value(2));
        let bb = batch.column(b.0).as_list::<i32>();
        let bb = bb.values().as_primitive::<Float64Type>();
        assert_eq!(9, bb.len());
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-6.1, bb.value(5));
        assert!(!bb.is_valid(7));

        let cc = batch
            .column(c.0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let cc = cc.values().as_boolean();
        assert_eq!(6, cc.len());
        assert!(!cc.value(0));
        assert!(!cc.value(4));
        assert!(!cc.is_valid(5));
    }

    #[test]
    fn test_empty_json_arrays() {
        let json_content = r#"
        {"items": []}
        {"items": null}
        {}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.value(0).is_empty());
        assert_eq!(col1.value(0).data_type(), &DataType::Null);
        assert!(col1.is_null(1));
        assert!(col1.is_null(2));
    }

    #[test]
    fn test_nested_empty_json_arrays() {
        let json_content = r#"
        {"items": [[],[]]}
        {"items": [[null, null],[null]]}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(
                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
                true,
            ))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 0);
        assert_eq!(col1.value(0).len(), 2);
        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());

        assert_eq!(col1.value(1).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
    }

    #[test]
    fn test_nested_list_json_arrays() {
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        assert_eq!(read.nulls(), expected.nulls());

        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());

        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }

    #[test]
    fn test_skip_empty_lines() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
        let json_content = "
        {\"a\": 1}
        {\"a\": 2}
        {\"a\": 3}";
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = reader.schema();
        let c = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, c.1.data_type());
    }

    #[test]
    fn test_with_multiple_batches() {
        let file = File::open("test/data/basic_nulls.json").unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
        let mut reader = builder.build(reader).unwrap();

        let mut num_records = Vec::new();
        while let Some(rb) = reader.next().transpose().unwrap() {
            num_records.push(rb.num_rows());
        }

        assert_eq!(vec![5, 5, 2], num_records);
    }

    #[test]
    fn test_timestamp_from_json_seconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Second, None),
            a.1.data_type()
        );

        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }

    #[test]
    fn test_timestamp_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Millisecond, None),
            a.1.data_type()
        );

        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }

    #[test]
    fn test_date_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Date64, a.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Date64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }

2267 #[test]
2268 fn test_time_from_json_nanoseconds() {
2269 let schema = Schema::new(vec![Field::new(
2270 "a",
2271 DataType::Time64(TimeUnit::Nanosecond),
2272 true,
2273 )]);
2274
2275 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2276 let batch = reader.next().unwrap().unwrap();
2277
2278 assert_eq!(1, batch.num_columns());
2279 assert_eq!(12, batch.num_rows());
2280
2281 let schema = reader.schema();
2282 let batch_schema = batch.schema();
2283 assert_eq!(schema, batch_schema);
2284
2285 let a = schema.column_with_name("a").unwrap();
2286 assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2287
2288 let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2289 assert!(aa.is_valid(0));
2290 assert!(!aa.is_valid(1));
2291 assert!(!aa.is_valid(2));
2292 assert_eq!(1, aa.value(0));
2293 assert_eq!(1, aa.value(3));
2294 assert_eq!(5, aa.value(7));
2295 }
2296
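    // Iterating the Reader directly should visit every batch; the values in
    // column "a" are summed across batches to check nothing is lost to batching.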
    #[test]
    fn test_json_iterator() {
        let file = File::open("test/data/basic.json").unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
        let reader = builder.build(reader).unwrap();
        let schema = reader.schema();
        let (col_a_index, _) = schema.column_with_name("a").unwrap();

        let mut sum_num_rows = 0;
        let mut num_batches = 0;
        let mut sum_a = 0;
        for batch in reader {
            let batch = batch.unwrap();
            assert_eq!(8, batch.num_columns());
            sum_num_rows += batch.num_rows();
            num_batches += 1;
            let batch_schema = batch.schema();
            assert_eq!(schema, batch_schema);
            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
        }
        assert_eq!(12, sum_num_rows);
        assert_eq!(3, num_batches);
        assert_eq!(100000000000011, sum_a);
    }

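    // A truncated row must survive a failed flush: the partial row stays buffered
    // so decoding can resume once the remaining bytes arrive. The parse_err
    // closure then collects the error message produced for each mismatched value.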
    #[test]
    fn test_decoder_error() {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }

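    // Timestamps serialize from either epoch numbers or RFC3339 strings;
    // "1970-01-01T00:00:00+02:00" is two hours before the epoch, i.e. -7200 seconds.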
    #[test]
    fn test_serialize_timestamp() {
        let json = vec![
            json!({"timestamp": 1681319393}),
            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<TimestampSecondType>();
        assert_eq!(values.values(), &[1681319393, -7200]);
    }

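    // Decimal128(10, 3) stores values scaled by 10^3, so 1.234 and "1.234" become
    // 1234, while the integers 1234 and "1234" become 1234000.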
    #[test]
    fn test_serialize_decimal() {
        let json = vec![
            json!({"decimal": 1.234}),
            json!({"decimal": "1.234"}),
            json!({"decimal": 1234}),
            json!({"decimal": "1234"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "decimal",
            DataType::Decimal128(10, 3),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<Decimal128Type>();
        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
    }

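    // new_with_field wraps a single field in a schema, allowing a bare slice of
    // primitives to be serialized without an enclosing struct.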
    #[test]
    fn test_serde_field() {
        let field = Field::new("int", DataType::Int32, true);
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();
        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b.column(0).as_primitive::<Int32Type>().values();
        assert_eq!(values, &[1, 2, 3, 4]);
    }

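    // Integers larger than i32::MAX must round-trip exactly through both Int64
    // and Timestamp(Microsecond) columns.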
    #[test]
    fn test_serde_large_numbers() {
        let field = Field::new("int", DataType::Int64, true);
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();

        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b.column(0).as_primitive::<Int64Type>().values();
        assert_eq!(values, &[1699148028689, 2, 3, 4]);

        let field = Field::new(
            "int",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        );
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();

        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b
            .column(0)
            .as_primitive::<TimestampMicrosecondType>()
            .values();
        assert_eq!(values, &[1699148028689, 2, 3, 4]);
    }

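    // With coerce_primitive enabled, numbers and booleans that arrive for a Utf8
    // column are stringified rather than rejected.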
    #[test]
    fn test_coercing_primitive_into_string_decoder() {
        let buf = &format!(
            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
            (i32::MAX as i64 + 10),
            i64::MAX - 10
        );
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float64, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]);
        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
        let schema_ref = Arc::new(schema);

        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
        let mut decoder = reader.build_decoder().unwrap();
        decoder.serialize(json_array.as_slice()).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(
            batch,
            RecordBatch::try_new(
                schema_ref,
                vec![
                    Arc::new(Float64Array::from(vec![
                        1.0,
                        2.0,
                        (i32::MAX as i64 + 10) as f64,
                        (i64::MAX - 10) as f64
                    ])),
                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
                ]
            )
            .unwrap()
        );
    }

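    /// Parses a single JSON row with the given [`StructMode`], either as one
    /// struct column named "r" (`as_struct == true`) or as top-level fields.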
    fn _parse_structs(
        row: &str,
        struct_mode: StructMode,
        fields: Fields,
        as_struct: bool,
    ) -> Result<RecordBatch, ArrowError> {
        let builder = if as_struct {
            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
        } else {
            ReaderBuilder::new(Arc::new(Schema::new(fields)))
        };
        builder
            .with_struct_mode(struct_mode)
            .build(Cursor::new(row.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
    }

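    // In ListOnly mode a row must supply exactly one value per field: both too
    // few and too many columns are rejected with a descriptive error.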
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        let row = "[1, 2]";

        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }

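    // The same nested value is decoded from both object and list syntax; each
    // StructMode accepts its own syntax and rejects the other, including when
    // the mismatch is nested inside a field.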
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }

    #[test]
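    // An empty JSON list supplies no columns, so it cannot populate a schema with
    // one field, whether that field is a top-level primitive or a struct; a
    // nested empty list fails the same way while decoding the struct field itself.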
    fn test_struct_decoding_empty_list() {
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        assert_eq!(
            parse("[]", true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[]", false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[]", false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[[]]", false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
        );
    }

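    // Type errors inside a list-encoded struct name the full field path, so the
    // failing value can be located however deeply it is nested.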
    #[test]
    fn test_decode_list_struct_with_wrong_types() {
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        assert_eq!(
            parse(r#"[["a"]]"#, false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"[["a"]]"#, true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, true, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
    }
}